Coverage for python/lsst/daf/butler/registry/_registry_base.py: 25%
78 statements
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("RegistryBase",)

from collections.abc import Iterable, Iterator, Mapping
from typing import Any

from lsst.utils.iteration import ensure_iterable

from .._butler import Butler
from .._collection_type import CollectionType
from .._dataset_association import DatasetAssociation
from .._dataset_type import DatasetType
from ..dimensions import DataId, DimensionElement, DimensionGroup
from ..registry.wildcards import CollectionWildcard, DatasetTypeWildcard
from ._exceptions import ArgumentError, DatasetTypeExpressionError, NoDefaultCollectionError
from ._registry import CollectionArgType, Registry
from .queries import (
    ChainedDatasetQueryResults,
    DataCoordinateQueryResults,
    DatasetQueryResults,
    DimensionRecordQueryResults,
)
from .queries._query_common import CommonQueryArguments, resolve_collections
from .queries._query_data_coordinates import QueryDriverDataCoordinateQueryResults
from .queries._query_datasets import QueryDriverDatasetRefQueryResults
from .queries._query_dimension_records import QueryDriverDimensionRecordQueryResults


class RegistryBase(Registry):
    """Common implementation for `Registry` methods shared between
    DirectButler's RegistryShim and RemoteButlerRegistry.

    Parameters
    ----------
    butler : `Butler`
        Butler instance to which this registry delegates operations.
    """
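
    # Concrete registries (DirectButler's RegistryShim and
    # RemoteButlerRegistry) construct this base with the Butler they wrap;
    # the query* methods below translate legacy Registry calls into that
    # Butler's query system.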
    def __init__(self, butler: Butler) -> None:
        self._butler = butler

    def queryDatasets(
        self,
        datasetType: Any,
        *,
        collections: CollectionArgType | None = None,
        dimensions: Iterable[str] | None = None,
        dataId: DataId | None = None,
        where: str = "",
        findFirst: bool = False,
        components: bool = False,
        bind: Mapping[str, Any] | None = None,
        check: bool = True,
        **kwargs: Any,
    ) -> DatasetQueryResults:
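        # Resolve the dataset-type expression up front, then run one query
        # per matching dataset type through the Butler query system.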
        doomed_by: list[str] = []
        dimension_group = self.dimensions.conform(dimensions) if dimensions is not None else None

        if collections is None and not self.defaults.collections:
            raise NoDefaultCollectionError("No collections provided, and no default collections set")
        if findFirst and collections is not None:
            wildcard = CollectionWildcard.from_expression(collections)
            if wildcard.patterns:
                raise TypeError(
                    "Collection search patterns not allowed in findFirst search, "
                    "because collections must be in a specific order."
                )

        args = self._convert_common_query_arguments(
            dataId=dataId,
            where=where,
            bind=bind,
            kwargs=kwargs,
            datasets=None,
            collections=collections,
            doomed_by=doomed_by,
            check=check,
        )

        if not args.collections:
            doomed_by.append("No datasets can be found because collection list is empty.")

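        # Record unregistered dataset types as diagnostics rather than
        # raising, so callers can see why the query is doomed.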
        missing_dataset_types: list[str] = []
        dataset_types = list(self.queryDatasetTypes(datasetType, missing=missing_dataset_types))
        if missing_dataset_types:
            doomed_by.extend(f"Dataset type {name} is not registered." for name in missing_dataset_types)

        if len(dataset_types) == 0:
            doomed_by.extend(
                [
                    f"No registered dataset type matching {t!r} found, so no matching datasets can "
                    "exist in any collection."
                    for t in ensure_iterable(datasetType)
                ]
            )
            return ChainedDatasetQueryResults([], doomed_by=doomed_by)

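        # Build one lazy result object per dataset type, chaining them only
        # when more than one type matched the expression.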
        query_results = [
            QueryDriverDatasetRefQueryResults(
                self._butler,
                args,
                dataset_type=dt,
                find_first=findFirst,
                extra_dimensions=dimension_group,
                doomed_by=doomed_by,
                expanded=False,
            )
            for dt in dataset_types
        ]
        if len(query_results) == 1:
            return query_results[0]
        else:
            return ChainedDatasetQueryResults(query_results)

    def queryDataIds(
        self,
        dimensions: DimensionGroup | Iterable[str] | str,
        *,
        dataId: DataId | None = None,
        datasets: Any = None,
        collections: CollectionArgType | None = None,
        where: str = "",
        components: bool = False,
        bind: Mapping[str, Any] | None = None,
        check: bool = True,
        **kwargs: Any,
    ) -> DataCoordinateQueryResults:
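        # 'collections' is only meaningful as a constraint on a dataset
        # search, so it requires 'datasets'.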
        if collections is not None and datasets is None:
            raise ArgumentError(f"Cannot pass 'collections' (='{collections}') without 'datasets'.")

        dimensions = self.dimensions.conform(dimensions)
        args = self._convert_common_query_arguments(
            dataId=dataId,
            where=where,
            bind=bind,
            kwargs=kwargs,
            datasets=datasets,
            collections=collections,
            check=check,
        )
        return QueryDriverDataCoordinateQueryResults(
            self._butler, dimensions=dimensions, expanded=False, args=args
        )

    def queryDimensionRecords(
        self,
        element: DimensionElement | str,
        *,
        dataId: DataId | None = None,
        datasets: Any = None,
        collections: CollectionArgType | None = None,
        where: str = "",
        components: bool = False,
        bind: Mapping[str, Any] | None = None,
        check: bool = True,
        **kwargs: Any,
    ) -> DimensionRecordQueryResults:
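        # Accept either a DimensionElement instance or its string name.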
        if not isinstance(element, DimensionElement):
            element = self.dimensions.elements[element]

        args = self._convert_common_query_arguments(
            dataId=dataId,
            where=where,
            bind=bind,
            kwargs=kwargs,
            datasets=datasets,
            collections=collections,
            check=check,
        )

        return QueryDriverDimensionRecordQueryResults(self._butler, element, args)

    def _convert_common_query_arguments(
        self,
        *,
        dataId: DataId | None = None,
        datasets: object | None = None,
        collections: CollectionArgType | None = None,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        kwargs: dict[str, int | str],
        doomed_by: list[str] | None = None,
        check: bool = True,
    ) -> CommonQueryArguments:
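        # Normalize the arguments shared by the query* methods above into the
        # single struct consumed by the QueryDriver result classes.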
        dataset_types = self._resolve_dataset_types(datasets)
        if dataset_types and collections is None and not self.defaults.collections:
            raise NoDefaultCollectionError("'collections' must be provided if 'datasets' is provided")
        return CommonQueryArguments(
            dataId=dataId,
            where=where,
            bind=dict(bind) if bind else None,
            kwargs=dict(kwargs),
            dataset_types=dataset_types,
            collections=resolve_collections(self._butler, collections, doomed_by),
            check=check,
        )

    def queryDatasetAssociations(
        self,
        datasetType: str | DatasetType,
        collections: CollectionArgType | None = ...,
        *,
        collectionTypes: Iterable[CollectionType] = CollectionType.all(),
        flattenChains: bool = False,
    ) -> Iterator[DatasetAssociation]:
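        # Associations are reconstructed from a 'general' query that returns
        # per-dataset run, collection, and timespan fields.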
        if isinstance(datasetType, str):
            datasetType = self.getDatasetType(datasetType)
        with self._butler.query() as query:
            resolved_collections = self.queryCollections(
                collections,
                collectionTypes=collectionTypes,
                flattenChains=True,
            )
            # It's annoyingly difficult to just do the collection query once,
            # since query_info doesn't accept all the expression types that
            # queryCollections does. But it's all cached anyway.
            collection_info = {
                info.name: info for info in self._butler.collections.query_info(resolved_collections)
            }
            query = query.join_dataset_search(datasetType, resolved_collections)
            result = query.general(
                datasetType.dimensions,
                dataset_fields={datasetType.name: {"dataset_id", "run", "collection", "timespan"}},
                find_first=False,
            )
            yield from DatasetAssociation.from_query_result(result, datasetType, collection_info)

    def _resolve_dataset_types(self, dataset_types: object | None) -> list[str]:
        if dataset_types is None:
            return []

        if dataset_types is ...:
            raise TypeError(
                "'...' not permitted for 'datasets'"
                " -- searching for all dataset types does not constrain the search."
            )

        wildcard = DatasetTypeWildcard.from_expression(dataset_types)
        if wildcard.patterns:
            raise DatasetTypeExpressionError(
                "Dataset type wildcard expressions are not supported in this context."
            )
        return list(wildcard.values.keys())