Coverage for python/lsst/daf/butler/_registry_shim.py: 41% (142 statements)
coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("RegistryShim",)

import contextlib
from collections.abc import Iterable, Iterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any

from ._collection_type import CollectionType
from ._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from ._dataset_type import DatasetType
from ._exceptions import CalibrationLookupError
from ._storage_class import StorageClassFactory
from ._timespan import Timespan
from .dimensions import (
    DataCoordinate,
    DataId,
    DimensionElement,
    DimensionGroup,
    DimensionRecord,
    DimensionUniverse,
)
from .registry._collection_summary import CollectionSummary
from .registry._defaults import RegistryDefaults
from .registry._exceptions import NoDefaultCollectionError
from .registry._registry_base import RegistryBase
from .registry.queries._query_common import resolve_collections

if TYPE_CHECKING:
    from .direct_butler import DirectButler
    from .registry._registry import CollectionArgType
    from .registry.interfaces import ObsCoreTableManager


class RegistryShim(RegistryBase):
    """Implementation of `Registry` interface exposed to clients by `Butler`.

    Parameters
    ----------
    butler : `DirectButler`
        Data butler instance.

    Notes
    -----
    This shim implementation of `Registry` forwards all methods to an actual
    Registry instance which is internal to Butler or to Butler methods. Its
    purpose is to provide a stable interface to many client-visible operations
    while we perform re-structuring of Registry and Butler implementations.
    """
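
    # Hedged usage sketch (an assumption, not part of this module): clients are
    # expected to reach this shim through a ``DirectButler`` instance, e.g. via
    # ``butler.registry``, rather than constructing it directly:
    #
    #     registry = butler.registry
    #     ref = registry.findDataset("calexp", data_id, collections=["some/run"])
    #
    # The dataset type name and collection above are illustrative only.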

    def __init__(self, butler: DirectButler):
        super().__init__(butler)
        self._registry = butler._registry

    def isWriteable(self) -> bool:
        # Docstring inherited from a base class.
        return self._registry.isWriteable()

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited from a base class.
        return self._registry.dimensions

    @property
    def defaults(self) -> RegistryDefaults:
        # Docstring inherited from a base class.
        return self._registry.defaults

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        # Docstring inherited from a base class.
        self._registry.defaults = value

    def refresh(self) -> None:
        # Docstring inherited from a base class.
        self._registry.refresh()

    def refresh_collection_summaries(self) -> None:
        # Docstring inherited from a base class.
        self._registry.refresh_collection_summaries()

    def caching_context(self) -> contextlib.AbstractContextManager[None]:
        # Docstring inherited from a base class.
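        # Note: unlike most methods here, this wraps the Butler's own caching
        # context manager rather than delegating to the underlying registry.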
        return self._butler._caching_context()

    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        # Docstring inherited from a base class.
        with self._registry.transaction(savepoint=savepoint):
            yield

    def resetConnectionPool(self) -> None:
        # Docstring inherited from a base class.
        self._registry.resetConnectionPool()

    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        # Docstring inherited from a base class.
        return self._registry.registerCollection(name, type, doc)

    def getCollectionType(self, name: str) -> CollectionType:
        # Docstring inherited from a base class.
        return self._registry.getCollectionType(name)

    def registerRun(self, name: str, doc: str | None = None) -> bool:
        # Docstring inherited from a base class.
        return self._registry.registerRun(name, doc)

    def removeCollection(self, name: str) -> None:
        # Docstring inherited from a base class.
        self._registry.removeCollection(name)

    def getCollectionChain(self, parent: str) -> Sequence[str]:
        # Docstring inherited from a base class.
        return self._registry.getCollectionChain(parent)

    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        # Docstring inherited from a base class.
        self._registry.setCollectionChain(parent, children, flatten=flatten)

    def getCollectionParentChains(self, collection: str) -> set[str]:
        # Docstring inherited from a base class.
        return self._registry.getCollectionParentChains(collection)

    def getCollectionDocumentation(self, collection: str) -> str | None:
        # Docstring inherited from a base class.
        return self._registry.getCollectionDocumentation(collection)

    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        # Docstring inherited from a base class.
        self._registry.setCollectionDocumentation(collection, doc)

    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        # Docstring inherited from a base class.
        return self._registry.getCollectionSummary(collection)

    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        # Docstring inherited from a base class.
        return self._registry.registerDatasetType(datasetType)

    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        # Docstring inherited from a base class.
        self._registry.removeDatasetType(name)

    def getDatasetType(self, name: str) -> DatasetType:
        # Docstring inherited from a base class.
        return self._registry.getDatasetType(name)

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        # Docstring inherited from a base class.
        return self._registry.supportsIdGenerationMode(mode)

    def findDataset(
        self,
        datasetType: DatasetType | str,
        dataId: DataId | None = None,
        *,
        collections: CollectionArgType | None = None,
        timespan: Timespan | None = None,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # Docstring inherited from a base class.
        if not isinstance(datasetType, DatasetType):
            datasetType = self.getDatasetType(datasetType)

        dataId = DataCoordinate.standardize(
            dataId,
            dimensions=datasetType.dimensions,
            universe=self.dimensions,
            defaults=self.defaults.dataId,
            **kwargs,
        )

        with self._butler.query() as query:
            resolved_collections = resolve_collections(self._butler, collections)
            if not resolved_collections:
                if collections is None:
                    raise NoDefaultCollectionError("No collections provided, and no default collections set")
                else:
                    return None

            if datasetType.isCalibration() and timespan is None:
                # Filter out calibration collections, because with no timespan
                # we have no way of selecting a dataset from them.
                collection_info = self._butler.collections.query_info(
                    resolved_collections, flatten_chains=True
                )
                resolved_collections = [
                    info.name for info in collection_info if info.type != CollectionType.CALIBRATION
                ]
                if not resolved_collections:
                    return None

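            # Fetch at most two matches: a single match is returned below, and a
            # second one is only needed to detect an ambiguous lookup, so the
            # full result set never has to be materialized.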
            result = query.datasets(datasetType, resolved_collections, find_first=True).limit(2)
            dataset_type_name = result.dataset_type.name
            # Search only on the 'required' dimensions for the dataset type.
            # Any extra values provided by the user are ignored.
            minimal_data_id = DataCoordinate.standardize(
                dataId.subset(datasetType.dimensions.required).required, universe=self.dimensions
            )
            result = result.where(minimal_data_id)
            if (
                datasetType.isCalibration()
                and timespan is not None
                and (timespan.begin is not None or timespan.end is not None)
            ):
                timespan_column = query.expression_factory[dataset_type_name].timespan
                result = result.where(timespan_column.overlaps(timespan))

            datasets = list(result)
            if len(datasets) == 1:
                ref = datasets[0]
                if dataId.hasRecords():
                    ref = ref.expanded(dataId)
                # Propagate storage class from user-provided DatasetType, which
                # may not match the definition in the database.
                ref = ref.overrideStorageClass(datasetType.storageClass_name)
                if datastore_records:
                    ref = self._registry.get_datastore_records(ref)
                return ref
            elif len(datasets) == 0:
                return None
            else:
                raise CalibrationLookupError(
                    f"Ambiguous calibration lookup for {datasetType} with timespan {timespan}"
                    f" in collections {resolved_collections}."
                )

    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        # Docstring inherited from a base class.
        return self._registry.insertDatasets(datasetType, dataIds, run, expand, idGenerationMode)

    def _importDatasets(
        self, datasets: Iterable[DatasetRef], expand: bool = True, assume_new: bool = False
    ) -> list[DatasetRef]:
        # Docstring inherited from a base class.
        return self._registry._importDatasets(datasets, expand, assume_new)

    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        # Docstring inherited from a base class.
        return self._registry.getDataset(id)

    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        # Docstring inherited.
        return self._registry._fetch_run_dataset_ids(run)

    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from a base class.
        self._registry.removeDatasets(refs)

    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from a base class.
        self._registry.associate(collection, refs)

    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        # Docstring inherited from a base class.
        self._registry.disassociate(collection, refs)

    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        # Docstring inherited from a base class.
        self._registry.certify(collection, refs, timespan)

    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        # Docstring inherited from a base class.
        self._registry.decertify(collection, datasetType, timespan, dataIds=dataIds)

    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        # Docstring inherited from a base class.
        return self._registry.getDatasetLocations(ref)

    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        # Docstring inherited from a base class.
        return self._registry.expandDataId(
            dataId, dimensions=dimensions, records=records, withDefaults=withDefaults, **kwargs
        )

    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited from a base class.
        self._registry.insertDimensionData(
            element, *data, conform=conform, replace=replace, skip_existing=skip_existing
        )

    def syncDimensionData(
        self,
        element: DimensionElement | str,
        row: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        update: bool = False,
    ) -> bool | dict[str, Any]:
        # Docstring inherited from a base class.
        return self._registry.syncDimensionData(element, row, conform, update)

    def queryDatasetTypes(
        self,
        expression: Any = ...,
        *,
        missing: list[str] | None = None,
    ) -> Iterable[DatasetType]:
        # Docstring inherited from a base class.
        return self._registry.queryDatasetTypes(expression, missing=missing)

    def queryCollections(
        self,
        expression: Any = ...,
        datasetType: DatasetType | None = None,
        collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(),
        flattenChains: bool = False,
        includeChains: bool | None = None,
    ) -> Sequence[str]:
        # Docstring inherited from a base class.
        return self._registry.queryCollections(
            expression, datasetType, collectionTypes, flattenChains, includeChains
        )

    @property
    def obsCoreTableManager(self) -> ObsCoreTableManager | None:
        # Docstring inherited from a base class.
        return self._registry.obsCoreTableManager

    @property
    def storageClasses(self) -> StorageClassFactory:
        return self._registry.storageClasses