Coverage for python / lsst / daf / butler / tests / hybrid_butler.py: 0%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:36 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30from collections.abc import Collection, Iterable, Iterator, Sequence
31from contextlib import AbstractContextManager, contextmanager
32from types import EllipsisType
33from typing import Any, TextIO, cast
35from lsst.resources import ResourcePath, ResourcePathExpression
37from .._butler import Butler, _DeprecatedDefault
38from .._butler_collections import ButlerCollections
39from .._butler_metrics import ButlerMetrics
40from .._dataset_existence import DatasetExistence
41from .._dataset_provenance import DatasetProvenance
42from .._dataset_ref import DatasetId, DatasetRef
43from .._dataset_type import DatasetType
44from .._deferredDatasetHandle import DeferredDatasetHandle
45from .._file_dataset import FileDataset
46from .._limited_butler import LimitedButler
47from .._query_all_datasets import QueryAllDatasetsParameters
48from .._storage_class import StorageClass
49from .._timespan import Timespan
50from ..datastore import DatasetRefURIs, FileTransferSource
51from ..dimensions import DataCoordinate, DataId, DimensionElement, DimensionRecord, DimensionUniverse
52from ..direct_butler import DirectButler
53from ..queries import Query
54from ..registry import CollectionArgType, Registry
55from ..remote_butler import RemoteButler
56from ..transfers import RepoExportContext
57from .hybrid_butler_collections import HybridButlerCollections
58from .hybrid_butler_registry import HybridButlerRegistry
class HybridButler(Butler):
    """Test-only `Butler` that wraps a `RemoteButler` and a `DirectButler`.

    Every method is forwarded to one of the two wrapped butlers.  This lets
    `RemoteButler` be exercised by the test suite before its implementation
    is complete: operations it does not yet support are routed to the
    `DirectButler` instead.
    """

    _remote_butler: RemoteButler
    _direct_butler: DirectButler
    _registry: Registry

    def __new__(cls, remote_butler: RemoteButler, direct_butler: DirectButler) -> HybridButler:
        self = cast(HybridButler, super().__new__(cls))
        self._direct_butler = direct_butler
        self._remote_butler = remote_butler
        self._datastore = direct_butler._datastore
        self._registry = HybridButlerRegistry(direct_butler._registry, remote_butler.registry)
        # Both wrapped butlers must record into one shared metrics object so
        # callers see a single combined view.
        self._metrics = direct_butler._metrics
        remote_butler._metrics = self._metrics
        return self

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return self._remote_butler.isWriteable()

    def _caching_context(self) -> AbstractContextManager[None]:
        # Docstring inherited.
        return self._direct_butler._caching_context()

    def transaction(self) -> AbstractContextManager[None]:
        # Docstring inherited.
        return self._direct_butler.transaction()

    @contextmanager
    def record_metrics(self, metrics: ButlerMetrics | None = None) -> Iterator[ButlerMetrics]:
        # Docstring inherited.
        if metrics is None:
            # Use one fresh metrics object for both butlers.
            metrics = ButlerMetrics()
        with self._direct_butler.record_metrics(metrics):
            with self._remote_butler.record_metrics(metrics):
                yield metrics

    def put(
        self,
        obj: Any,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        run: str | None = None,
        provenance: DatasetProvenance | None = None,
        **kwargs: Any,
    ) -> DatasetRef:
        # Docstring inherited.  Writes go through the direct butler.
        return self._direct_butler.put(obj, datasetRefOrType, dataId, run=run, provenance=provenance, **kwargs)

    def getDeferred(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict | None = None,
        collections: Any = None,
        storageClass: str | StorageClass | None = None,
        **kwargs: Any,
    ) -> DeferredDatasetHandle:
        # Docstring inherited.
        return self._remote_butler.getDeferred(
            datasetRefOrType,
            dataId,
            parameters=parameters,
            collections=collections,
            storageClass=storageClass,
            **kwargs,
        )

    def get(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict[str, Any] | None = None,
        collections: Any = None,
        storageClass: StorageClass | str | None = None,
        **kwargs: Any,
    ) -> Any:
        # Docstring inherited.  Reads go through the remote butler.
        return self._remote_butler.get(
            datasetRefOrType,
            dataId,
            parameters=parameters,
            collections=collections,
            storageClass=storageClass,
            **kwargs,
        )

    def getURIs(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        predict: bool = False,
        collections: Any = None,
        run: str | None = None,
        **kwargs: Any,
    ) -> DatasetRefURIs:
        # Docstring inherited.
        return self._remote_butler.getURIs(
            datasetRefOrType, dataId, predict=predict, collections=collections, run=run, **kwargs
        )

    def getURI(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        predict: bool = False,
        collections: Any = None,
        run: str | None = None,
        **kwargs: Any,
    ) -> ResourcePath:
        # Docstring inherited.
        return self._remote_butler.getURI(
            datasetRefOrType, dataId, predict=predict, collections=collections, run=run, **kwargs
        )

    def get_dataset_type(self, name: str) -> DatasetType:
        # Docstring inherited.
        return self._remote_butler.get_dataset_type(name)

    def get_dataset(
        self,
        id: DatasetId | str,
        *,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
    ) -> DatasetRef | None:
        # Docstring inherited.
        return self._remote_butler.get_dataset(
            id,
            storage_class=storage_class,
            dimension_records=dimension_records,
            datastore_records=datastore_records,
        )

    def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
        # Docstring inherited.
        return self._remote_butler.get_many_datasets(ids)

    def find_dataset(
        self,
        dataset_type: DatasetType | str,
        data_id: DataId | None = None,
        *,
        collections: str | Sequence[str] | None = None,
        timespan: Timespan | None = None,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # Docstring inherited.
        return self._remote_butler.find_dataset(
            dataset_type,
            data_id,
            collections=collections,
            timespan=timespan,
            storage_class=storage_class,
            dimension_records=dimension_records,
            datastore_records=datastore_records,
            **kwargs,
        )

    def retrieve_artifacts_zip(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        overwrite: bool = True,
    ) -> ResourcePath:
        # Docstring inherited.
        return self._remote_butler.retrieve_artifacts_zip(refs, destination, overwrite)

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        # Docstring inherited.
        return self._remote_butler.retrieveArtifacts(refs, destination, transfer, preserve_path, overwrite)

    def exists(
        self,
        dataset_ref_or_type: DatasetRef | DatasetType | str,
        /,
        data_id: DataId | None = None,
        *,
        full_check: bool = True,
        collections: Any = None,
        **kwargs: Any,
    ) -> DatasetExistence:
        # Docstring inherited.
        return self._remote_butler.exists(
            dataset_ref_or_type, data_id, full_check=full_check, collections=collections, **kwargs
        )

    def _exists_many(
        self,
        refs: Iterable[DatasetRef],
        /,
        *,
        full_check: bool = True,
    ) -> dict[DatasetRef, DatasetExistence]:
        # Docstring inherited.
        return self._remote_butler._exists_many(refs, full_check=full_check)

    def removeRuns(
        self,
        names: Iterable[str],
        unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
        *,
        unlink_from_chains: bool = False,
    ) -> None:
        # Docstring inherited.  Destructive operations go through the direct
        # butler.
        return self._direct_butler.removeRuns(names, unstore, unlink_from_chains=unlink_from_chains)

    def ingest_zip(
        self,
        zip_file: ResourcePathExpression,
        transfer: str = "auto",
        *,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        return self._direct_butler.ingest_zip(
            zip_file,
            transfer=transfer,
            transfer_dimensions=transfer_dimensions,
            dry_run=dry_run,
            skip_existing=skip_existing,
        )

    def ingest(
        self,
        *datasets: FileDataset,
        transfer: str | None = "auto",
        record_validation_info: bool = True,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        return self._direct_butler.ingest(
            *datasets,
            transfer=transfer,
            record_validation_info=record_validation_info,
            skip_existing=skip_existing,
        )

    def export(
        self,
        *,
        directory: str | None = None,
        filename: str | None = None,
        format: str | None = None,
        transfer: str | None = None,
    ) -> AbstractContextManager[RepoExportContext]:
        # Docstring inherited.
        return self._direct_butler.export(
            directory=directory, filename=filename, format=format, transfer=transfer
        )

    def import_(
        self,
        *,
        directory: ResourcePathExpression | None = None,
        filename: ResourcePathExpression | TextIO | None = None,
        format: str | None = None,
        transfer: str | None = None,
        skip_dimensions: set | None = None,
        record_validation_info: bool = True,
        without_datastore: bool = False,
    ) -> None:
        # Docstring inherited.
        self._direct_butler.import_(
            directory=directory,
            filename=filename,
            format=format,
            transfer=transfer,
            skip_dimensions=skip_dimensions,
            record_validation_info=record_validation_info,
            without_datastore=without_datastore,
        )

    def transfer_dimension_records_from(
        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
    ) -> None:
        # Docstring inherited.
        return self._direct_butler.transfer_dimension_records_from(source_butler, source_refs)

    def transfer_from(
        self,
        source_butler: LimitedButler,
        source_refs: Iterable[DatasetRef],
        transfer: str = "auto",
        skip_missing: bool = True,
        register_dataset_types: bool = False,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
    ) -> Collection[DatasetRef]:
        # Docstring inherited.
        return self._direct_butler.transfer_from(
            source_butler,
            source_refs,
            transfer,
            skip_missing,
            register_dataset_types,
            transfer_dimensions,
            dry_run,
        )

    def validateConfiguration(
        self,
        logFailures: bool = False,
        datasetTypeNames: Iterable[str] | None = None,
        ignore: Iterable[str] | None = None,
    ) -> None:
        # Docstring inherited.
        return self._direct_butler.validateConfiguration(logFailures, datasetTypeNames, ignore)

    @property
    def run(self) -> str | None:
        # Docstring inherited.
        return self._remote_butler.run

    @property
    def registry(self) -> Registry:
        # Docstring inherited.  Returns the hybrid registry built in __new__.
        return self._registry

    def query(self) -> AbstractContextManager[Query]:
        # Docstring inherited.
        return self._remote_butler.query()

    def clone(
        self,
        *,
        collections: CollectionArgType | None | EllipsisType = ...,
        run: str | None | EllipsisType = ...,
        inferDefaults: bool | EllipsisType = ...,
        dataId: dict[str, str] | EllipsisType = ...,
        metrics: ButlerMetrics | None = None,
    ) -> HybridButler:
        # Docstring inherited.  Clone both wrapped butlers with identical
        # overrides and re-wrap them.
        overrides: dict[str, Any] = dict(
            collections=collections, run=run, inferDefaults=inferDefaults, dataId=dataId, metrics=metrics
        )
        return HybridButler(
            self._remote_butler.clone(**overrides), self._direct_butler.clone(**overrides)
        )

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # Docstring inherited.
        return self._direct_butler.pruneDatasets(
            refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge
        )

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited.
        return self._remote_butler.dimensions

    def _extract_all_dimension_records_from_data_ids(
        self,
        source_butler: LimitedButler | Butler,
        data_ids: set[DataCoordinate],
        allowed_elements: frozenset[DimensionElement],
    ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
        # Docstring inherited.
        return self._direct_butler._extract_all_dimension_records_from_data_ids(
            source_butler, data_ids, allowed_elements
        )

    def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
        # Docstring inherited.
        return self._remote_butler._expand_data_ids(data_ids)

    @property
    def collection_chains(self) -> ButlerCollections:
        # Docstring inherited.
        return HybridButlerCollections(self)

    @property
    def collections(self) -> ButlerCollections:
        # Docstring inherited.
        return HybridButlerCollections(self)

    def _query_all_datasets_by_page(
        self, args: QueryAllDatasetsParameters
    ) -> AbstractContextManager[Iterator[list[DatasetRef]]]:
        # Docstring inherited.
        return self._remote_butler._query_all_datasets_by_page(args)

    @property
    def _file_transfer_source(self) -> FileTransferSource:
        # Docstring inherited.
        return self._remote_butler._file_transfer_source

    def close(self) -> None:
        # Docstring inherited.  Release resources held by both wrapped
        # butlers.
        self._direct_butler.close()
        self._remote_butler.close()