Coverage for python / lsst / daf / butler / registry / queries / _query_data_coordinates.py: 33%
69 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30from collections.abc import Iterable, Iterator
31from contextlib import contextmanager
32from typing import Any
34from ..._butler import Butler
35from ..._dataset_ref import DatasetRef
36from ..._dataset_type import DatasetType
37from ..._exceptions import DatasetTypeError
38from ...dimensions import DataCoordinate, DimensionGroup
39from ...queries import DataCoordinateQueryResults, Query
40from ._query_common import CommonQueryArguments, LegacyQueryResultsMixin, resolve_collections
41from ._query_datasets import QueryDriverDatasetRefQueryResults
42from ._results import DataCoordinateQueryResults as LegacyDataCoordinateQueryResults
43from ._results import ParentDatasetQueryResults
class QueryDriverDataCoordinateQueryResults(
    LegacyQueryResultsMixin[DataCoordinateQueryResults],
    LegacyDataCoordinateQueryResults,
):
    """Implementation of the legacy ``DataCoordinateQueryResults`` interface
    using the new query system.

    Parameters
    ----------
    butler : `Butler`
        Butler object used to execute queries.
    dimensions : `DimensionGroup`
        Dimensions of the data IDs to yield from the query.
    expanded : `bool`
        `True` if the query will also fetch dimension records associated with
        the data IDs.
    args : `CommonQueryArguments`
        User-facing arguments forwarded from ``registry.queryDataIds``.
    """

    def __init__(
        self,
        butler: Butler,
        dimensions: DimensionGroup,
        expanded: bool,
        args: CommonQueryArguments,
    ) -> None:
        LegacyQueryResultsMixin.__init__(self, butler, args)
        LegacyDataCoordinateQueryResults.__init__(self)
        self._dimensions = dimensions
        self._expanded = expanded

    def _build_result(self, query: Query) -> DataCoordinateQueryResults:
        # Hook called by LegacyQueryResultsMixin to convert a new-style Query
        # into the result object this class wraps.
        results = query.data_ids(self._dimensions)
        if self._expanded:
            return results.with_dimension_records()
        else:
            return results

    @property
    def dimensions(self) -> DimensionGroup:
        # Dimensions of the data IDs yielded by this query.
        return self._dimensions

    def hasFull(self) -> bool:
        # The new query system always returns fully-expanded implied values.
        return True

    def hasRecords(self) -> bool:
        return self._expanded

    def __iter__(self) -> Iterator[DataCoordinate]:
        with self._build_query() as result:
            # We have to eagerly fetch the results to prevent
            # leaking the resources associated with QueryDriver.
            data_ids = list(result)
        return iter(data_ids)

    @contextmanager
    def materialize(self) -> Iterator[LegacyDataCoordinateQueryResults]:
        # Materialization is a no-op in the new query system; yield self so
        # legacy callers using ``with results.materialize():`` still work.
        yield self

    def expanded(self) -> LegacyDataCoordinateQueryResults:
        # Return a copy of this query that will also fetch dimension records.
        return QueryDriverDataCoordinateQueryResults(self._butler, self._dimensions, True, self._args)

    def subset(
        self,
        dimensions: DimensionGroup | Iterable[str] | None = None,
        *,
        unique: bool = False,
    ) -> LegacyDataCoordinateQueryResults:
        # 'unique' parameter is intentionally ignored -- all data ID queries
        # using the new query system are automatically de-duplicated.

        if dimensions is None:
            return self

        dimensions = self.dimensions.universe.conform(dimensions)
        if not dimensions.issubset(self.dimensions):
            raise ValueError(f"{dimensions} is not a subset of {self.dimensions}")
        return QueryDriverDataCoordinateQueryResults(self._butler, dimensions, self._expanded, self._args)

    def findDatasets(
        self,
        datasetType: DatasetType | str,
        collections: Any,
        *,
        findFirst: bool = True,
        components: bool = False,
    ) -> ParentDatasetQueryResults:
        """Find datasets of the given type joined against these data IDs.

        Raises
        ------
        DatasetTypeError
            Raised if ``components`` is not `False`; component queries are no
            longer supported.
        """
        if components is not False:
            raise DatasetTypeError(
                "Dataset component queries are no longer supported by Registry. Use "
                "DatasetType methods to obtain components from parent dataset types instead."
            )

        if not isinstance(datasetType, DatasetType):
            datasetType = self._butler.get_dataset_type(datasetType)

        # resolve_collections appends diagnostic messages for collection
        # patterns that match nothing, so doomed queries can explain why.
        doomed_by: list[str] = []
        collections = resolve_collections(self._butler, collections, doomed_by)
        args = self._args.replaceCollections(collections)
        return QueryDriverDatasetRefQueryResults(
            self._butler,
            args,
            dataset_type=datasetType,
            find_first=findFirst,
            extra_dimensions=self.dimensions,
            doomed_by=doomed_by,
            expanded=False,
        )

    def findRelatedDatasets(
        self,
        datasetType: DatasetType | str,
        collections: Any,
        *,
        findFirst: bool = True,
        dimensions: DimensionGroup | Iterable[str] | None = None,
    ) -> Iterable[tuple[DataCoordinate, DatasetRef]]:
        """Find datasets related to (but not necessarily constrained by) these
        data IDs, yielding ``(data ID, ref)`` pairs.

        Notes
        -----
        This is a generator: the underlying query context stays open while the
        caller iterates, and rows are streamed rather than fetched eagerly.
        """
        with self._butler.query() as query:
            if not isinstance(datasetType, DatasetType):
                datasetType = self._butler.get_dataset_type(datasetType)

            if dimensions is None:
                dimensions = self.dimensions
            dimensions = DimensionGroup(self._butler.dimensions, dimensions)

            collections = resolve_collections(self._butler, collections, [])
            result = query.join_dataset_search(datasetType, collections).general(
                dimensions,
                dataset_fields={datasetType.name: {"dataset_id", "run"}},
                find_first=findFirst,
            )
            result = self._apply_result_modifiers(result)
            for row in result.iter_tuples(datasetType):
                # Subset the row's data ID because the general query may carry
                # extra dimensions from the dataset type itself.
                yield row.data_id.subset(dimensions), row.refs[0]