Coverage for python/lsst/daf/butler/remote_butler/_remote_butler.py: 0%
290 statements
coverage.py v7.13.5, created at 2026-04-30 08:41 +0000
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("RemoteButler",)

import logging
import uuid
from collections.abc import Collection, Iterable, Iterator, Sequence
from contextlib import AbstractContextManager, contextmanager
from types import EllipsisType
from typing import TYPE_CHECKING, Any, TextIO, cast

from deprecated.sphinx import deprecated

from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
    ArtifactIndexInfo,
    ZipIndex,
    determine_destination_for_retrieved_artifact,
    retrieve_and_zip,
    unpack_zips,
)
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.iteration import chunk_iterable

from .._butler import Butler, _DeprecatedDefault
from .._butler_collections import ButlerCollections
from .._butler_metrics import ButlerMetrics
from .._dataset_existence import DatasetExistence
from .._dataset_ref import DatasetId, DatasetRef
from .._dataset_type import DatasetType
from .._deferredDatasetHandle import DeferredDatasetHandle
from .._exceptions import DatasetNotFoundError
from .._query_all_datasets import QueryAllDatasetsParameters
from .._storage_class import StorageClass, StorageClassFactory
from .._utilities.locked_object import LockedObject
from ..datastore import DatasetRefURIs, DatastoreConfig
from ..datastore.cache_manager import AbstractDatastoreCacheManager, DatastoreCacheManager
from ..dimensions import DataCoordinate, DataIdValue, DimensionConfig, DimensionUniverse, SerializedDataId
from ..queries import Query
from ..queries.tree import make_column_literal
from ..registry import CollectionArgType, NoDefaultCollectionError, Registry, RegistryDefaults
from ..registry.expand_data_ids import expand_data_ids
from ._collection_args import convert_collection_arg_to_glob_string_list
from ._defaults import DefaultsHolder
from ._get import convert_http_url_to_resource_path, get_dataset_as_python_object
from ._http_connection import RemoteButlerHttpConnection, parse_model, quote_path_variable
from ._query_driver import RemoteQueryDriver
from ._query_results import convert_dataset_ref_results, read_query_results
from ._ref_utils import apply_storage_class_override, normalize_dataset_type_name, simplify_dataId
from ._registry import RemoteButlerRegistry
from ._remote_butler_collections import RemoteButlerCollections
from ._remote_file_transfer_source import RemoteFileTransferSource
from .server_models import (
    CollectionList,
    FileInfoPayload,
    FindDatasetRequestModel,
    FindDatasetResponseModel,
    GetDatasetTypeResponseModel,
    GetFileByDataIdRequestModel,
    GetFileResponseModel,
    GetManyDatasetsRequestModel,
    GetManyDatasetsResponseModel,
    GetUniverseResponseModel,
    QueryAllDatasetsRequestModel,
)

if TYPE_CHECKING:
    from .._dataset_provenance import DatasetProvenance
    from .._file_dataset import FileDataset
    from .._limited_butler import LimitedButler
    from .._timespan import Timespan
    from ..dimensions import DataId
    from ..transfers import RepoExportContext


_LOG = logging.getLogger(__name__)


class RemoteButler(Butler):  # numpydoc ignore=PR02
106 """A `Butler` that can be used to connect through a remote server.
108 Parameters
109 ----------
110 options : `ButlerInstanceOptions`
111 Default values and other settings for the Butler instance.
112 connection : `RemoteButlerHttpConnection`
113 Connection to Butler server.
114 cache : `RemoteButlerCache`
115 Cache of data shared between multiple RemoteButler instances connected
116 to the same server.
117 use_disabled_datastore_cache : `bool`, optional
118 If `True`, a datastore cache manager will be created with a default
119 disabled state which can be enabled by the environment. If `False`
120 a cache manager will be constructed from the default local
121 configuration, likely caching by default but only specific storage
122 classes.
124 Notes
125 -----
126 Instead of using this constructor, most users should use either
127 `Butler.from_config` or `RemoteButlerFactory`.
128 """

    _registry_defaults: DefaultsHolder
    _connection: RemoteButlerHttpConnection
    _cache: RemoteButlerCache
    _registry: RemoteButlerRegistry
    _datastore_cache_manager: AbstractDatastoreCacheManager | None
    _use_disabled_datastore_cache: bool

    # This is __new__ instead of __init__ because we have to support
    # instantiation via the legacy constructor Butler.__new__(), which
    # reads the configuration and selects which subclass to instantiate. The
    # interaction between __new__ and __init__ is kind of wacky in Python. If
    # we were using __init__ here, __init__ would be called twice (once when
    # the RemoteButler instance is constructed inside Butler.from_config(), and
    # a second time with the original arguments to Butler() when the instance
    # is returned from Butler.__new__()).
    def __new__(
        cls,
        *,
        connection: RemoteButlerHttpConnection,
        defaults: RegistryDefaults,
        cache: RemoteButlerCache,
        use_disabled_datastore_cache: bool = True,
        metrics: ButlerMetrics | None = None,
    ) -> RemoteButler:
        self = cast(RemoteButler, super().__new__(cls))
        self.storageClasses = StorageClassFactory()

        self._connection = connection
        self._cache = cache
        self._datastore_cache_manager = None
        self._use_disabled_datastore_cache = use_disabled_datastore_cache
        self._metrics = metrics if metrics is not None else ButlerMetrics()

        self._registry_defaults = DefaultsHolder(defaults)
        self._registry = RemoteButlerRegistry(self, self._registry_defaults, self._connection)
        defaults.finish(self._registry)

        return self

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return False

    @property
    @deprecated(
        "Please use 'collections' instead. collection_chains will be removed after v28.",
        version="v28",
        category=FutureWarning,
    )
    def collection_chains(self) -> ButlerCollections:
        """Object with methods for modifying collection chains."""
        return self.collections

    @property
    def collections(self) -> ButlerCollections:
        """Object with methods for modifying and querying collections."""
        return RemoteButlerCollections(self._registry_defaults, self._connection)

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited.
        with self._cache.access() as cache:
            if cache.dimensions is not None:
                return cache.dimensions

        response = self._connection.get("universe")
        model = parse_model(response, GetUniverseResponseModel)

        config = DimensionConfig.from_simple(model.universe)
        universe = DimensionUniverse(config)
        with self._cache.access() as cache:
            if cache.dimensions is None:
                cache.dimensions = universe
            return cache.dimensions

    @property
    def _cache_manager(self) -> AbstractDatastoreCacheManager:
        """Cache manager to use when reading files from the butler."""
        # RemoteButler does not get any cache configuration from the server.
        # Either create a disabled cache manager which can be enabled via the
        # environment, or create a cache manager from the default FileDatastore
        # config. This will not work properly if the defaults for
        # DatastoreConfig no longer include the cache.
        if self._datastore_cache_manager is None:
            datastore_config = DatastoreConfig()
            if not self._use_disabled_datastore_cache and "cached" in datastore_config:
                self._datastore_cache_manager = DatastoreCacheManager(
                    datastore_config["cached"], universe=self.dimensions
                )
            else:
                self._datastore_cache_manager = DatastoreCacheManager.create_disabled(
                    universe=self.dimensions
                )
        return self._datastore_cache_manager

    def _caching_context(self) -> AbstractContextManager[None]:
        # Docstring inherited.
        # Not implemented for now; we will have to think about whether this
        # needs to do something on the client side and/or the remote side.
        raise NotImplementedError()

    def transaction(self) -> AbstractContextManager[None]:
        """Will always raise NotImplementedError.
        Transactions are not supported by RemoteButler.
        """
        raise NotImplementedError()

    def put(
        self,
        obj: Any,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        run: str | None = None,
        provenance: DatasetProvenance | None = None,
        **kwargs: Any,
    ) -> DatasetRef:
        # Docstring inherited.
        raise NotImplementedError()

    def getDeferred(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict | None = None,
        collections: Any = None,
        storageClass: str | StorageClass | None = None,
        timespan: Timespan | None = None,
        **kwargs: Any,
    ) -> DeferredDatasetHandle:
        response = self._get_file_info(datasetRefOrType, dataId, collections, timespan, kwargs)
        # Check that artifact information is available.
        _to_file_payload(response)
        ref = DatasetRef.from_simple(response.dataset_ref, universe=self.dimensions)
        return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass)

    def get(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        parameters: dict[str, Any] | None = None,
        collections: Any = None,
        storageClass: StorageClass | str | None = None,
        timespan: Timespan | None = None,
        **kwargs: Any,
    ) -> Any:
        # Docstring inherited.
        with self._metrics.instrument_get(log=_LOG, msg="Retrieved remote dataset"):
            model = self._get_file_info(datasetRefOrType, dataId, collections, timespan, kwargs)

            ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions)
            # If the caller provided a DatasetRef, they may have overridden the
            # component on it. We need to explicitly handle this because we
            # did not send the DatasetType to the server in this case.
            if isinstance(datasetRefOrType, DatasetRef):
                componentOverride = datasetRefOrType.datasetType.component()
                if componentOverride:
                    ref = ref.makeComponentRef(componentOverride)
            ref = apply_storage_class_override(ref, datasetRefOrType, storageClass)

            return self._get_dataset_as_python_object(ref, model, parameters)

    def _get_dataset_as_python_object(
        self,
        ref: DatasetRef,
        model: GetFileResponseModel,
        parameters: dict[str, Any] | None,
    ) -> Any:
        # This thin wrapper method is here to provide a place to hook in a mock
        # mimicking DatastoreMock functionality for use in unit tests.
        return get_dataset_as_python_object(
            ref,
            _to_file_payload(model),
            auth=self._connection.auth,
            parameters=parameters,
            cache_manager=self._cache_manager,
        )
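
    # A hedged sketch of how a test might use this seam (illustrative only;
    # not how the daf_butler test suite is actually structured). It assumes a
    # test that already has a working connection to a test server, plus
    # hypothetical fixtures ``butler``, ``ref``, and ``fake_object``:
    #
    #     from unittest import mock
    #
    #     with mock.patch.object(
    #         RemoteButler, "_get_dataset_as_python_object", return_value=fake_object
    #     ):
    #         assert butler.get(ref) is fake_object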

    def _get_file_info(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        dataId: DataId | None,
        collections: CollectionArgType,
        timespan: Timespan | None,
        kwargs: dict[str, DataIdValue],
    ) -> GetFileResponseModel:
        """Send a request to the server for the file URLs and metadata
        associated with a dataset.
        """
        if isinstance(datasetRefOrType, DatasetRef):
            if dataId is not None:
                raise ValueError("DatasetRef given, cannot use dataId as well")
            return self._get_file_info_for_ref(datasetRefOrType)
        else:
            request = GetFileByDataIdRequestModel(
                dataset_type=normalize_dataset_type_name(datasetRefOrType),
                collections=self._normalize_collections(collections),
                data_id=simplify_dataId(dataId, kwargs),
                default_data_id=self._serialize_default_data_id(),
                timespan=timespan,
            )
            response = self._connection.post("get_file_by_data_id", request)
            return parse_model(response, GetFileResponseModel)

    def _get_file_info_for_ref(self, ref: DatasetRef) -> GetFileResponseModel:
        response = self._connection.get(f"get_file/{_to_uuid_string(ref.id)}")
        return parse_model(response, GetFileResponseModel)

    def getURIs(
        self,
        datasetRefOrType: DatasetRef | DatasetType | str,
        /,
        dataId: DataId | None = None,
        *,
        predict: bool = False,
        collections: Any = None,
        run: str | None = None,
        **kwargs: Any,
    ) -> DatasetRefURIs:
        # Docstring inherited.
        if predict or run:
            raise NotImplementedError("Predict mode is not supported by RemoteButler")

        response = self._get_file_info(datasetRefOrType, dataId, collections, None, kwargs)
        file_info = _to_file_payload(response).file_info
        if len(file_info) == 1:
            return DatasetRefURIs(
                primaryURI=convert_http_url_to_resource_path(
                    file_info[0].url, self._connection.auth, file_info[0].auth
                )
            )
        else:
            components = {}
            for f in file_info:
                component = f.datastoreRecords.component
                if component is None:
                    raise ValueError(
                        f"DatasetId {response.dataset_ref.id} has a component file"
                        " with no component name defined"
                    )
                components[component] = convert_http_url_to_resource_path(
                    f.url, self._connection.auth, f.auth
                )
            return DatasetRefURIs(componentURIs=components)

    def get_dataset_type(self, name: str) -> DatasetType:
        with self._cache.access() as cache:
            if (cached_value := cache.dataset_types.get(name)) is not None:
                return cached_value

        response = self._connection.get(f"dataset_type/{quote_path_variable(name)}")
        model = parse_model(response, GetDatasetTypeResponseModel)
        value = DatasetType.from_simple(model.dataset_type, universe=self.dimensions)
        with self._cache.access() as cache:
            return cache.dataset_types.setdefault(name, value)

    def get_dataset(
        self,
        id: DatasetId | str,
        *,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
    ) -> DatasetRef | None:
        # datastore_records is intentionally ignored. It is an optimization
        # flag that only applies to DirectButler.
        path = f"dataset/{_to_uuid_string(id)}"
        response = self._connection.get(path, params={"dimension_records": bool(dimension_records)})
        model = parse_model(response, FindDatasetResponseModel)
        if model.dataset_ref is None:
            return None
        ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions)
        if storage_class is not None:
            ref = ref.overrideStorageClass(storage_class)
        return ref

    def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
        result = []
        for batch in chunk_iterable(ids, GetManyDatasetsRequestModel.MAX_ITEMS_PER_REQUEST):
            request = GetManyDatasetsRequestModel(dataset_ids=batch)
            response = self._connection.post("datasets", request)
            model = parse_model(response, GetManyDatasetsResponseModel)
            refs = convert_dataset_ref_results(model, self.dimensions)
            result.extend(refs)
        return result

    def find_dataset(
        self,
        dataset_type: DatasetType | str,
        data_id: DataId | None = None,
        *,
        collections: str | Sequence[str] | None = None,
        timespan: Timespan | None = None,
        storage_class: str | StorageClass | None = None,
        dimension_records: bool = False,
        datastore_records: bool = False,
        **kwargs: Any,
    ) -> DatasetRef | None:
        # datastore_records is intentionally ignored. It is an optimization
        # flag that only applies to DirectButler.

        query = FindDatasetRequestModel(
            dataset_type=normalize_dataset_type_name(dataset_type),
            data_id=simplify_dataId(data_id, kwargs),
            default_data_id=self._serialize_default_data_id(),
            collections=self._normalize_collections(collections),
            timespan=timespan,
            dimension_records=dimension_records,
        )

        response = self._connection.post("find_dataset", query)

        model = parse_model(response, FindDatasetResponseModel)
        if model.dataset_ref is None:
            return None

        ref = DatasetRef.from_simple(model.dataset_ref, universe=self.dimensions)
        if isinstance(data_id, DataCoordinate) and data_id.hasRecords():
            ref = ref.expanded(data_id)
        return apply_storage_class_override(ref, dataset_type, storage_class)

    def _retrieve_artifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
        write_index: bool = True,
        add_prefix: bool = False,
    ) -> dict[ResourcePath, ArtifactIndexInfo]:
        destination = ResourcePath(destination).abspath()
        if not destination.isdir():
            raise ValueError(f"Destination location must refer to a directory. Given {destination}.")

        if transfer not in ("auto", "copy"):
            raise ValueError("Only 'copy' and 'auto' transfer modes are supported.")

        requested_ids = {ref.id for ref in refs}
        have_copied: dict[ResourcePath, ResourcePath] = {}
        artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
        # Sort to ensure that, in a many-refs-to-one-file situation, the same
        # ref is used for any prefix that might be added.
        for ref in sorted(refs):
            prefix = str(ref.id)[:8] + "-" if add_prefix else ""
            file_info = _to_file_payload(self._get_file_info_for_ref(ref)).file_info
            for file in file_info:
                source_uri = ResourcePath(str(file.url))
                # For DECam/zip we only want to copy once.
                # For zip files we need to unpack so that they can be
                # zipped up again if needed.
                is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment
                cleaned_source_uri = source_uri.replace(fragment="", query="", params="")
                if is_zip:
                    if cleaned_source_uri not in have_copied:
                        zipped_artifacts = unpack_zips(
                            [cleaned_source_uri], requested_ids, destination, preserve_path, overwrite
                        )
                        artifact_map.update(zipped_artifacts)
                        have_copied[cleaned_source_uri] = cleaned_source_uri
                elif cleaned_source_uri not in have_copied:
                    relative_path = ResourcePath(file.datastoreRecords.path, forceAbsolute=False)
                    target_uri = determine_destination_for_retrieved_artifact(
                        destination, relative_path, preserve_path, prefix
                    )
                    # Because signed URLs expire, we want to do the transfer
                    # soon after retrieving the URL.
                    target_uri.transfer_from(source_uri, transfer="copy", overwrite=overwrite)
                    have_copied[cleaned_source_uri] = target_uri
                    artifact_map[target_uri] = ArtifactIndexInfo.from_single(file.datastoreRecords, ref.id)
                else:
                    target_uri = have_copied[cleaned_source_uri]
                    artifact_map[target_uri].append(ref.id)

        if write_index:
            index = ZipIndex.from_artifact_map(refs, artifact_map, destination)
            index.write_index(destination)

        return artifact_map

    def retrieve_artifacts_zip(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        overwrite: bool = True,
    ) -> ResourcePath:
        return retrieve_and_zip(refs, destination, self._retrieve_artifacts, overwrite)

    def retrieveArtifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        artifact_map = self._retrieve_artifacts(
            refs,
            destination,
            transfer,
            preserve_path,
            overwrite,
        )
        return list(artifact_map)

    def exists(
        self,
        dataset_ref_or_type: DatasetRef | DatasetType | str,
        /,
        data_id: DataId | None = None,
        *,
        full_check: bool = True,
        collections: Any = None,
        **kwargs: Any,
    ) -> DatasetExistence:
        try:
            response = self._get_file_info(
                dataset_ref_or_type, dataId=data_id, collections=collections, timespan=None, kwargs=kwargs
            )
        except DatasetNotFoundError:
            return DatasetExistence.UNRECOGNIZED

        if response.artifact is None:
            if full_check:
                return DatasetExistence.RECORDED
            else:
                return DatasetExistence.RECORDED | DatasetExistence._ASSUMED

        if full_check:
            for file in response.artifact.file_info:
                if not ResourcePath(str(file.url)).exists():
                    return DatasetExistence.RECORDED | DatasetExistence.DATASTORE
            return DatasetExistence.VERIFIED
        else:
            return DatasetExistence.KNOWN

    def _exists_many(
        self,
        refs: Iterable[DatasetRef],
        /,
        *,
        full_check: bool = True,
    ) -> dict[DatasetRef, DatasetExistence]:
        return {ref: self.exists(ref, full_check=full_check) for ref in refs}

    def removeRuns(
        self,
        names: Iterable[str],
        unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
        *,
        unlink_from_chains: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def ingest(
        self,
        *datasets: FileDataset,
        transfer: str | None = "auto",
        record_validation_info: bool = True,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def ingest_zip(
        self,
        zip_file: ResourcePathExpression,
        transfer: str = "auto",
        *,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
        skip_existing: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def export(
        self,
        *,
        directory: str | None = None,
        filename: str | None = None,
        format: str | None = None,
        transfer: str | None = None,
    ) -> AbstractContextManager[RepoExportContext]:
        # Docstring inherited.
        raise NotImplementedError()

    def import_(
        self,
        *,
        directory: ResourcePathExpression | None = None,
        filename: ResourcePathExpression | TextIO | None = None,
        format: str | None = None,
        transfer: str | None = None,
        skip_dimensions: set | None = None,
        record_validation_info: bool = True,
        without_datastore: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def transfer_dimension_records_from(
        self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def transfer_from(
        self,
        source_butler: LimitedButler,
        source_refs: Iterable[DatasetRef],
        transfer: str = "auto",
        skip_missing: bool = True,
        register_dataset_types: bool = False,
        transfer_dimensions: bool = False,
        dry_run: bool = False,
    ) -> Collection[DatasetRef]:
        # Docstring inherited.
        raise NotImplementedError()

    def validateConfiguration(
        self,
        logFailures: bool = False,
        datasetTypeNames: Iterable[str] | None = None,
        ignore: Iterable[str] | None = None,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    @property
    def run(self) -> str | None:
        # Docstring inherited.
        return self._registry_defaults.get().run

    @property
    def registry(self) -> Registry:
        return self._registry

    @contextmanager
    def query(self) -> Iterator[Query]:
        driver = RemoteQueryDriver(self, self._connection)
        with driver:
            query = Query(driver)
            yield query

    @contextmanager
    def _query_all_datasets_by_page(
        self, args: QueryAllDatasetsParameters
    ) -> Iterator[Iterator[list[DatasetRef]]]:
        universe = self.dimensions

        request = QueryAllDatasetsRequestModel(
            collections=self._normalize_collections(args.collections),
            name=[normalize_dataset_type_name(name) for name in args.name],
            find_first=args.find_first,
            data_id=simplify_dataId(args.data_id, args.kwargs),
            default_data_id=self._serialize_default_data_id(),
            where=args.where,
            bind={k: make_column_literal(v) for k, v in args.bind.items()},
            limit=args.limit,
            with_dimension_records=args.with_dimension_records,
        )
        with self._connection.post_with_stream_response("query/all_datasets", request) as response:
            pages = read_query_results(response)
            yield (convert_dataset_ref_results(page, universe) for page in pages)

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # Docstring inherited.
        raise NotImplementedError()

    def _normalize_collections(self, collections: CollectionArgType | None) -> CollectionList:
        """Convert the ``collections`` parameter from the format used by
        Butler methods to a standardized format for the REST API.
        """
        if collections is None:
            if not self.collections.defaults:
                raise NoDefaultCollectionError(
                    "No collections provided, and no defaults from butler construction."
                )
            collections = self.collections.defaults
        return convert_collection_arg_to_glob_string_list(collections)

    def clone(
        self,
        *,
        collections: CollectionArgType | None | EllipsisType = ...,
        run: str | None | EllipsisType = ...,
        inferDefaults: bool | EllipsisType = ...,
        dataId: dict[str, str] | EllipsisType = ...,
        metrics: ButlerMetrics | None = None,
    ) -> RemoteButler:
        defaults = self._registry_defaults.get().clone(collections, run, inferDefaults, dataId)
        return RemoteButler(
            connection=self._connection, cache=self._cache, defaults=defaults, metrics=metrics
        )

    def close(self) -> None:
        pass

    def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
        return expand_data_ids(data_ids, self.dimensions, self.query, None)

    @property
    def _file_transfer_source(self) -> RemoteFileTransferSource:
        return RemoteFileTransferSource(self._connection)

    def __str__(self) -> str:
        return f"RemoteButler({self._connection.server_url})"

    def _serialize_default_data_id(self) -> SerializedDataId:
        """Convert the default data ID to a serializable format."""
        # In an ideal world, the default data ID would just get combined with
        # the rest of the data ID on the client side instead of being sent
        # separately to the server. Unfortunately, that requires knowledge of
        # the DatasetType's dimensions which we don't always have available on
        # the client. Data ID values can be specified indirectly by "implied"
        # dimensions, but knowing what things are implied depends on what the
        # required dimensions are.

        return self._registry_defaults.get().dataId.to_simple(minimal=True).dataId


def _to_file_payload(get_file_response: GetFileResponseModel) -> FileInfoPayload:
    if get_file_response.artifact is None:
        ref = get_file_response.dataset_ref
        raise DatasetNotFoundError(f"Dataset is known, but artifact is not available. (datasetId='{ref.id}')")

    return get_file_response.artifact


def _to_uuid_string(id: uuid.UUID | str) -> str:
    """Convert a UUID, or a string parseable as a UUID, into a string formatted
    like '1481269e-4c8d-4696-bcca-d1b4c9005d06'.
    """
    return str(uuid.UUID(str(id)))


class _RemoteButlerCacheData:
    def __init__(self) -> None:
        self.dimensions: DimensionUniverse | None = None
        self.dataset_types: dict[str, DatasetType] = {}


class RemoteButlerCache(LockedObject[_RemoteButlerCacheData]):
    def __init__(self) -> None:
        super().__init__(_RemoteButlerCacheData())