Coverage for python/lsst/daf/butler/tests/hybrid_butler_registry.py: 0%
117 statements
« prev ^ index » next — coverage.py v7.13.5, created at 2026-05-01 08:17 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30import contextlib
31from collections.abc import Iterable, Iterator, Mapping, Sequence
32from typing import Any
34from .._collection_type import CollectionType
35from .._dataset_association import DatasetAssociation
36from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
37from .._dataset_type import DatasetType
38from .._storage_class import StorageClassFactory
39from .._timespan import Timespan
40from ..dimensions import (
41 DataCoordinate,
42 DataId,
43 DimensionElement,
44 DimensionGroup,
45 DimensionRecord,
46 DimensionUniverse,
47)
48from ..registry import CollectionArgType, CollectionSummary, Registry, RegistryDefaults
49from ..registry.queries import (
50 DataCoordinateQueryResults,
51 DatasetQueryResults,
52 DimensionRecordQueryResults,
53)
54from ..registry.sql_registry import SqlRegistry
class HybridButlerRegistry(Registry):
    """A `Registry` that forwards every operation to one of two underlying
    registries: a `RemoteButlerRegistry` and a direct `SqlRegistry` backed by
    the same repository.  This allows `RemoteButler` to be exercised by the
    test suite before its implementation is complete -- operations the remote
    side does not yet support fall through to the direct `SqlRegistry`.

    Parameters
    ----------
    direct : `SqlRegistry`
        DirectButler SqlRegistry used to provide methods not yet implemented
        by RemoteButlerRegistry.
    remote : `Registry`
        The RemoteButler Registry implementation we are intending to test.
    """

    def __init__(self, direct: SqlRegistry, remote: Registry):
        self._remote = remote
        self._direct = direct

    def isWriteable(self) -> bool:
        """Delegate to the remote registry under test."""
        return self._remote.isWriteable()

    @property
    def dimensions(self) -> DimensionUniverse:
        """The dimension universe, as seen by the remote registry."""
        return self._remote.dimensions

    @property
    def defaults(self) -> RegistryDefaults:
        """Default collections/run, as seen by the remote registry."""
        return self._remote.defaults

    @defaults.setter
    def defaults(self, value: RegistryDefaults) -> None:
        # Give each side its own RegistryDefaults object: assignment calls
        # finish() on the instance, and sharing one object between Remote and
        # Direct could let the Remote side cheat by reusing Direct's results.
        duplicate = RegistryDefaults(value.collections, value.run, value._infer, **value._kwargs)
        self._remote.defaults = value
        self._direct.defaults = duplicate

    def refresh(self) -> None:
        """Refresh cached state via the direct registry."""
        self._direct.refresh()

    def refresh_collection_summaries(self) -> None:
        """Refresh collection summaries via the direct registry."""
        self._direct.refresh_collection_summaries()

    @contextlib.contextmanager
    def caching_context(self) -> Iterator[None]:
        # Enable caching on both sides for the duration of the context.
        with self._direct.caching_context(), self._remote.caching_context():
            yield

    @contextlib.contextmanager
    def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
        # RemoteButler doesn't support transactions, and if the direct registry
        # enters one its changes are invisible to the remote side.
        raise NotImplementedError()

    def registerCollection(
        self, name: str, type: CollectionType = CollectionType.TAGGED, doc: str | None = None
    ) -> bool:
        """Create a collection via the direct registry (a write operation)."""
        return self._direct.registerCollection(name, type=type, doc=doc)

    def getCollectionType(self, name: str) -> CollectionType:
        """Look up a collection's type via the remote registry."""
        return self._remote.getCollectionType(name)

    def registerRun(self, name: str, doc: str | None = None) -> bool:
        """Create a RUN collection via the direct registry."""
        return self._direct.registerRun(name, doc=doc)

    def removeCollection(self, name: str) -> None:
        """Delete a collection via the direct registry."""
        return self._direct.removeCollection(name)

    def getCollectionChain(self, parent: str) -> Sequence[str]:
        """Read a chained collection's children via the remote registry."""
        return self._remote.getCollectionChain(parent)

    def setCollectionChain(self, parent: str, children: Any, *, flatten: bool = False) -> None:
        """Redefine a chained collection via the direct registry."""
        return self._direct.setCollectionChain(parent, children, flatten=flatten)

    def getCollectionParentChains(self, collection: str) -> set[str]:
        """Find chains containing a collection via the remote registry."""
        return self._remote.getCollectionParentChains(collection)

    def getCollectionDocumentation(self, collection: str) -> str | None:
        """Read a collection's documentation via the remote registry."""
        return self._remote.getCollectionDocumentation(collection)

    def setCollectionDocumentation(self, collection: str, doc: str | None) -> None:
        """Write a collection's documentation via the direct registry."""
        return self._direct.setCollectionDocumentation(collection, doc)

    def getCollectionSummary(self, collection: str) -> CollectionSummary:
        """Fetch a collection summary via the remote registry."""
        return self._remote.getCollectionSummary(collection)

    def registerDatasetType(self, datasetType: DatasetType) -> bool:
        """Register a dataset type via the direct registry.

        The dataset type's dimension universe must belong to the direct
        registry, so re-create it there if it came from the remote one.
        """
        dataset_type = datasetType
        if dataset_type.dimensions.universe is self._remote.dimensions:
            dataset_type = DatasetType(
                dataset_type.name,
                dataset_type.dimensions.names,
                dataset_type.storageClass,
                universe=self._direct.dimensions,
                isCalibration=dataset_type.isCalibration(),
            )
        return self._direct.registerDatasetType(dataset_type)

    def removeDatasetType(self, name: str | tuple[str, ...]) -> None:
        """Remove dataset type(s) via the direct registry."""
        return self._direct.removeDatasetType(name)

    def getDatasetType(self, name: str) -> DatasetType:
        """Look up a dataset type via the remote registry."""
        return self._remote.getDatasetType(name)

    def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
        """Query ID-generation support via the direct registry."""
        return self._direct.supportsIdGenerationMode(mode)

    def findDataset(
        self,
        datasetType: DatasetType | str,
        dataId: DataId | None = None,
        *,
        collections: CollectionArgType | None = None,
        timespan: Timespan | None = None,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        """Search for a dataset via the remote registry."""
        return self._remote.findDataset(
            datasetType,
            dataId,
            collections=collections,
            timespan=timespan,
            datastore_records=datastore_records,
            **kwargs,
        )

    def insertDatasets(
        self,
        datasetType: DatasetType | str,
        dataIds: Iterable[DataId],
        run: str | None = None,
        expand: bool = True,
        idGenerationMode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
    ) -> list[DatasetRef]:
        """Insert datasets via the direct registry."""
        return self._direct.insertDatasets(
            datasetType,
            dataIds,
            run=run,
            expand=expand,
            idGenerationMode=idGenerationMode,
        )

    def _importDatasets(
        self, datasets: Iterable[DatasetRef], expand: bool = True, assume_new: bool = False
    ) -> list[DatasetRef]:
        """Import pre-resolved datasets via the direct registry."""
        return self._direct._importDatasets(datasets, expand=expand, assume_new=assume_new)

    def getDataset(self, id: DatasetId) -> DatasetRef | None:
        """Look up a dataset by ID via the remote registry."""
        return self._remote.getDataset(id)

    def _fetch_run_dataset_ids(self, run: str) -> list[DatasetId]:
        """Fetch the dataset IDs in a run via the direct registry."""
        return self._direct._fetch_run_dataset_ids(run)

    def removeDatasets(self, refs: Iterable[DatasetRef]) -> None:
        """Delete datasets via the direct registry."""
        return self._direct.removeDatasets(refs)

    def associate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        """Add datasets to a TAGGED collection via the direct registry."""
        return self._direct.associate(collection, refs)

    def disassociate(self, collection: str, refs: Iterable[DatasetRef]) -> None:
        """Remove datasets from a TAGGED collection via the direct registry."""
        return self._direct.disassociate(collection, refs)

    def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespan) -> None:
        """Certify datasets into a CALIBRATION collection via the direct registry."""
        return self._direct.certify(collection, refs, timespan)

    def decertify(
        self,
        collection: str,
        datasetType: str | DatasetType,
        timespan: Timespan,
        *,
        dataIds: Iterable[DataId] | None = None,
    ) -> None:
        """Remove calibration associations via the direct registry."""
        return self._direct.decertify(collection, datasetType, timespan, dataIds=dataIds)

    def getDatasetLocations(self, ref: DatasetRef) -> Iterable[str]:
        """Look up datastore locations via the direct registry."""
        return self._direct.getDatasetLocations(ref)

    def expandDataId(
        self,
        dataId: DataId | None = None,
        *,
        dimensions: Iterable[str] | DimensionGroup | None = None,
        records: Mapping[str, DimensionRecord | None] | None = None,
        withDefaults: bool = True,
        **kwargs: Any,
    ) -> DataCoordinate:
        """Expand a data ID via the remote registry."""
        return self._remote.expandDataId(
            dataId, dimensions=dimensions, records=records, withDefaults=withDefaults, **kwargs
        )

    def insertDimensionData(
        self,
        element: DimensionElement | str,
        *data: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        replace: bool = False,
        skip_existing: bool = False,
    ) -> None:
        """Insert dimension records via the direct registry."""
        return self._direct.insertDimensionData(
            element, *data, conform=conform, replace=replace, skip_existing=skip_existing
        )

    def syncDimensionData(
        self,
        element: DimensionElement | str,
        row: Mapping[str, Any] | DimensionRecord,
        conform: bool = True,
        update: bool = False,
    ) -> bool | dict[str, Any]:
        """Synchronize a dimension record via the direct registry."""
        return self._direct.syncDimensionData(element, row, conform=conform, update=update)

    def queryDatasetTypes(
        self,
        expression: Any = ...,
        *,
        missing: list[str] | None = None,
    ) -> Iterable[DatasetType]:
        """Query dataset types via the remote registry."""
        return self._remote.queryDatasetTypes(expression, missing=missing)

    def queryCollections(
        self,
        expression: Any = ...,
        datasetType: DatasetType | None = None,
        collectionTypes: Iterable[CollectionType] | CollectionType = CollectionType.all(),
        flattenChains: bool = False,
        includeChains: bool | None = None,
    ) -> Sequence[str]:
        """Query collections via the remote registry."""
        return self._remote.queryCollections(
            expression,
            datasetType=datasetType,
            collectionTypes=collectionTypes,
            flattenChains=flattenChains,
            includeChains=includeChains,
        )

    def queryDatasets(
        self,
        datasetType: Any,
        *,
        collections: CollectionArgType | None = None,
        dimensions: Iterable[str] | None = None,
        dataId: DataId | None = None,
        where: str = "",
        findFirst: bool = False,
        bind: Mapping[str, Any] | None = None,
        check: bool = True,
        **kwargs: Any,
    ) -> DatasetQueryResults:
        """Query datasets via the remote registry."""
        return self._remote.queryDatasets(
            datasetType,
            collections=collections,
            dimensions=dimensions,
            dataId=dataId,
            where=where,
            findFirst=findFirst,
            bind=bind,
            check=check,
            **kwargs,
        )

    def queryDataIds(
        self,
        dimensions: DimensionGroup | Iterable[str] | str,
        *,
        dataId: DataId | None = None,
        datasets: Any = None,
        collections: CollectionArgType | None = None,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        check: bool = True,
        **kwargs: Any,
    ) -> DataCoordinateQueryResults:
        """Query data IDs via the remote registry."""
        return self._remote.queryDataIds(
            dimensions,
            dataId=dataId,
            datasets=datasets,
            collections=collections,
            where=where,
            bind=bind,
            check=check,
            **kwargs,
        )

    def queryDimensionRecords(
        self,
        element: DimensionElement | str,
        *,
        dataId: DataId | None = None,
        datasets: Any = None,
        collections: CollectionArgType | None = None,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        check: bool = True,
        **kwargs: Any,
    ) -> DimensionRecordQueryResults:
        """Query dimension records via the remote registry."""
        return self._remote.queryDimensionRecords(
            element,
            dataId=dataId,
            datasets=datasets,
            collections=collections,
            where=where,
            bind=bind,
            check=check,
            **kwargs,
        )

    def queryDatasetAssociations(
        self,
        datasetType: str | DatasetType,
        collections: CollectionArgType | None = ...,
        *,
        collectionTypes: Iterable[CollectionType] = CollectionType.all(),
        flattenChains: bool = False,
    ) -> Iterator[DatasetAssociation]:
        """Query dataset/collection associations via the remote registry."""
        return self._remote.queryDatasetAssociations(
            datasetType, collections, collectionTypes=collectionTypes, flattenChains=flattenChains
        )

    @property
    def storageClasses(self) -> StorageClassFactory:
        """The storage class factory, as seen by the remote registry."""
        return self._remote.storageClasses

    @storageClasses.setter
    def storageClasses(self, value: StorageClassFactory) -> None:
        # Replacing the factory is deliberately unsupported in the hybrid.
        raise NotImplementedError()