Coverage for python/lsst/daf/butler/direct_butler/_direct_butler.py: 10%
956 statements
coverage.py v7.13.5, created at 2026-05-01 08:18 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Butler top level classes."""
30from __future__ import annotations
32__all__ = (
33 "ButlerValidationError",
34 "DirectButler",
35)
37import collections.abc
38import contextlib
39import io
40import itertools
41import math
42import numbers
43import os
44import uuid
45import warnings
46from collections import Counter, defaultdict
47from collections.abc import Collection, Iterable, Iterator, MutableMapping, Sequence
48from functools import partial
49from types import EllipsisType
50from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, TextIO, cast
52from deprecated.sphinx import deprecated
53from sqlalchemy.exc import IntegrityError
55from lsst.resources import ResourcePath, ResourcePathExpression
56from lsst.utils.introspection import find_outside_stacklevel, get_class_of
57from lsst.utils.iteration import chunk_iterable
58from lsst.utils.logging import VERBOSE, getLogger
59from lsst.utils.timer import time_this
61from .._butler import Butler, _DeprecatedDefault
62from .._butler_config import ButlerConfig
63from .._butler_instance_options import ButlerInstanceOptions
64from .._butler_metrics import ButlerMetrics
65from .._collection_type import CollectionType
66from .._dataset_existence import DatasetExistence
67from .._dataset_ref import DatasetRef
68from .._dataset_type import DatasetType
69from .._deferredDatasetHandle import DeferredDatasetHandle
70from .._exceptions import DatasetNotFoundError, DimensionValueError, EmptyQueryResultError, ValidationError
71from .._file_dataset import FileDataset
72from .._limited_butler import LimitedButler
73from .._query_all_datasets import QueryAllDatasetsParameters, query_all_datasets
74from .._registry_shim import RegistryShim
75from .._storage_class import StorageClass, StorageClassFactory
76from .._timespan import Timespan
77from ..datastore import Datastore, NullDatastore
78from ..datastores.file_datastore.retrieve_artifacts import ZipIndex, retrieve_and_zip
79from ..datastores.file_datastore.transfer import retrieve_file_transfer_records
80from ..dimensions import DataCoordinate, Dimension, DimensionGroup
81from ..direct_query_driver import DirectQueryDriver
82from ..progress import Progress
83from ..queries import Query
84from ..registry import (
85 ConflictingDefinitionError,
86 DataIdError,
87 MissingDatasetTypeError,
88 RegistryDefaults,
89 _RegistryFactory,
90)
91from ..registry.sql_registry import SqlRegistry
92from ..transfers import RepoExportContext
93from ..utils import transactional
94from ._direct_butler_collections import DirectButlerCollections
96if TYPE_CHECKING:
97 from lsst.resources import ResourceHandleProtocol
99 from .._dataset_provenance import DatasetProvenance
100 from .._dataset_ref import DatasetId
101 from ..datastore import DatasetRefURIs
102 from ..dimensions import DataId, DataIdValue, DimensionElement, DimensionRecord, DimensionUniverse
103 from ..registry import CollectionArgType, Registry
104 from ..transfers import RepoImportBackend
106_LOG = getLogger(__name__)
109class ButlerValidationError(ValidationError):
110 """There is a problem with the Butler configuration."""
112 pass
115class DirectButler(Butler): # numpydoc ignore=PR02
116 """Main entry point for the data access system.
118 Parameters
119 ----------
120 config : `ButlerConfig`
121 The configuration for this Butler instance.
122 registry : `SqlRegistry`
123 The object that manages dataset metadata and relationships.
124 datastore : Datastore
125 The object that manages actual dataset storage.
126 storageClasses : StorageClassFactory
127 An object that maps known storage class names to objects that fully
128 describe them.
130 Notes
131 -----
132 Most users should call the top-level `Butler`.``from_config`` instead of
133 using this constructor directly.
134 """
136 # This is __new__ instead of __init__ because we have to support
137 # instantiation via the legacy constructor Butler.__new__(), which
138 # reads the configuration and selects which subclass to instantiate. The
139 # interaction between __new__ and __init__ is kind of wacky in Python. If
140 # we were using __init__ here, __init__ would be called twice (once when
141 # the DirectButler instance is constructed inside Butler.from_config(), and
142 # a second time with the original arguments to Butler() when the instance
143 # is returned from Butler.__new__()).
144 def __new__(
145 cls,
146 *,
147 config: ButlerConfig,
148 registry: SqlRegistry,
149 datastore: Datastore,
150 storageClasses: StorageClassFactory,
151 metrics: ButlerMetrics | None = None,
152 ) -> DirectButler:
153 self = cast(DirectButler, super().__new__(cls))
154 self._config = config
155 self._registry = registry
156 self._datastore = datastore
157 self.storageClasses = storageClasses
158 self._metrics = metrics if metrics is not None else ButlerMetrics()
160 # For execution butler the datastore needs a special
161 # dependency-inversion trick. This is not used by regular butler,
162 # but we do not have a way to distinguish regular butler from execution
163 # butler.
164 self._datastore.set_retrieve_dataset_type_method(partial(_retrieve_dataset_type, registry))
166 self._closed = False
168 return self
170 @classmethod
171 def create_from_config(
172 cls,
173 config: ButlerConfig,
174 *,
175 options: ButlerInstanceOptions,
176 without_datastore: bool = False,
177 ) -> DirectButler:
178 """Construct a Butler instance from a configuration file.
180 Parameters
181 ----------
182 config : `ButlerConfig`
183 The configuration for this Butler instance.
184 options : `ButlerInstanceOptions`
185 Default values and other settings for the Butler instance.
186 without_datastore : `bool`, optional
187 If `True` do not attach a datastore to this butler. Any attempts
188 to use a datastore will fail.
190 Notes
191 -----
192 Most users should call the top-level `Butler`.``from_config``
193 instead of using this function directly.
194 """
195 if "run" in config or "collection" in config:
196 raise ValueError("Passing a run or collection via configuration is no longer supported.")
198 defaults = RegistryDefaults.from_butler_instance_options(options)
199 try:
200 butlerRoot = config.get("root", config.configDir)
201 writeable = options.writeable
202 if writeable is None:
203 writeable = options.run is not None
204 registry = _RegistryFactory(config).from_config(
205 butlerRoot=butlerRoot, writeable=writeable, defaults=defaults
206 )
207 if without_datastore:
208 datastore: Datastore = NullDatastore(None, None)
209 else:
210 datastore = Datastore.fromConfig(
211 config, registry.getDatastoreBridgeManager(), butlerRoot=butlerRoot
212 )
213 # TODO: Once datastore drops dependency on registry we can
214 # construct datastore first and pass opaque tables to registry
215 # constructor.
216 registry.make_datastore_tables(datastore.get_opaque_table_definitions())
217 storageClasses = StorageClassFactory()
218 storageClasses.addFromConfig(config)
220 return DirectButler(
221 config=config,
222 registry=registry,
223 datastore=datastore,
224 storageClasses=storageClasses,
225 metrics=options.metrics,
226 )
227 except Exception:
228 # Failures here usually mean that configuration is incomplete,
229 # just issue an error message which includes config file URI.
230 _LOG.error(f"Failed to instantiate Butler from config {config.configFile}.")
231 raise
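# --- Editor's note: illustrative usage sketch (not part of the original
# source). Most callers reach ``create_from_config`` indirectly through the
# top-level ``Butler.from_config``; the repository path and collection name
# below are hypothetical.
#
#     from lsst.daf.butler import Butler
#
#     butler = Butler.from_config("/repo/example", collections="refcats",
#                                 writeable=False)
#     # For a file-based repository configuration this is expected to return
#     # a DirectButler instance.
#     print(type(butler).__name__)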
233 def clone(
234 self,
235 *,
236 collections: CollectionArgType | None | EllipsisType = ...,
237 run: str | None | EllipsisType = ...,
238 inferDefaults: bool | EllipsisType = ...,
239 dataId: dict[str, str] | EllipsisType = ...,
240 metrics: ButlerMetrics | None = None,
241 ) -> DirectButler:
242 # Docstring inherited
243 defaults = self._registry.defaults.clone(collections, run, inferDefaults, dataId)
244 registry = self._registry.copy(defaults)
246 return DirectButler(
247 registry=registry,
248 config=self._config,
249 datastore=self._datastore.clone(registry.getDatastoreBridgeManager()),
250 storageClasses=self.storageClasses,
251 metrics=metrics,
252 )
254 def close(self) -> None:
255 if not self._closed:
256 self._closed = True
257 self._registry.close()
258 # Cause exceptions to be raised if a user attempts to use the
259 # instance after closing it. Without this, Butler would still
260 # work after being closed because of implementation details
261 # of SQLAlchemy, but this may not continue to be the case in the
262 # future and we don't want users to get in the habit of doing this.
263 self._registry = _BUTLER_CLOSED_INSTANCE
264 self._datastore = _BUTLER_CLOSED_INSTANCE
266 GENERATION: ClassVar[int] = 3
267 """This is a Generation 3 Butler.
269 This attribute may be removed in the future, once the Generation 2 Butler
270 interface has been fully retired; it should only be used in transitional
271 code.
272 """
274 @classmethod
275 def _unpickle(
276 cls,
277 config: ButlerConfig,
278 collections: tuple[str, ...] | None,
279 run: str | None,
280 defaultDataId: dict[str, str],
281 writeable: bool,
282 ) -> DirectButler:
283 """Callable used to unpickle a Butler.
285 We prefer not to use ``Butler.__init__`` directly so we can force some
286 of its many arguments to be keyword-only (note that ``__reduce__``
287 can only invoke callables with positional arguments).
289 Parameters
290 ----------
291 config : `ButlerConfig`
292 Butler configuration, already coerced into a true `ButlerConfig`
293 instance (and hence after any search paths for overrides have been
294 utilized).
295 collections : `tuple` [ `str` ]
296 Names of the default collections to read from.
297 run : `str`, optional
298 Name of the default `~CollectionType.RUN` collection to write to.
299 defaultDataId : `dict` [ `str`, `str` ]
300 Default data ID values.
301 writeable : `bool`
302 Whether the Butler should support write operations.
304 Returns
305 -------
306 butler : `Butler`
307 A new `Butler` instance.
308 """
309 return cls.create_from_config(
310 config=config,
311 options=ButlerInstanceOptions(
312 collections=collections, run=run, writeable=writeable, kwargs=defaultDataId
313 ),
314 )
316 def __reduce__(self) -> tuple:
317 """Support pickling."""
318 return (
319 DirectButler._unpickle,
320 (
321 self._config,
322 self.collections.defaults,
323 self.run,
324 dict(self._registry.defaults.dataId.required),
325 self._registry.isWriteable(),
326 ),
327 )
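# --- Editor's note: illustrative sketch of the pickling round trip described
# in ``_unpickle`` above (not part of the original source); the repository
# path and collection name are hypothetical.
#
#     import pickle
#     from lsst.daf.butler import Butler
#
#     butler = Butler.from_config("/repo/example", collections="refcats")
#     # __reduce__ serializes the config, default collections, run, default
#     # data ID, and writeability; unpickling rebuilds an equivalent butler.
#     clone = pickle.loads(pickle.dumps(butler))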
329 def __str__(self) -> str:
330 return (
331 f"Butler(collections={self.collections}, run={self.run}, "
332 f"datastore='{self._datastore}', registry='{self._registry}')"
333 )
335 def isWriteable(self) -> bool:
336 # Docstring inherited.
337 return self._registry.isWriteable()
339 def _caching_context(self) -> contextlib.AbstractContextManager[None]:
340 """Context manager that enables caching."""
341 return self._registry.caching_context()
343 @contextlib.contextmanager
344 def transaction(self) -> Iterator[None]:
345 """Context manager supporting `Butler` transactions.
347 Transactions can be nested.
348 """
349 with self._registry.transaction(), self._datastore.transaction():
350 yield
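# --- Editor's note: illustrative usage sketch (not part of the original
# source). Registry and datastore operations inside the block are rolled back
# together if an exception escapes; dataset types and data ID values are
# hypothetical.
#
#     with butler.transaction():
#         butler.put(catalog, "sourceCatalog", instrument="HSC",
#                    visit=903334, detector=42)
#         butler.put(metadata, "sourceCatalog_metadata", instrument="HSC",
#                    visit=903334, detector=42)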
352 def _standardizeArgs(
353 self,
354 datasetRefOrType: DatasetRef | DatasetType | str,
355 dataId: DataId | None = None,
356 for_put: bool = True,
357 **kwargs: Any,
358 ) -> tuple[DatasetType, DataId | None]:
359 """Standardize the arguments passed to several Butler APIs.
361 Parameters
362 ----------
363 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
364 When `DatasetRef` the `dataId` should be `None`.
365 Otherwise the `DatasetType` or name thereof.
366 dataId : `dict` or `DataCoordinate`
367 A `dict` of `Dimension` link name, value pairs that label the
368 `DatasetRef` within a Collection. When `None`, a `DatasetRef`
369 should be provided as the second argument.
370 for_put : `bool`, optional
371 If `True` this call is invoked as part of a `Butler.put`.
372 Otherwise it is assumed to be part of a `Butler.get()`. This
373 parameter is only relevant if there is dataset type
374 inconsistency.
375 **kwargs
376 Additional keyword arguments used to augment or construct a
377 `DataCoordinate`. See `DataCoordinate.standardize`
378 parameters.
380 Returns
381 -------
382 datasetType : `DatasetType`
383 A `DatasetType` instance extracted from ``datasetRefOrType``.
384 dataId : `dict` or `DataId`, optional
385 Argument that can be used (along with ``kwargs``) to construct a
386 `DataId`.
388 Notes
389 -----
390 Butler APIs that conceptually need a DatasetRef also allow passing a
391 `DatasetType` (or the name of one) and a `DataId` (or a dict and
392 keyword arguments that can be used to construct one) separately. This
393 method accepts those arguments and always returns a true `DatasetType`
394 and a `DataId` or `dict`.
396 Standardization of `dict` vs `DataId` is best handled by passing the
397 returned ``dataId`` (and ``kwargs``) to `Registry` APIs, which are
398 generally similarly flexible.
399 """
400 externalDatasetType: DatasetType | None = None
401 internalDatasetType: DatasetType | None = None
402 if isinstance(datasetRefOrType, DatasetRef):
403 if dataId is not None or kwargs:
404 raise ValueError("DatasetRef given, cannot use dataId as well")
405 externalDatasetType = datasetRefOrType.datasetType
406 dataId = datasetRefOrType.dataId
407 else:
408 # Don't check whether DataId is provided, because Registry APIs
409 # can usually construct a better error message when it wasn't.
410 if isinstance(datasetRefOrType, DatasetType):
411 externalDatasetType = datasetRefOrType
412 else:
413 internalDatasetType = self.get_dataset_type(datasetRefOrType)
415 # Check that they are self-consistent
416 if externalDatasetType is not None:
417 internalDatasetType = self.get_dataset_type(externalDatasetType.name)
418 if externalDatasetType != internalDatasetType:
419 # We can allow differences if they are compatible, depending
420 # on whether this is a get or a put. A get requires that
421 # the python type associated with the datastore can be
422 # converted to the user type. A put requires that the user
423 # supplied python type can be converted to the internal
424 # type expected by registry.
425 relevantDatasetType = internalDatasetType
426 if for_put:
427 is_compatible = internalDatasetType.is_compatible_with(externalDatasetType)
428 else:
429 is_compatible = externalDatasetType.is_compatible_with(internalDatasetType)
430 relevantDatasetType = externalDatasetType
431 if not is_compatible:
432 raise ValueError(
433 f"Supplied dataset type ({externalDatasetType}) inconsistent with "
434 f"registry definition ({internalDatasetType})"
435 )
436 # Override the internal definition.
437 internalDatasetType = relevantDatasetType
439 assert internalDatasetType is not None
440 return internalDatasetType, dataId
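# --- Editor's note: illustrative sketch of the equivalent call forms that
# ``_standardizeArgs`` reconciles (not part of the original source); dataset
# type name and data ID values are hypothetical.
#
#     # A resolved reference carries its own dataset type and data ID ...
#     ref = butler.find_dataset("calexp", instrument="HSC", visit=903334,
#                               detector=42)
#     image = butler.get(ref)
#
#     # ... which is equivalent to passing the dataset type name and a data ID.
#     image = butler.get("calexp", instrument="HSC", visit=903334, detector=42)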
442 def _rewrite_data_id(
443 self, dataId: DataId | None, datasetType: DatasetType, **kwargs: Any
444 ) -> tuple[DataId | None, dict[str, Any]]:
445 """Rewrite a data ID taking into account dimension records.
447 Take a Data ID and keyword args and rewrite it if necessary to
448 allow the user to specify dimension records rather than dimension
449 primary values.
451 This allows a user to include a dataId dict with keys of
452 ``exposure.day_obs`` and ``exposure.seq_num`` instead of giving
453 the integer exposure ID. It also allows a string to be given
454 for a dimension value rather than the integer ID if that is more
455 convenient. For example, rather than having to specify the
456 detector with ``detector.full_name``, a string given for ``detector``
457 will be interpreted as the full name and converted to the integer
458 value.
460 Keyword arguments can also use strings for dimensions like detector
461 and exposure but Python does not allow them to include ``.`` and
462 so the ``exposure.day_obs`` syntax cannot be used in a keyword
463 argument.
465 Parameters
466 ----------
467 dataId : `dict` or `DataCoordinate`
468 A `dict` of `Dimension` link name, value pairs that will label the
469 `DatasetRef` within a Collection.
470 datasetType : `DatasetType`
471 The dataset type associated with this dataId. Required to
472 determine the relevant dimensions.
473 **kwargs
474 Additional keyword arguments used to augment or construct a
475 `DataId`. See `DataId` parameters.
477 Returns
478 -------
479 dataId : `dict` or `DataCoordinate`
480 The possibly rewritten dataId. If given a `DataCoordinate` and
481 no keyword arguments, the original dataId will be returned
482 unchanged.
483 **kwargs : `dict`
484 Any unused keyword arguments (would normally be empty dict).
485 """
486 # Process dimension records that are using record information
487 # rather than ids
488 newDataId: dict[str, DataIdValue] = {}
489 byRecord: dict[str, dict[str, Any]] = defaultdict(dict)
491 if isinstance(dataId, DataCoordinate):
492 # Do nothing if we have a DataCoordinate and no kwargs.
493 if not kwargs:
494 return dataId, kwargs
495 # If we have a DataCoordinate with kwargs, we know the
496 # DataCoordinate only has values for real dimensions.
497 newDataId.update(dataId.mapping)
498 elif dataId:
499 # The dataId is a mapping, which means it might have keys like
500 # "exposure.obs_id" (unlike kwargs, because a "." is not allowed in
501 # a keyword parameter).
502 for k, v in dataId.items():
503 if isinstance(k, str) and "." in k:
504 # Someone is using a more human-readable dataId
505 dimensionName, record = k.split(".", 1)
506 byRecord[dimensionName][record] = v
507 else:
508 newDataId[k] = v
510 # Go through the updated dataId and check the type in case someone is
511 # using an alternate key. We have already filtered out the compound
512 # keys in dimension.record format.
513 not_dimensions = {}
515 # Will need to look in the dataId and the keyword arguments
516 # and will remove them if they need to be fixed or are unrecognized.
517 for dataIdDict in (newDataId, kwargs):
518 # Use a list so we can adjust the dict safely in the loop
519 for dimensionName in list(dataIdDict):
520 value = dataIdDict[dimensionName]
521 try:
522 dimension = self.dimensions.dimensions[dimensionName]
523 except KeyError:
524 # This is not a real dimension
525 not_dimensions[dimensionName] = value
526 del dataIdDict[dimensionName]
527 continue
529 # Convert an integral type to an explicit int to simplify
530 # comparisons here
531 if isinstance(value, numbers.Integral):
532 value = int(value)
534 if not isinstance(value, dimension.primaryKey.getPythonType()):
535 for alternate in dimension.alternateKeys:
536 if isinstance(value, alternate.getPythonType()):
537 byRecord[dimensionName][alternate.name] = value
538 del dataIdDict[dimensionName]
539 _LOG.debug(
540 "Converting dimension %s to %s.%s=%s",
541 dimensionName,
542 dimensionName,
543 alternate.name,
544 value,
545 )
546 break
547 else:
548 _LOG.warning(
549 "Type mismatch found for value '%r' provided for dimension %s. "
550 "Could not find matching alternative (primary key has type %s) "
551 "so attempting to use as-is.",
552 value,
553 dimensionName,
554 dimension.primaryKey.getPythonType(),
555 )
557 # By this point kwargs and newDataId should only include valid
558 # dimensions. Merge kwargs in to the new dataId and log if there
559 # are dimensions in both (rather than calling update).
560 for k, v in kwargs.items():
561 if k in newDataId and newDataId[k] != v:
562 _LOG.debug(
563 "Keyword arg %s overriding explicit value in dataId of %s with %s", k, newDataId[k], v
564 )
565 newDataId[k] = v
566 # No need to retain any values in kwargs now.
567 kwargs = {}
569 # If we have some unrecognized dimensions we have to try to connect
570 # them to records in other dimensions. This is made more complicated
571 # by some dimensions having records with clashing names. A mitigation
572 # is that we can tell by this point which dimensions are missing
573 # for the DatasetType but this does not work for calibrations
574 # where additional dimensions can be used to constrain the temporal
575 # axis.
576 if not_dimensions:
577 # Search for all dimensions even if we have been given a value
578 # explicitly. In some cases records are given as well as the
579 # actual dimension and this should not be an error if they
580 # match.
581 mandatoryDimensions = datasetType.dimensions.names # - provided
583 candidateDimensions: set[str] = set()
584 candidateDimensions.update(mandatoryDimensions)
586 # For calibrations we may well be needing temporal dimensions
587 # so rather than always including all dimensions in the scan
588 # restrict things a little. It is still possible for there
589 # to be confusion over day_obs in visit vs exposure for example.
590 # If we are not searching calibration collections things may
591 # fail but they are going to fail anyway because of the
592 # ambiguity of the dataId...
593 if datasetType.isCalibration():
594 for dim in self.dimensions.dimensions:
595 if dim.temporal:
596 candidateDimensions.add(str(dim))
598 # Look up table for the first association with a dimension
599 guessedAssociation: dict[str, dict[str, Any]] = defaultdict(dict)
601 # Keep track of whether an item is associated with multiple
602 # dimensions.
603 counter: Counter[str] = Counter()
604 assigned: dict[str, set[str]] = defaultdict(set)
606 # Go through the missing dimensions and associate the
607 # given names with records within those dimensions
608 matched_dims = set()
609 for dimensionName in candidateDimensions:
610 dimension = self.dimensions.dimensions[dimensionName]
611 fields = dimension.metadata.names | dimension.uniqueKeys.names
612 for field in not_dimensions:
613 if field in fields:
614 guessedAssociation[dimensionName][field] = not_dimensions[field]
615 counter[dimensionName] += 1
616 assigned[field].add(dimensionName)
617 matched_dims.add(field)
619 # Calculate the fields that matched nothing.
620 never_found = set(not_dimensions) - matched_dims
622 if never_found:
623 raise DimensionValueError(f"Unrecognized keyword args given: {never_found}")
625 # There is a chance we have allocated a single dataId item
626 # to multiple dimensions. Need to decide which should be retained.
627 # For now assume that the most popular alternative wins.
628 # This means that day_obs with seq_num will result in
629 # exposure.day_obs and not visit.day_obs
630 # Also prefer an explicitly missing dimension over an inferred
631 # temporal dimension.
632 for fieldName, assignedDimensions in assigned.items():
633 if len(assignedDimensions) > 1:
634 # Pick the most popular (preferring mandatory dimensions)
635 requiredButMissing = assignedDimensions.intersection(mandatoryDimensions)
636 if requiredButMissing:
637 candidateDimensions = requiredButMissing
638 else:
639 candidateDimensions = assignedDimensions
641 # If this is a choice between visit and exposure and
642 # neither was a required part of the dataset type,
643 # (hence in this branch) always prefer exposure over
644 # visit since exposures are always defined and visits
645 # are defined from exposures.
646 if candidateDimensions == {"exposure", "visit"}:
647 candidateDimensions = {"exposure"}
649 # Select the relevant items and get a new restricted
650 # counter.
651 theseCounts = {k: v for k, v in counter.items() if k in candidateDimensions}
652 duplicatesCounter: Counter[str] = Counter()
653 duplicatesCounter.update(theseCounts)
655 # Choose the most common. If they are equally common
656 # we will pick the one that was found first.
657 # Returns a list of tuples
658 selected = duplicatesCounter.most_common(1)[0][0]
660 _LOG.debug(
661 "Ambiguous dataId entry '%s' associated with multiple dimensions: %s."
662 " Removed ambiguity by choosing dimension %s.",
663 fieldName,
664 ", ".join(assignedDimensions),
665 selected,
666 )
668 for candidateDimension in assignedDimensions:
669 if candidateDimension != selected:
670 del guessedAssociation[candidateDimension][fieldName]
672 # Update the record look up dict with the new associations
673 for dimensionName, values in guessedAssociation.items():
674 if values: # A dict might now be empty
675 _LOG.debug(
676 "Assigned non-dimension dataId keys to dimension %s: %s", dimensionName, values
677 )
678 byRecord[dimensionName].update(values)
680 if byRecord:
681 # Some record specifiers were found so we need to convert
682 # them to the Id form
683 for dimensionName, values in byRecord.items():
684 if dimensionName in newDataId:
685 _LOG.debug(
686 "DataId specified explicit %s dimension value of %s in addition to"
687 " general record specifiers for it of %s. Checking for self-consistency.",
688 dimensionName,
689 newDataId[dimensionName],
690 str(values),
691 )
692 # Get the actual record and compare with these values.
693 # Only query with relevant data ID values.
694 filtered_data_id = {
695 k: v for k, v in newDataId.items() if k in self.dimensions[dimensionName].required
696 }
697 try:
698 recs = self.query_dimension_records(
699 dimensionName,
700 data_id=filtered_data_id,
701 )
702 except (DataIdError, EmptyQueryResultError):
703 raise DimensionValueError(
704 f"Could not find dimension '{dimensionName}'"
705 f" with dataId {filtered_data_id} as part of comparing with"
706 f" record values {byRecord[dimensionName]}"
707 ) from None
708 if len(recs) == 1:
709 errmsg: list[str] = []
710 for k, v in values.items():
711 if (recval := getattr(recs[0], k)) != v:
712 errmsg.append(f"{k} ({recval} != {v})")
713 if errmsg:
714 raise DimensionValueError(
715 f"Dimension {dimensionName} in dataId has explicit value"
716 f" {newDataId[dimensionName]} inconsistent with"
717 f" {dimensionName} dimension record: " + ", ".join(errmsg)
718 )
719 else:
720 # Multiple matches for an explicit dimension
721 # should never happen but let downstream complain.
722 pass
723 continue
725 # Do not use data ID keys in query that aren't relevant.
726 # Otherwise we can have detector queries being constrained
727 # by an exposure ID that doesn't exist and return no matches
728 # for a detector even though it's a good detector name.
729 filtered_data_id = {
730 k: v
731 for k, v in newDataId.items()
732 if k in self.dimensions[dimensionName].minimal_group.names
733 }
735 def _get_attr(obj: Any, attr: str) -> Any:
736 # Used to implement x.exposure.seq_num when given
737 # x and "exposure.seq_num".
738 for component in attr.split("."):
739 obj = getattr(obj, component)
740 return obj
742 with self.query() as q:
743 x = q.expression_factory
744 # Build up a WHERE expression.
745 predicates = tuple(_get_attr(x, f"{dimensionName}.{k}") == v for k, v in values.items())
746 extra_args: dict[str, Any] = {} # For mypy.
747 extra_args.update(filtered_data_id)
748 extra_args.update(kwargs)
749 q = q.where(x.all(*predicates), **extra_args)
750 records = set(q.dimension_records(dimensionName))
752 if len(records) != 1:
753 if len(records) > 1:
754 # visit can have an ambiguous answer without involving
755 # visit_system. The default visit_system is defined
756 # by the instrument.
757 if (
758 dimensionName == "visit"
759 and "visit_system_membership" in self.dimensions
760 and "visit_system" in self.dimensions["instrument"].metadata
761 ):
762 instrument_records = self.query_dimension_records(
763 "instrument",
764 data_id=newDataId,
765 explain=False,
766 **kwargs,
767 )
768 if len(instrument_records) == 1:
769 visit_system = instrument_records[0].visit_system
770 if visit_system is None:
771 # Set to a value that will never match.
772 visit_system = -1
774 # Look up each visit in the
775 # visit_system_membership records.
776 for rec in records:
777 membership = self.query_dimension_records(
778 # Use bind to allow zero results.
779 # This is a fully-specified query.
780 "visit_system_membership",
781 instrument=instrument_records[0].name,
782 visit_system=visit_system,
783 visit=rec.id,
784 explain=False,
785 )
786 if membership:
787 # This record is the right answer.
788 records = {rec}
789 break
791 # The ambiguity may have been resolved so check again.
792 if len(records) > 1:
793 _LOG.debug(
794 "Received %d records from constraints of %s", len(records), str(values)
795 )
796 for r in records:
797 _LOG.debug("- %s", str(r))
798 raise DimensionValueError(
799 f"DataId specification for dimension {dimensionName} is not"
800 f" uniquely constrained to a single dataset by {values}."
801 f" Got {len(records)} results."
802 )
803 else:
804 raise DimensionValueError(
805 f"DataId specification for dimension {dimensionName} matched no"
806 f" records when constrained by {values}"
807 )
809 # Get the primary key from the real dimension object
810 dimension = self.dimensions.dimensions[dimensionName]
811 if not isinstance(dimension, Dimension):
812 raise RuntimeError(
813 f"{dimension.name} is not a true dimension, and cannot be used in data IDs."
814 )
815 newDataId[dimensionName] = getattr(records.pop(), dimension.primaryKey.name)
817 return newDataId, kwargs
819 def _findDatasetRef(
820 self,
821 datasetRefOrType: DatasetRef | DatasetType | str,
822 dataId: DataId | None = None,
823 *,
824 collections: Any = None,
825 predict: bool = False,
826 run: str | None = None,
827 datastore_records: bool = False,
828 timespan: Timespan | None = None,
829 **kwargs: Any,
830 ) -> DatasetRef:
831 """Shared logic for methods that start with a search for a dataset in
832 the registry.
834 Parameters
835 ----------
836 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
837 When `DatasetRef` the `dataId` should be `None`.
838 Otherwise the `DatasetType` or name thereof.
839 dataId : `dict` or `DataCoordinate`, optional
840 A `dict` of `Dimension` link name, value pairs that label the
841 `DatasetRef` within a Collection. When `None`, a `DatasetRef`
842 should be provided as the first argument.
843 collections : Any, optional
844 Collections to be searched, overriding ``self.collections``.
845 Can be any of the types supported by the ``collections`` argument
846 to butler construction.
847 predict : `bool`, optional
848 If `True`, return a newly created `DatasetRef` with a unique
849 dataset ID if finding a reference in the `Registry` fails.
850 Defaults to `False`.
851 run : `str`, optional
852 Run collection name to use for creating `DatasetRef` for predicted
853 datasets. Only used if ``predict`` is `True`.
854 datastore_records : `bool`, optional
855 If `True` add datastore records to returned `DatasetRef`.
856 timespan : `Timespan` or `None`, optional
857 A timespan that the validity range of the dataset must overlap.
858 If not provided and this is a calibration dataset type, an attempt
859 will be made to find the timespan from any temporal coordinate
860 in the data ID.
861 **kwargs
862 Additional keyword arguments used to augment or construct a
863 `DataId`. See `DataId` parameters.
865 Returns
866 -------
867 ref : `DatasetRef`
868 A reference to the dataset identified by the given arguments.
869 This can be the same dataset reference as given if it was
870 resolved.
872 Raises
873 ------
874 LookupError
875 Raised if no matching dataset exists in the `Registry` (and
876 ``predict`` is `False`).
877 ValueError
878 Raised if a resolved `DatasetRef` was passed as an input, but it
879 differs from the one found in the registry.
880 TypeError
881 Raised if no collections were provided.
882 """
883 datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, for_put=False, **kwargs)
884 if isinstance(datasetRefOrType, DatasetRef):
885 if collections is not None:
886 warnings.warn("Collections should not be specified with DatasetRef", stacklevel=3)
887 if predict and not datasetRefOrType.dataId.hasRecords():
888 return datasetRefOrType.expanded(self.registry.expandDataId(datasetRefOrType.dataId))
889 # May need to retrieve datastore records if requested.
890 if datastore_records and datasetRefOrType._datastore_records is None:
891 datasetRefOrType = self._registry.get_datastore_records(datasetRefOrType)
892 return datasetRefOrType
894 dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)
896 if datasetType.isCalibration():
897 # Because this is a calibration dataset, first try to standardize
898 # the data ID without restricting the dimensions to
899 # those of the dataset type requested, because there may be extra
900 # dimensions that provide temporal information for a validity-range
901 # lookup.
902 dataId = DataCoordinate.standardize(
903 dataId, universe=self.dimensions, defaults=self._registry.defaults.dataId, **kwargs
904 )
905 if timespan is None:
906 if dataId.dimensions.temporal:
907 dataId = self._registry.expandDataId(dataId)
908 # Use the timespan from the data ID to constrain the
909 # calibration lookup, but only if the caller has not
910 # specified an explicit timespan.
911 timespan = dataId.timespan
912 else:
913 # Try an arbitrary timespan. Downstream will fail if this
914 # results in more than one matching dataset.
915 timespan = Timespan(None, None)
916 else:
917 # Standardize the data ID to just the dimensions of the dataset
918 # type instead of letting registry.findDataset do it, so we get the
919 # result even if no dataset is found.
920 dataId = DataCoordinate.standardize(
921 dataId,
922 dimensions=datasetType.dimensions,
923 defaults=self._registry.defaults.dataId,
924 **kwargs,
925 )
926 # Always lookup the DatasetRef, even if one is given, to ensure it is
927 # present in the current collection.
928 ref = self.find_dataset(
929 datasetType,
930 dataId,
931 collections=collections,
932 timespan=timespan,
933 datastore_records=datastore_records,
934 )
935 if ref is None:
936 if predict:
937 if run is None:
938 run = self.run
939 if run is None:
940 raise TypeError("Cannot predict dataset ID/location with run=None.")
941 dataId = self.registry.expandDataId(dataId)
942 return DatasetRef(datasetType, dataId, run=run)
943 else:
944 if collections is None:
945 collections = self._registry.defaults.collections
946 raise DatasetNotFoundError(
947 f"Dataset {datasetType.name} with data ID {dataId} "
948 f"could not be found in collections {collections}."
949 )
950 if datasetType != ref.datasetType:
951 # If they differ it is because the user explicitly specified
952 # a compatible dataset type to this call rather than using the
953 # registry definition. The DatasetRef must therefore be recreated
954 # using the user definition such that the expected type is
955 # returned.
956 ref = DatasetRef(
957 datasetType, ref.dataId, run=ref.run, id=ref.id, datastore_records=ref._datastore_records
958 )
960 return ref
962 @transactional
963 def put(
964 self,
965 obj: Any,
966 datasetRefOrType: DatasetRef | DatasetType | str,
967 /,
968 dataId: DataId | None = None,
969 *,
970 run: str | None = None,
971 provenance: DatasetProvenance | None = None,
972 **kwargs: Any,
973 ) -> DatasetRef:
974 """Store and register a dataset.
976 Parameters
977 ----------
978 obj : `object`
979 The dataset.
980 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
981 When `DatasetRef` is provided, ``dataId`` should be `None`.
982 Otherwise the `DatasetType` or name thereof. If a fully resolved
983 `DatasetRef` is given the run and ID are used directly.
984 dataId : `dict` or `DataCoordinate`
985 A `dict` of `Dimension` link name, value pairs that label the
986 `DatasetRef` within a Collection. When `None`, a `DatasetRef`
987 should be provided as the second argument.
988 run : `str`, optional
989 The name of the run the dataset should be added to, overriding
990 ``self.run``. Not used if a resolved `DatasetRef` is provided.
991 provenance : `DatasetProvenance` or `None`, optional
992 Any provenance that should be attached to the serialized dataset.
993 Not supported by all serialization mechanisms.
994 **kwargs
995 Additional keyword arguments used to augment or construct a
996 `DataCoordinate`. See `DataCoordinate.standardize`
997 parameters. Not used if a resolved `DatasetRef` is provided.
999 Returns
1000 -------
1001 ref : `DatasetRef`
1002 A reference to the stored dataset, updated with the correct id if
1003 given.
1005 Raises
1006 ------
1007 TypeError
1008 Raised if the butler is read-only or if no run has been provided.
1009 """
1010 if isinstance(datasetRefOrType, DatasetRef):
1011 # This is a direct put of predefined DatasetRef.
1012 _LOG.debug("Butler put direct: %s", datasetRefOrType)
1013 if run is not None:
1014 warnings.warn("Run collection is not used for DatasetRef", stacklevel=3)
1016 with self._metrics.instrument_put(_LOG, msg="Dataset put direct"):
1017 # If registry already has a dataset with the same dataset ID,
1018 # dataset type and DataId, then _importDatasets will do
1019 # nothing and just return the original ref. We have to raise in
1020 # this case; there is a datastore check below for that.
1021 self._registry._importDatasets([datasetRefOrType], expand=True)
1022 # Before trying to write to the datastore check that it does
1023 # not know this dataset. This is prone to races, of course.
1024 if self._datastore.knows(datasetRefOrType):
1025 raise ConflictingDefinitionError(
1026 f"Datastore already contains dataset: {datasetRefOrType}"
1027 )
1028 # Try to write dataset to the datastore, if it fails due to a
1029 # race with another write, the content of stored data may be
1030 # unpredictable.
1031 try:
1032 self._datastore.put(obj, datasetRefOrType, provenance=provenance)
1033 except IntegrityError as e:
1034 raise ConflictingDefinitionError(f"Datastore already contains dataset: {e}") from e
1036 return datasetRefOrType
1038 _LOG.debug("Butler put: %s, dataId=%s, run=%s", datasetRefOrType, dataId, run)
1039 if not self.isWriteable():
1040 raise TypeError("Butler is read-only.")
1042 with self._metrics.instrument_put(_LOG, msg="Dataset put with dataID"):
1043 datasetType, dataId = self._standardizeArgs(datasetRefOrType, dataId, **kwargs)
1045 # Handle dimension records in dataId
1046 dataId, kwargs = self._rewrite_data_id(dataId, datasetType, **kwargs)
1048 # Add Registry Dataset entry.
1049 dataId = self._registry.expandDataId(dataId, dimensions=datasetType.dimensions, **kwargs)
1050 (ref,) = self._registry.insertDatasets(datasetType, run=run, dataIds=[dataId])
1051 self._datastore.put(obj, ref, provenance=provenance)
1053 return ref
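# --- Editor's note: illustrative usage sketch for ``put`` (not part of the
# original source); run name, dataset type, and data ID values are
# hypothetical.
#
#     ref = butler.put(
#         exposure,
#         "calexp",
#         instrument="HSC",
#         visit=903334,
#         detector=42,
#         run="u/someone/test-run",
#     )
#     # With a fully resolved DatasetRef, the run and dataset ID embedded in
#     # the ref are used directly and any ``run`` argument is ignored (with a
#     # warning).
#     butler.put(exposure, existing_ref)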
1055 def getDeferred(
1056 self,
1057 datasetRefOrType: DatasetRef | DatasetType | str,
1058 /,
1059 dataId: DataId | None = None,
1060 *,
1061 parameters: dict | None = None,
1062 collections: Any = None,
1063 storageClass: str | StorageClass | None = None,
1064 timespan: Timespan | None = None,
1065 **kwargs: Any,
1066 ) -> DeferredDatasetHandle:
1067 """Create a `DeferredDatasetHandle` which can later retrieve a dataset,
1068 after an immediate registry lookup.
1070 Parameters
1071 ----------
1072 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
1073 When `DatasetRef` the `dataId` should be `None`.
1074 Otherwise the `DatasetType` or name thereof.
1075 dataId : `dict` or `DataCoordinate`, optional
1076 A `dict` of `Dimension` link name, value pairs that label the
1077 `DatasetRef` within a Collection. When `None`, a `DatasetRef`
1078 should be provided as the first argument.
1079 parameters : `dict`
1080 Additional StorageClass-defined options to control reading,
1081 typically used to efficiently read only a subset of the dataset.
1082 collections : Any, optional
1083 Collections to be searched, overriding ``self.collections``.
1084 Can be any of the types supported by the ``collections`` argument
1085 to butler construction.
1086 storageClass : `StorageClass` or `str`, optional
1087 The storage class to be used to override the Python type
1088 returned by this method. By default the returned type matches
1089 the dataset type definition for this dataset. Specifying a
1090 read `StorageClass` can force a different type to be returned.
1091 This type must be compatible with the original type.
1092 timespan : `Timespan` or `None`, optional
1093 A timespan that the validity range of the dataset must overlap.
1094 If not provided and this is a calibration dataset type, an attempt
1095 will be made to find the timespan from any temporal coordinate
1096 in the data ID.
1097 **kwargs
1098 Additional keyword arguments used to augment or construct a
1099 `DataId`. See `DataId` parameters.
1101 Returns
1102 -------
1103 obj : `DeferredDatasetHandle`
1104 A handle which can be used to retrieve a dataset at a later time.
1106 Raises
1107 ------
1108 LookupError
1109 Raised if no matching dataset exists in the `Registry` or
1110 datastore.
1111 ValueError
1112 Raised if a resolved `DatasetRef` was passed as an input, but it
1113 differs from the one found in the registry.
1114 TypeError
1115 Raised if no collections were provided.
1116 """
1117 if isinstance(datasetRefOrType, DatasetRef):
1118 # Do the quick check first and if that fails, check for artifact
1119 # existence. This is necessary for datastores that are configured
1120 # in trust mode where there won't be a record but there will be
1121 # a file.
1122 if self._datastore.knows(datasetRefOrType) or self._datastore.exists(datasetRefOrType):
1123 ref = datasetRefOrType
1124 else:
1125 raise LookupError(f"Dataset reference {datasetRefOrType} does not exist.")
1126 else:
1127 ref = self._findDatasetRef(
1128 datasetRefOrType, dataId, collections=collections, timespan=timespan, **kwargs
1129 )
1130 return DeferredDatasetHandle(butler=self, ref=ref, parameters=parameters, storageClass=storageClass)
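# --- Editor's note: illustrative sketch (not part of the original source);
# the dataset type, data ID, and ``bbox`` read parameter are hypothetical.
#
#     handle = butler.getDeferred("calexp", instrument="HSC", visit=903334,
#                                 detector=42)
#     # The registry lookup has already happened; the heavy I/O only occurs
#     # on handle.get(), optionally with StorageClass read parameters.
#     cutout = handle.get(parameters={"bbox": bbox})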
1132 def get(
1133 self,
1134 datasetRefOrType: DatasetRef | DatasetType | str,
1135 /,
1136 dataId: DataId | None = None,
1137 *,
1138 parameters: dict[str, Any] | None = None,
1139 collections: Any = None,
1140 storageClass: StorageClass | str | None = None,
1141 timespan: Timespan | None = None,
1142 **kwargs: Any,
1143 ) -> Any:
1144 """Retrieve a stored dataset.
1146 Parameters
1147 ----------
1148 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
1149 When `DatasetRef` the `dataId` should be `None`.
1150 Otherwise the `DatasetType` or name thereof.
1151 If a resolved `DatasetRef`, the associated dataset
1152 is returned directly without additional querying.
1153 dataId : `dict` or `DataCoordinate`
1154 A `dict` of `Dimension` link name, value pairs that label the
1155 `DatasetRef` within a Collection. When `None`, a `DatasetRef`
1156 should be provided as the first argument.
1157 parameters : `dict`
1158 Additional StorageClass-defined options to control reading,
1159 typically used to efficiently read only a subset of the dataset.
1160 collections : Any, optional
1161 Collections to be searched, overriding ``self.collections``.
1162 Can be any of the types supported by the ``collections`` argument
1163 to butler construction.
1164 storageClass : `StorageClass` or `str`, optional
1165 The storage class to be used to override the Python type
1166 returned by this method. By default the returned type matches
1167 the dataset type definition for this dataset. Specifying a
1168 read `StorageClass` can force a different type to be returned.
1169 This type must be compatible with the original type.
1170 timespan : `Timespan` or `None`, optional
1171 A timespan that the validity range of the dataset must overlap.
1172 If not provided and this is a calibration dataset type, an attempt
1173 will be made to find the timespan from any temporal coordinate
1174 in the data ID.
1175 **kwargs
1176 Additional keyword arguments used to augment or construct a
1177 `DataCoordinate`. See `DataCoordinate.standardize`
1178 parameters.
1180 Returns
1181 -------
1182 obj : `object`
1183 The dataset.
1185 Raises
1186 ------
1187 LookupError
1188 Raised if no matching dataset exists in the `Registry`.
1189 TypeError
1190 Raised if no collections were provided.
1192 Notes
1193 -----
1194 When looking up datasets in a `~CollectionType.CALIBRATION` collection,
1195 this method requires that the given data ID include temporal dimensions
1196 beyond the dimensions of the dataset type itself, in order to find the
1197 dataset with the appropriate validity range. For example, a "bias"
1198 dataset with native dimensions ``{instrument, detector}`` could be
1199 fetched with a ``{instrument, detector, exposure}`` data ID, because
1200 ``exposure`` is a temporal dimension.
1201 """
1202 _LOG.debug("Butler get: %s, dataId=%s, parameters=%s", datasetRefOrType, dataId, parameters)
1203 with self._metrics.instrument_get(_LOG, msg="Retrieved dataset"):
1204 ref = self._findDatasetRef(
1205 datasetRefOrType,
1206 dataId,
1207 collections=collections,
1208 datastore_records=True,
1209 timespan=timespan,
1210 **kwargs,
1211 )
1212 return self._datastore.get(ref, parameters=parameters, storageClass=storageClass)
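# --- Editor's note: illustrative sketch of the calibration lookup described
# in the Notes above (not part of the original source); dataset type,
# collection, and data ID values are hypothetical.
#
#     # A "bias" with dimensions {instrument, detector} is looked up through a
#     # CALIBRATION collection by adding the temporal "exposure" dimension so
#     # the matching validity range can be selected.
#     bias = butler.get(
#         "bias",
#         instrument="HSC",
#         detector=42,
#         exposure=903334,
#         collections="HSC/calib",
#     )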
1214 def getURIs(
1215 self,
1216 datasetRefOrType: DatasetRef | DatasetType | str,
1217 /,
1218 dataId: DataId | None = None,
1219 *,
1220 predict: bool = False,
1221 collections: Any = None,
1222 run: str | None = None,
1223 **kwargs: Any,
1224 ) -> DatasetRefURIs:
1225 """Return the URIs associated with the dataset.
1227 Parameters
1228 ----------
1229 datasetRefOrType : `DatasetRef`, `DatasetType`, or `str`
1230 When `DatasetRef` the `dataId` should be `None`.
1231 Otherwise the `DatasetType` or name thereof.
1232 dataId : `dict` or `DataCoordinate`
1233 A `dict` of `Dimension` link name, value pairs that label the
1234 `DatasetRef` within a Collection. When `None`, a `DatasetRef`
1235 should be provided as the first argument.
1236 predict : `bool`
1237 If `True`, allow URIs to be returned of datasets that have not
1238 been written.
1239 collections : Any, optional
1240 Collections to be searched, overriding ``self.collections``.
1241 Can be any of the types supported by the ``collections`` argument
1242 to butler construction.
1243 run : `str`, optional
1244 Run to use for predictions, overriding ``self.run``.
1245 **kwargs
1246 Additional keyword arguments used to augment or construct a
1247 `DataCoordinate`. See `DataCoordinate.standardize`
1248 parameters.
1250 Returns
1251 -------
1252 uris : `DatasetRefURIs`
1253 The URI to the primary artifact associated with this dataset (if
1254 the dataset was disassembled within the datastore this may be
1255 `None`), and the URIs to any components associated with the dataset
1256 artifact (this can be empty if there are no components).
1257 """
1258 ref = self._findDatasetRef(
1259 datasetRefOrType, dataId, predict=predict, run=run, collections=collections, **kwargs
1260 )
1261 return self._datastore.getURIs(ref, predict)
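# --- Editor's note: illustrative sketch (not part of the original source);
# dataset type and data ID values are hypothetical.
#
#     uris = butler.getURIs("calexp", instrument="HSC", visit=903334,
#                           detector=42)
#     if uris.primaryURI is not None:
#         print(uris.primaryURI.geturl())
#     for component, uri in uris.componentURIs.items():
#         print(component, uri.geturl())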
1263 def get_dataset_type(self, name: str) -> DatasetType:
1264 return self._registry.getDatasetType(name)
1266 def get_dataset(
1267 self,
1268 id: DatasetId | str,
1269 *,
1270 storage_class: str | StorageClass | None = None,
1271 dimension_records: bool = False,
1272 datastore_records: bool = False,
1273 ) -> DatasetRef | None:
1274 id = _to_uuid(id)
1275 ref = self._registry.getDataset(id)
1276 if ref is not None:
1277 if dimension_records:
1278 ref = ref.expanded(
1279 self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions)
1280 )
1281 if storage_class:
1282 ref = ref.overrideStorageClass(storage_class)
1283 if datastore_records:
1284 ref = self._registry.get_datastore_records(ref)
1285 return ref
1287 def get_many_datasets(self, ids: Iterable[DatasetId | str]) -> list[DatasetRef]:
1288 uuids = [_to_uuid(id) for id in ids]
1289 return self._registry._managers.datasets.get_dataset_refs(uuids)
1291 def find_dataset(
1292 self,
1293 dataset_type: DatasetType | str,
1294 data_id: DataId | None = None,
1295 *,
1296 collections: str | Sequence[str] | None = None,
1297 timespan: Timespan | None = None,
1298 storage_class: str | StorageClass | None = None,
1299 dimension_records: bool = False,
1300 datastore_records: bool = False,
1301 **kwargs: Any,
1302 ) -> DatasetRef | None:
1303 # Handle any parts of the dataID that are not using primary dimension
1304 # keys.
1305 if isinstance(dataset_type, str):
1306 actual_type = self.get_dataset_type(dataset_type)
1307 else:
1308 actual_type = dataset_type
1310 # Store the component for later.
1311 component_name = actual_type.component()
1312 if actual_type.isComponent():
1313 parent_type = actual_type.makeCompositeDatasetType()
1314 else:
1315 parent_type = actual_type
1317 data_id, kwargs = self._rewrite_data_id(data_id, parent_type, **kwargs)
1319 ref = self.registry.findDataset(
1320 parent_type,
1321 data_id,
1322 collections=collections,
1323 timespan=timespan,
1324 datastore_records=datastore_records,
1325 **kwargs,
1326 )
1327 if ref is not None and dimension_records:
1328 ref = ref.expanded(self._registry.expandDataId(ref.dataId, dimensions=ref.datasetType.dimensions))
1329 if ref is not None and component_name:
1330 ref = ref.makeComponentRef(component_name)
1331 if ref is not None and storage_class is not None:
1332 ref = ref.overrideStorageClass(storage_class)
1334 return ref
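# --- Editor's note: illustrative sketch (not part of the original source);
# dataset type, collection, and data ID values are hypothetical.
#
#     ref = butler.find_dataset(
#         "bias",
#         instrument="HSC",
#         detector=42,
#         collections="HSC/calib/unbounded",
#         dimension_records=True,
#     )
#     if ref is None:
#         print("no matching dataset")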
1336 def retrieve_artifacts_zip(
1337 self,
1338 refs: Iterable[DatasetRef],
1339 destination: ResourcePathExpression,
1340 overwrite: bool = True,
1341 ) -> ResourcePath:
1342 return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite)
1344 def retrieveArtifacts(
1345 self,
1346 refs: Iterable[DatasetRef],
1347 destination: ResourcePathExpression,
1348 transfer: str = "auto",
1349 preserve_path: bool = True,
1350 overwrite: bool = False,
1351 ) -> list[ResourcePath]:
1352 # Docstring inherited.
1353 outdir = ResourcePath(destination)
1354 artifact_map = self._datastore.retrieveArtifacts(
1355 refs,
1356 outdir,
1357 transfer=transfer,
1358 preserve_path=preserve_path,
1359 overwrite=overwrite,
1360 write_index=True,
1361 )
1362 return list(artifact_map)
1364 def exists(
1365 self,
1366 dataset_ref_or_type: DatasetRef | DatasetType | str,
1367 /,
1368 data_id: DataId | None = None,
1369 *,
1370 full_check: bool = True,
1371 collections: Any = None,
1372 **kwargs: Any,
1373 ) -> DatasetExistence:
1374 # Docstring inherited.
1375 existence = DatasetExistence.UNRECOGNIZED
1377 if isinstance(dataset_ref_or_type, DatasetRef):
1378 if collections is not None:
1379 warnings.warn("Collections should not be specified with DatasetRef", stacklevel=2)
1380 if data_id is not None:
1381 warnings.warn("A DataID should not be specified with DatasetRef", stacklevel=2)
1382 ref = dataset_ref_or_type
1383 registry_ref = self._registry.getDataset(dataset_ref_or_type.id)
1384 if registry_ref is not None:
1385 existence |= DatasetExistence.RECORDED
1387 if dataset_ref_or_type != registry_ref:
1388 # This could mean that storage classes differ, so we should
1389 # check for that but use the registry ref for the rest of
1390 # the method.
1391 if registry_ref.is_compatible_with(dataset_ref_or_type):
1392 # Use the registry version from now on.
1393 ref = registry_ref
1394 else:
1395 raise ValueError(
1396 f"The ref given to exists() ({ref}) has the same dataset ID as one "
1397 f"in registry but has different incompatible values ({registry_ref})."
1398 )
1399 else:
1400 try:
1401 ref = self._findDatasetRef(dataset_ref_or_type, data_id, collections=collections, **kwargs)
1402 except (LookupError, TypeError):
1403 return existence
1404 existence |= DatasetExistence.RECORDED
1406 if self._datastore.knows(ref):
1407 existence |= DatasetExistence.DATASTORE
1409 if full_check:
1410 if self._datastore.exists(ref):
1411 existence |= DatasetExistence._ARTIFACT
1412 elif existence.value != DatasetExistence.UNRECOGNIZED.value:
1413 # Do not add this flag if we have no other idea about a dataset.
1414 existence |= DatasetExistence(DatasetExistence._ASSUMED)
1416 return existence
1418 def _exists_many(
1419 self,
1420 refs: Iterable[DatasetRef],
1421 /,
1422 *,
1423 full_check: bool = True,
1424 ) -> dict[DatasetRef, DatasetExistence]:
1425 # Docstring inherited.
1426 existence = {ref: DatasetExistence.UNRECOGNIZED for ref in refs}
1428 # Check which refs exist in the registry.
1429 id_map = {ref.id: ref for ref in existence.keys()}
1430 for registry_ref in self.get_many_datasets(id_map.keys()):
1431 # Consistency between the given DatasetRef and the information
1432 # recorded in the registry is not verified.
1433 existence[id_map[registry_ref.id]] |= DatasetExistence.RECORDED
1435 # Ask datastore if it knows about these refs.
1436 knows = self._datastore.knows_these(refs)
1437 for ref, known in knows.items():
1438 if known:
1439 existence[ref] |= DatasetExistence.DATASTORE
1441 if full_check:
1442 mexists = self._datastore.mexists(refs)
1443 for ref, exists in mexists.items():
1444 if exists:
1445 existence[ref] |= DatasetExistence._ARTIFACT
1446 else:
1447 # Do not set this flag if nothing is known about the dataset.
1448 for ref in existence:
1449 if existence[ref] != DatasetExistence.UNRECOGNIZED:
1450 existence[ref] |= DatasetExistence._ASSUMED
1452 return existence
1454 def removeRuns(
1455 self,
1456 names: Iterable[str],
1457 unstore: bool | type[_DeprecatedDefault] = _DeprecatedDefault,
1458 *,
1459 unlink_from_chains: bool = False,
1460 ) -> None:
1461 # Docstring inherited.
1462 if not self.isWriteable():
1463 raise TypeError("Butler is read-only.")
1465 if unstore is not _DeprecatedDefault:
1466 # The value was passed in by a user. Must report it is now
1467 # ignored.
1468 if unstore is True:
1469 msg = "The unstore parameter is deprecated and is now always treated as True. "
1470 else:
1471 msg = "The unstore parameter for removeRuns can no longer be False and is now ignored. "
1472 warnings.warn(
1473 msg + " The parameter will be removed after v30.",
1474 category=FutureWarning,
1475 stacklevel=find_outside_stacklevel("lsst.daf.butler"),
1476 )
1478 names = list(names)
1479 refs: list[DatasetRef] = []
1480 # Map of the chained collections to the RUN children.
1481 parents_to_children: dict[str, set[str]] = defaultdict(set)
1483 with self._caching_context():
1484 # Get information about these RUNs.
1485 collections_info = self.collections.query_info(names, include_parents=unlink_from_chains)
1486 for info in collections_info:
1487 if info.type is not CollectionType.RUN:
1488 raise TypeError(f"The collection type of '{info.name}' is {info.type.name}, not RUN.")
1489 if unlink_from_chains:
1490 if info.parents is None: # For mypy.
1491 raise AssertionError("Internal error: Collection parents required but not received")
1492 for parent in info.parents:
1493 parents_to_children[parent].add(info.name)
1495 # Update the names in case the query unexpectedly had a wildcard.
1496 names = [info.name for info in collections_info]
1498 # Get all the datasets from these runs.
1499 refs = self.query_all_datasets(names, find_first=False, limit=None)
1501 # Call pruneDatasets since we are deliberately removing
1502 # datasets in chunks from the RUN collections rather than
1503 # attempting to remove everything at once.
1504 with time_this(
1505 _LOG,
1506 msg="Removing %d dataset%s from %s",
1507 args=(len(refs), "s" if len(refs) != 1 else "", ", ".join(names)),
1508 ):
1509 self.pruneDatasets(refs, unstore=True, purge=True, disassociate=True)
1511 # Now can remove the actual RUN collection and unlink from chains.
1512 with self._registry.transaction():
1513 # This will fail if caller is not unlinking from chains but the
1514 # RUN is in a chain -- but we have already deleted all the datasets
1515 # by this point.
1516 if unlink_from_chains:
1517 # Use deterministic order for deletions to attempt to minimize
1518 # risk of deadlocks for parallel deletes.
1519 for parent in sorted(parents_to_children):
1520 self.collections.remove_from_chain(parent, sorted(parents_to_children[parent]))
1521 # Sort to avoid potential deadlocks.
1522 for name in sorted(names):
1523 # This should be fast since the collection should be empty.
1524 with time_this(_LOG, msg="Removing RUN collection %s", args=(name,)):
1525 self._registry.removeCollection(name)
1526 _LOG.info("Completely removed the following RUN collections: %s", ", ".join(names))
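# Illustrative usage sketch (the ``butler`` instance and RUN name are
# hypothetical, not part of this module):
#
#     butler.removeRuns(["u/example/run"], unlink_from_chains=True)
#
# Every dataset in the RUN is unstored and purged before the collection itself
# is removed and, when requested, unlinked from its parent chains.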
1528 def pruneDatasets(
1529 self,
1530 refs: Iterable[DatasetRef],
1531 *,
1532 disassociate: bool = True,
1533 unstore: bool = False,
1534 tags: Iterable[str] = (),
1535 purge: bool = False,
1536 ) -> None:
1537 # docstring inherited from LimitedButler
1539 if not self.isWriteable():
1540 raise TypeError("Butler is read-only.")
1541 if purge:
1542 if not disassociate:
1543 raise TypeError("Cannot pass purge=True without disassociate=True.")
1544 if not unstore:
1545 raise TypeError("Cannot pass purge=True without unstore=True.")
1546 elif disassociate:
1547 tags = tuple(tags)
1548 if not tags:
1549 raise TypeError("No tags provided but disassociate=True.")
1550 for tag in tags:
1551 collectionType = self._registry.getCollectionType(tag)
1552 if collectionType is not CollectionType.TAGGED:
1553 raise TypeError(
1554 f"Cannot disassociate from collection '{tag}' "
1555 f"of non-TAGGED type {collectionType.name}."
1556 )
1557 # Transform possibly-single-pass iterable into something we can iterate
1558 # over multiple times.
1559 refs = list(refs)
1560 # Pruning a component of a DatasetRef makes no sense since the registry
1561 # doesn't know about components and the datastore might not store
1562 # components in a separate file.
1563 for ref in refs:
1564 if ref.datasetType.component():
1565 raise ValueError(f"Cannot prune a component of a dataset (ref={ref})")
1567 # Chunk the deletions using a size that is reasonably efficient whilst
1568 # also giving reasonable feedback to the user. Chunking also minimizes
1569 # what needs to rollback if there is a failure and should allow
1570 # incremental re-running of the pruning (assuming the query is
1571 # repeated). The only issue arises if a Ctrl-C arrives during
1572 # emptyTrash, since an admin command would then be needed to finish
1573 # emptying that chunk.
1574 progress = Progress("lsst.daf.butler.Butler.pruneDatasets", level=_LOG.INFO)
1575 chunk_size = 50_000
1576 n_chunks = math.ceil(len(refs) / chunk_size)
1577 if n_chunks > 1:
1578 _LOG.verbose("Pruning a total of %d datasets", len(refs))
1579 chunk_num = 0
1580 for chunked_refs in progress.wrap(
1581 chunk_iterable(refs, chunk_size=chunk_size), desc="Deleting datasets", total=n_chunks
1582 ):
1583 chunk_num += 1
1584 _LOG.verbose(
1585 "Pruning %d dataset%s in chunk %d/%d",
1586 len(chunked_refs),
1587 "s" if len(chunked_refs) != 1 else "",
1588 chunk_num,
1589 n_chunks,
1590 )
1591 with time_this(
1592 _LOG,
1593 msg="Removing %d datasets for chunk %d/%d",
1594 args=(len(chunked_refs), chunk_num, n_chunks),
1595 ):
1596 self._prune_datasets(
1597 chunked_refs, tags=tags, unstore=unstore, purge=purge, disassociate=disassociate
1598 )
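# Illustrative usage sketch (assumes ``refs`` was obtained from a query against
# a writeable butler), mirroring the call made by removeRuns above:
#
#     butler.pruneDatasets(refs, unstore=True, purge=True, disassociate=True)
#
# Chunking, trashing and trash emptying are handled internally.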
1600 def _prune_datasets(
1601 self,
1602 refs: Collection[DatasetRef],
1603 *,
1604 disassociate: bool = True,
1605 unstore: bool = False,
1606 tags: Iterable[str] = (),
1607 purge: bool = False,
1608 ) -> None:
1609 # We don't need an unreliable Datastore transaction for this, because
1610 # we've been extra careful to ensure that Datastore.trash only involves
1611 # mutating the Registry (it can _look_ at Datastore-specific things,
1612 # but shouldn't change them), and hence all operations here are
1613 # Registry operations.
1614 with self.transaction():
1615 plural = "s" if len(refs) != 1 else ""
1616 if unstore:
1617 with time_this(
1618 _LOG,
1619 msg="Marking %d dataset%s for removal during pruneDatasets",
1620 args=(len(refs), plural),
1621 ):
1622 self._datastore.trash(refs)
1623 if purge:
1624 with time_this(
1625 _LOG, msg="Removing %d pruned dataset%s from registry", args=(len(refs), plural)
1626 ):
1627 self._registry.removeDatasets(refs)
1628 elif disassociate:
1629 assert tags, "Guaranteed by earlier logic in this function."
1630 with time_this(
1631 _LOG, msg="Disassociating %d dataset%s from tagged collections", args=(len(refs), plural)
1632 ):
1633 for tag in tags:
1634 self._registry.disassociate(tag, refs)
1635 # We've exited the Registry transaction, and apparently committed
1636 # (if there was an exception, everything rolled back, it's as if
1637 # nothing happened, and we never get here).
1638 # Datastore artifacts are not yet gone, but they're clearly marked
1639 # as trash, so if we fail to delete now because of (e.g.) filesystem
1640 # problems we can try again later, and if manual administrative
1641 # intervention is required, it's pretty clear what that should entail:
1642 # deleting everything on disk and in private Datastore tables that is
1643 # in the dataset_location_trash table.
1644 if unstore:
1645 # Point of no return for removing artifacts. Restrict the trash
1646 # emptying to the refs that this call trashed.
1647 with time_this(
1648 _LOG,
1649 msg="Attempting to remove artifacts for %d dataset%s associated with pruning",
1650 args=(len(refs), plural),
1651 ):
1652 self._datastore.emptyTrash(refs=refs)
1654 def ingest_zip(
1655 self,
1656 zip_file: ResourcePathExpression,
1657 transfer: str = "auto",
1658 *,
1659 transfer_dimensions: bool = False,
1660 dry_run: bool = False,
1661 skip_existing: bool = False,
1662 ) -> None:
1663 # Docstring inherited.
1664 if not self.isWriteable():
1665 raise TypeError("Butler is read-only.")
1667 zip_path = ResourcePath(zip_file)
1668 index = ZipIndex.from_zip_file(zip_path)
1669 _LOG.verbose(
1670 "Ingesting %s containing %d datasets and %d files.", zip_path, len(index.refs), len(index)
1671 )
1673 # Need to ingest the refs into registry. Re-use the standard ingest
1674 # code by reconstructing FileDataset from the index.
1675 refs = index.refs.to_refs(universe=self.dimensions)
1676 id_to_ref = {ref.id: ref for ref in refs}
1677 datasets: list[FileDataset] = []
1678 processed_ids: set[uuid.UUID] = set()
1679 for path_in_zip, index_info in index.artifact_map.items():
1680 # For disassembled composites, check that this ref hasn't already
1681 # been included.
1682 unprocessed = {id_ for id_ in index_info.ids if id_ not in processed_ids}
1683 if not unprocessed:
1684 continue
1685 dataset = FileDataset(refs=[id_to_ref[id_] for id_ in unprocessed], path=path_in_zip)
1686 datasets.append(dataset)
1687 processed_ids.update(unprocessed)
1689 new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
1690 if existing_datasets:
1691 if skip_existing:
1692 _LOG.info(
1693 "Skipping %d datasets from zip file %s which already exist in the repository.",
1694 len(existing_datasets),
1695 zip_file,
1696 )
1697 else:
1698 raise ConflictingDefinitionError(
1699 f"Datastore already contains {len(existing_datasets)} of the given datasets."
1700 f" Example: {existing_datasets[0]}"
1701 )
1702 if new_datasets:
1703 # Cannot yet support partial zip ingests, where a zip contains some
1704 # datasets already known to this butler and some that are not.
1705 raise ValueError(
1706 f"The given zip file from {zip_file} contains {len(new_datasets)} datasets not known "
1707 f"to this butler but also contains {len(existing_datasets)} datasets already known to "
1708 "this butler. Currently butler can not ingest zip files with overlapping content."
1709 )
1710 return
1712 # Ingest doesn't create the RUN collections so we have to do that
1713 # here.
1714 #
1715 # Sort by run collection name to ensure Postgres takes locks in the
1716 # same order between different processes, to mitigate an issue
1717 # where Postgres can deadlock due to the unique index on collection
1718 # name. (See DM-47543).
1719 runs = {ref.run for ref in refs}
1720 for run in sorted(runs):
1721 registered = self.collections.register(run)
1722 if registered:
1723 _LOG.verbose("Created RUN collection %s as part of zip ingest", run)
1725 progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)
1726 import_info = self._prepare_ingest_file_datasets(
1727 datasets, progress, dry_run=dry_run, transfer_dimensions=transfer_dimensions
1728 )
1730 # Calculate some statistics based on the given list of datasets.
1731 n_datasets = 0
1732 for d in datasets:
1733 n_datasets += len(d.refs)
1734 srefs = "s" if n_datasets != 1 else ""
1736 with (
1737 self._metrics.instrument_ingest(
1738 n_datasets, _LOG, msg=f"Ingesting zip file {zip_file} with {n_datasets} dataset{srefs}"
1739 ),
1740 self.transaction(),
1741 ):
1742 # Do not need expanded dataset refs so can ignore the return value.
1743 self._ingest_file_datasets(datasets, import_info, progress, dry_run=dry_run)
1745 try:
1746 self._datastore.ingest_zip(zip_path, transfer=transfer, dry_run=dry_run)
1747 except IntegrityError as e:
1748 raise ConflictingDefinitionError(
1749 f"Datastore already contains one or more datasets: {e}"
1750 ) from e
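# Illustrative usage sketch (the zip file name is hypothetical):
#
#     butler.ingest_zip("retrieved_artifacts.zip", transfer="auto")
#
# The zip must contain a ZipIndex describing its refs and artifacts, such as
# one produced by the butler's retrieve-artifacts tooling.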
1752 def _prepare_ingest_file_datasets(
1753 self,
1754 datasets: Sequence[FileDataset],
1755 progress: Progress,
1756 *,
1757 transfer_dimensions: bool = False,
1758 dry_run: bool = False,
1759 ) -> _ImportDatasetsInfo:
1760 # Track DataIDs that are being ingested so we can spot issues early
1761 # with duplication. Retain previous FileDataset so we can report it.
1762 groupedDataIds: MutableMapping[tuple[DatasetType, str], dict[DataCoordinate, FileDataset]] = (
1763 defaultdict(dict)
1764 )
1766 # All the refs we need to import.
1767 refs: list[DatasetRef] = []
1769 for dataset in progress.wrap(datasets, desc="Validating dataIDs"):
1770 for ref in dataset.refs:
1771 group_key = (ref.datasetType, ref.run)
1773 if ref.dataId in groupedDataIds[group_key]:
1774 raise ConflictingDefinitionError(
1775 f"Ingest conflict. Dataset {dataset.path} has same"
1776 " DataId as other ingest dataset"
1777 f" {groupedDataIds[group_key][ref.dataId].path} "
1778 f" ({ref.dataId})"
1779 )
1781 groupedDataIds[group_key][ref.dataId] = dataset
1782 refs.extend(dataset.refs)
1784 # Ensure that dataset types are created and all ref information
1785 # extracted.
1786 import_info = self._prepare_for_import_refs(
1787 self,
1788 refs,
1789 register_dataset_types=True,
1790 dry_run=dry_run,
1791 transfer_dimensions=transfer_dimensions,
1792 )
1793 return import_info
1795 def _ingest_file_datasets(
1796 self,
1797 datasets: Sequence[FileDataset],
1798 import_info: _ImportDatasetsInfo,
1799 progress: Progress,
1800 *,
1801 dry_run: bool = False,
1802 ) -> None:
1803 self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)
1804 imported_refs = self._import_grouped_refs(
1805 import_info.grouped_refs, None, progress, dry_run=dry_run, expand_refs=True
1806 )
1808 # The expanded refs need to be attached back to the original
1809 # FileDatasets for datastore to use.
1810 id_to_ref = {ref.id: ref for ref in imported_refs}
1812 for dataset in progress.wrap(datasets, desc="Re-attaching expanded refs"):
1813 dataset.refs = [id_to_ref[ref.id] for ref in dataset.refs]
1815 def ingest(
1816 self,
1817 *datasets: FileDataset,
1818 transfer: str | None = "auto",
1819 record_validation_info: bool = True,
1820 skip_existing: bool = False,
1821 ) -> None:
1822 # Docstring inherited.
1823 if not datasets:
1824 return
1825 if not self.isWriteable():
1826 raise TypeError("Butler is read-only.")
1827 _LOG.verbose("Ingesting %d file dataset%s.", len(datasets), "" if len(datasets) == 1 else "s")
1828 progress = Progress("lsst.daf.butler.Butler.ingest", level=VERBOSE)
1830 new_datasets, existing_datasets = self._partition_datasets_by_known(datasets)
1831 if existing_datasets:
1832 if skip_existing:
1833 _LOG.info(
1834 "Skipping %d datasets which already exist in the repository.", len(existing_datasets)
1835 )
1836 else:
1837 raise ConflictingDefinitionError(
1838 f"Datastore already contains {len(existing_datasets)} of the given datasets."
1839 f" Example: {existing_datasets[0]}"
1840 )
1842 # Calculate some statistics based on the given list of datasets.
1843 n_files = len(datasets)
1844 n_datasets = 0
1845 for d in datasets:
1846 n_datasets += len(d.refs)
1847 sfiles = "s" if n_files != 1 else ""
1848 srefs = "s" if n_datasets != 1 else ""
1850 # We use `datasets` rather than `new_datasets` for the Registry
1851 # portion of this, to let it confirm that everything matches the
1852 # existing datasets.
1853 import_info = self._prepare_ingest_file_datasets(datasets, progress)
1855 with (
1856 self._metrics.instrument_ingest(
1857 n_datasets, _LOG, msg=f"Ingesting {n_files} file{sfiles} with {n_datasets} dataset{srefs}"
1858 ),
1859 self.transaction(),
1860 ):
1861 self._ingest_file_datasets(datasets, import_info, progress)
1863 # Bulk-insert everything into Datastore.
1864 # We do not know if any of the registry entries already existed
1865 # (_importDatasets only complains if they exist but differ).
1866 # The _partition_datasets_by_known logic above should catch most
1867 # instances where we attempt to re-ingest files that were already
1868 # ingested, but a concurrent writer could cause a unique constraint
1869 # violation here.
1870 try:
1871 self._datastore.ingest(
1872 *new_datasets, transfer=transfer, record_validation_info=record_validation_info
1873 )
1874 except IntegrityError as e:
1875 raise ConflictingDefinitionError(
1876 f"Datastore already contains one or more datasets: {e}"
1877 ) from e
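# Illustrative usage sketch (the path and ``ref`` are hypothetical; FileDataset
# is assumed to accept ``path`` and ``refs`` keyword arguments):
#
#     dataset = FileDataset(path="data/raw_000001.fits", refs=[ref])
#     butler.ingest(dataset, transfer="copy", skip_existing=True)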
1879 def _partition_datasets_by_known(
1880 self, datasets: Iterable[FileDataset]
1881 ) -> tuple[list[FileDataset], list[FileDataset]]:
1882 """Divides the given `FileDataset` objects into two groups: those for
1883 which the Datastore already has an entry, and those for which it does
1884 not.
1885 """
1886 new_datasets = []
1887 existing_datasets = []
# The input is only required to be an Iterable but is traversed twice
# (once via the chain below and once in the loop), so materialize it first.
datasets = list(datasets)
1889 refs = itertools.chain.from_iterable(dataset.refs for dataset in datasets)
1890 known_refs = self._datastore.knows_these(refs)
1892 for dataset in datasets:
1893 if any(known_refs[ref] for ref in dataset.refs):
1894 existing_datasets.append(dataset)
1895 else:
1896 new_datasets.append(dataset)
1898 return new_datasets, existing_datasets
1900 @contextlib.contextmanager
1901 def export(
1902 self,
1903 *,
1904 directory: str | None = None,
1905 filename: str | None = None,
1906 format: str | None = None,
1907 transfer: str | None = None,
1908 ) -> Iterator[RepoExportContext]:
1909 # Docstring inherited.
1910 if directory is None and transfer is not None:
1911 raise TypeError("Cannot transfer without providing a directory.")
1912 if transfer == "move":
1913 raise TypeError("Transfer may not be 'move': export is read-only")
1914 if format is None:
1915 if filename is None:
1916 raise TypeError("At least one of 'filename' or 'format' must be provided.")
1917 else:
1918 _, format = os.path.splitext(filename)
1919 if not format:
1920 raise ValueError("Please specify a file extension to determine export format.")
1921 format = format[1:] # Strip leading "."
1922 elif filename is None:
1923 filename = f"export.{format}"
1924 if directory is not None:
1925 filename = os.path.join(directory, filename)
1926 formats = self._config["repo_transfer_formats"]
1927 if format not in formats:
1928 raise ValueError(f"Unknown export format {format!r}, allowed: {','.join(formats.keys())}")
1929 BackendClass = get_class_of(formats[format, "export"])
1930 with open(filename, "w") as stream:
1931 backend = BackendClass(stream, universe=self.dimensions)
1932 try:
1933 helper = RepoExportContext(self, backend=backend, directory=directory, transfer=transfer)
1934 with self._caching_context():
1935 yield helper
1936 except BaseException:
1937 raise
1938 else:
1939 helper._finish()
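# Illustrative usage sketch (paths are hypothetical; assumes
# RepoExportContext.saveDatasets as the entry point for selecting content):
#
#     with butler.export(filename="export.yaml", directory="exported",
#                        transfer="copy") as export:
#         export.saveDatasets(refs)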
1941 def import_(
1942 self,
1943 *,
1944 directory: ResourcePathExpression | None = None,
1945 filename: ResourcePathExpression | TextIO | None = None,
1946 format: str | None = None,
1947 transfer: str | None = None,
1948 skip_dimensions: set | None = None,
1949 record_validation_info: bool = True,
1950 without_datastore: bool = False,
1951 ) -> None:
1952 # Docstring inherited.
1953 if not self.isWriteable():
1954 raise TypeError("Butler is read-only.")
1955 if filename is None and format is not None:
1956 filename = ResourcePath(f"export.{format}", forceAbsolute=False)
1957 if directory is not None:
1958 directory = ResourcePath(directory, forceDirectory=True)
1959 # mypy doesn't think this will work but it does in python >= 3.10.
1960 if isinstance(filename, ResourcePathExpression): # type: ignore
1961 filename = ResourcePath(filename, forceAbsolute=False) # type: ignore
1962 if format is None:
1963 format = filename.getExtension()
1964 if not filename.isabs() and directory is not None:
1965 potential = directory.join(filename)
1966 exists_in_cwd = filename.exists()
1967 exists_in_dir = potential.exists()
1968 if exists_in_cwd and exists_in_dir:
1969 _LOG.warning(
1970 "A relative path for filename was specified (%s) which exists relative to cwd. "
1971 "Additionally, the file exists relative to the given search directory (%s). "
1972 "Using the export file in the given directory.",
1973 filename,
1974 potential,
1975 )
1976 # Given they specified an explicit directory and that
1977 # directory has the export file in it, assume that that
1978 # is what was meant despite the file in cwd.
1979 filename = potential
1980 elif exists_in_dir:
1981 filename = potential
1982 elif not exists_in_cwd and not exists_in_dir:
1983 # Raise early.
1984 raise FileNotFoundError(
1985 f"Export file could not be found in {filename.abspath()} or {potential.abspath()}."
1986 )
1987 elif format is None:
1988 format = ".yaml"
1989 BackendClass: type[RepoImportBackend] = get_class_of(
1990 self._config["repo_transfer_formats"][format]["import"]
1991 )
1993 def doImport(importStream: TextIO | ResourceHandleProtocol) -> None:
1994 with self._caching_context():
1995 backend = BackendClass(importStream, self) # type: ignore[call-arg]
1996 backend.register()
1997 with self.transaction():
1998 backend.load(
1999 datastore=self._datastore if not without_datastore else None,
2000 directory=directory,
2001 transfer=transfer,
2002 skip_dimensions=skip_dimensions,
2003 record_validation_info=record_validation_info,
2004 )
2006 if isinstance(filename, ResourcePath):
2007 # We cannot use open() here at the moment because of
2008 # DM-38589, since yaml does stream.read(8192) in a loop.
2009 stream = io.StringIO(filename.read().decode())
2010 doImport(stream)
2011 else:
2012 doImport(filename) # type: ignore
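# Illustrative usage sketch, the inverse of export() above (paths are
# hypothetical):
#
#     butler.import_(directory="exported", filename="export.yaml",
#                    transfer="copy")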
2014 def transfer_dimension_records_from(
2015 self, source_butler: LimitedButler | Butler, source_refs: Iterable[DatasetRef | DataCoordinate]
2016 ) -> None:
2017 # Allowed dimensions in the target butler.
2018 elements = frozenset(element for element in self.dimensions.elements if element.has_own_table)
2020 data_ids = {ref.dataId for ref in source_refs}
2022 dimension_records = self._extract_all_dimension_records_from_data_ids(
2023 source_butler, data_ids, elements
2024 )
2026 # Insert order is important.
2027 for element in self.dimensions.sorted(dimension_records.keys()):
2028 records = [r for r in dimension_records[element].values()]
2029 # Assume that if the record is already present that we can
2030 # use it without having to check that the record metadata
2031 # is consistent.
2032 self._registry.insertDimensionData(element, *records, skip_existing=True)
2033 _LOG.debug("Dimension '%s' -- number of records transferred: %d", element.name, len(records))
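# Illustrative usage sketch (assumes ``refs`` were retrieved from the source
# butler and the two butler instances are hypothetical names):
#
#     dest_butler.transfer_dimension_records_from(source_butler, refs)
#
# Only records for elements with their own tables in the target universe are
# copied, and records already present are skipped rather than overwritten.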
2035 def _extract_all_dimension_records_from_data_ids(
2036 self,
2037 source_butler: LimitedButler | Butler,
2038 data_ids: set[DataCoordinate],
2039 allowed_elements: frozenset[DimensionElement],
2040 ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
2041 primary_records = self._extract_dimension_records_from_data_ids(
2042 source_butler, data_ids, allowed_elements
2043 )
2045 additional_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
2046 for original_element, record_mapping in primary_records.items():
2047 # Get dimensions that depend on this dimension.
2048 populated_by = self.dimensions.get_elements_populated_by(
2049 self.dimensions[original_element.name] # type: ignore
2050 )
2051 if populated_by:
2052 for element in populated_by:
2053 if element not in allowed_elements:
2054 continue
2055 if element.name == original_element.name:
2056 continue
2058 if element.name in primary_records:
2059 # If this element has already been stored, avoid
2060 # re-finding records, since that may lead to additional
2061 # spurious records. e.g. visit is populated_by
2062 # visit_detector_region but querying
2063 # visit_detector_region by visit will return all the
2064 # detectors for this visit -- the visit dataId does not
2065 # constrain this.
2066 # To constrain the query the original dataIds would
2067 # have to be scanned.
2068 continue
2070 if record_mapping:
2071 if not isinstance(source_butler, Butler):
2072 raise RuntimeError(
2073 f"Transferring populated_by records like {element.name}"
2074 " requires a full Butler."
2075 )
2077 with source_butler.query() as query:
2078 records = query.join_data_coordinates(record_mapping.keys()).dimension_records(
2079 element.name
2080 )
2081 for record in records:
2082 additional_records[record.definition].setdefault(record.dataId, record)
2084 # The next step is to walk back through the additional records to
2085 # pick up any missing content (such as visit_definition needing to
2086 # know the exposure). We want to ensure we do not request records
2087 # we already have.
2088 missing_data_ids = set()
2089 for record_mapping in additional_records.values():
2090 for data_id in record_mapping.keys():
2091 for dimension in data_id.dimensions.required:
2092 element = source_butler.dimensions[dimension]
2093 dimension_key = data_id.subset(dimension)
2094 if dimension_key not in primary_records[element]:
2095 missing_data_ids.add(dimension_key)
2097 # Fill out the new records. Assume that these new records do not
2098 # also need to carry over additional populated_by records.
2099 secondary_records = self._extract_dimension_records_from_data_ids(
2100 source_butler, missing_data_ids, allowed_elements
2101 )
2103 # Merge the extra sets of records in with the original.
2104 for name, record_mapping in itertools.chain(additional_records.items(), secondary_records.items()):
2105 primary_records[name].update(record_mapping)
2107 return primary_records
2109 def _extract_dimension_records_from_data_ids(
2110 self,
2111 source_butler: LimitedButler | Butler,
2112 data_ids: Iterable[DataCoordinate],
2113 allowed_elements: frozenset[DimensionElement],
2114 ) -> dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]:
2115 dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
2117 data_ids = set(data_ids)
2118 if not all(data_id.hasRecords() for data_id in data_ids):
2119 if isinstance(source_butler, Butler):
2120 data_ids = source_butler._expand_data_ids(data_ids)
2121 else:
2122 raise TypeError("Input butler needs to be a full butler to expand DataId.")
2124 for data_id in data_ids:
2125 # If this butler doesn't know about a dimension in the source
2126 # butler, things will break later.
2127 for element_name in data_id.dimensions.elements:
2128 record = data_id.records[element_name]
2129 if record is not None and record.definition in allowed_elements:
2130 dimension_records[record.definition].setdefault(record.dataId, record)
2132 return dimension_records
2134 def _prepare_for_import_refs(
2135 self,
2136 source_butler: LimitedButler,
2137 source_refs: Iterable[DatasetRef],
2138 *,
2139 register_dataset_types: bool = False,
2140 transfer_dimensions: bool = False,
2141 dry_run: bool = False,
2142 ) -> _ImportDatasetsInfo:
2143 # Docstring inherited.
2144 if not self.isWriteable() and not dry_run:
2145 raise TypeError("Butler is read-only.")
2147 # Will iterate through the refs multiple times so need to convert
2148 # to a list if this isn't a collection.
2149 if not isinstance(source_refs, collections.abc.Collection):
2150 source_refs = list(source_refs)
2152 original_count = len(source_refs)
2153 log_level = _LOG.INFO if original_count > 1 else _LOG.VERBOSE
2154 _LOG.log(
2155 log_level,
2156 "Importing %d dataset%s into %s",
2157 original_count,
2158 "s" if original_count != 1 else "",
2159 str(self),
2160 )
2162 # Importing requires that we group the refs by dimension group and run
2163 # before doing the import.
2164 source_dataset_types = set()
2165 grouped_refs: defaultdict[_RefGroup, list[DatasetRef]] = defaultdict(list)
2166 for ref in source_refs:
2167 grouped_refs[_RefGroup(ref.datasetType.dimensions, ref.run)].append(ref)
2168 source_dataset_types.add(ref.datasetType)
2170 # Check to see if the dataset type in the source butler has
2171 # the same definition in the target butler and register missing
2172 # ones if requested. Registration must happen outside a transaction.
2173 newly_registered_dataset_types = set()
2174 for datasetType in source_dataset_types:
2175 if register_dataset_types:
2176 # Let this raise immediately if inconsistent. Continuing
2177 # on to find additional inconsistent dataset types
2178 # might result in additional unwanted dataset types being
2179 # registered.
2180 try:
2181 if not dry_run and self._registry.registerDatasetType(datasetType):
2182 newly_registered_dataset_types.add(datasetType)
2183 except ConflictingDefinitionError as e:
2184 # Be safe and require that conversions be bidirectional
2185 # when there are storage class mismatches. This is because
2186 # get() will have to support conversion from source to
2187 # target python type (the source formatter will be
2188 # returning source python type) but there also is an
2189 # expectation that people will want to be able to get() in
2190 # the target using the source python type, which will not
2191 # require conversion for transferred datasets but might
2192 # for target-native types. Additionally, butler.get does
2193 # not know that the formatter will return the wrong
2194 # python type and so will always check that the conversion
2195 # works even though it won't need it.
2196 target_dataset_type = self.get_dataset_type(datasetType.name)
2197 target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
2198 source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
2199 if not (target_compatible_with_source and source_compatible_with_target):
2200 if target_compatible_with_source:
2201 e.add_note(
2202 "Target dataset type storage class is compatible with source "
2203 "but the reverse is not true."
2204 )
2205 elif source_compatible_with_target:
2206 e.add_note(
2207 "Source dataset type storage class is compatible with target "
2208 "but the reverse is not true."
2209 )
2210 else:
2211 e.add_note("If storage classes differ, please register converters.")
2212 raise
2213 else:
2214 # If the dataset type is missing, let it fail immediately.
2215 target_dataset_type = self.get_dataset_type(datasetType.name)
2216 if target_dataset_type != datasetType:
2217 target_compatible_with_source = target_dataset_type.is_compatible_with(datasetType)
2218 source_compatible_with_target = datasetType.is_compatible_with(target_dataset_type)
2219 # Both conversion directions are currently required.
2220 if not (target_compatible_with_source and source_compatible_with_target):
2221 msg = ""
2222 if target_compatible_with_source:
2223 msg = (
2224 "Target storage class is compatible with the source storage class "
2225 "but the reverse is not true."
2226 )
2227 elif source_compatible_with_target:
2228 msg = (
2229 "Source storage class is compatible with the target storage class"
2230 " but the reverse is not true."
2231 )
2232 else:
2233 msg = "If storage classes differ register converters."
2234 if msg:
2235 msg = f"({msg})"
2236 raise ConflictingDefinitionError(
2237 "Source butler dataset type differs from definition"
2238 f" in target butler: {datasetType} !="
2239 f" {target_dataset_type} {msg}"
2240 )
2241 if newly_registered_dataset_types:
2242 # We may have registered some even if there were inconsistencies
2243 # but should let people know (or else remove them again).
2244 _LOG.verbose(
2245 "Registered the following dataset types in the target Butler: %s",
2246 ", ".join(d.name for d in newly_registered_dataset_types),
2247 )
2248 else:
2249 _LOG.verbose("All required dataset types are known to the target Butler")
2251 dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
2252 if transfer_dimensions:
2253 # Collect all the dimension records for these refs.
2254 # All dimensions are to be copied but the list of valid dimensions
2255 # comes from this butler's universe.
2256 elements = frozenset(element for element in self.dimensions.elements if element.has_own_table)
2257 dataIds = {ref.dataId for ref in source_refs}
2258 dimension_records = self._extract_all_dimension_records_from_data_ids(
2259 source_butler, dataIds, elements
2260 )
2261 return _ImportDatasetsInfo(grouped_refs, dimension_records)
2263 def _import_dimension_records(
2264 self,
2265 dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]],
2266 *,
2267 dry_run: bool,
2268 ) -> None:
2269 """Import dimension records collected during import pre-process."""
2270 if dimension_records and not dry_run:
2271 _LOG.verbose("Ensuring that dimension records exist for transferred datasets.")
2272 # Order matters.
2273 for element in self.dimensions.sorted(dimension_records.keys()):
2274 records = list(dimension_records[element].values())
2275 # Assume that if the record is already present that we can
2276 # use it without having to check that the record metadata
2277 # is consistent.
2278 self._registry.insertDimensionData(element, *records, skip_existing=True)
2280 def _import_grouped_refs(
2281 self,
2282 grouped_refs: defaultdict[_RefGroup, list[DatasetRef]],
2283 source_butler: LimitedButler | None,
2284 progress: Progress,
2285 *,
2286 dry_run: bool = False,
2287 expand_refs: bool = False,
2288 ) -> list[DatasetRef]:
2289 handled_collections: set[str] = set()
2290 n_to_import = 0
2291 all_imported_refs: list[DatasetRef] = []
2292 # Sort by run collection name to ensure Postgres takes locks in the
2293 # same order between different processes, to mitigate an issue
2294 # where Postgres can deadlock due to the unique index on collection
2295 # name. (See DM-47543).
2296 groups = sorted(grouped_refs.items(), key=lambda item: item[0].run)
2297 for (dimension_group, run), refs_to_import in progress.iter_item_chunks(
2298 groups, desc="Importing to registry by run and dataset type"
2299 ):
2300 if run not in handled_collections:
2301 # May need to create output collection. If source butler
2302 # has a registry, ask for documentation string.
2303 run_doc = None
2304 if source_butler is not None and (registry := getattr(source_butler, "registry", None)):
2305 run_doc = registry.getCollectionDocumentation(run)
2306 if not dry_run:
2307 registered = self.collections.register(run, doc=run_doc)
2308 else:
2309 registered = True
2310 handled_collections.add(run)
2311 if registered:
2312 _LOG.verbose("Creating output run %s", run)
2314 n_refs = len(refs_to_import)
2315 n_to_import += n_refs
2316 _LOG.verbose(
2317 "Importing %d ref%s with dimensions %s into run %s",
2318 n_refs,
2319 "" if n_refs == 1 else "s",
2320 dimension_group.names,
2321 run,
2322 )
2324 # Assume we are using UUIDs and the source refs will match
2325 # those imported.
2326 if not dry_run:
2327 imported_refs = self._registry._importDatasets(refs_to_import, expand=expand_refs)
2328 else:
2329 imported_refs = refs_to_import
2331 all_imported_refs.extend(imported_refs)
2333 assert n_to_import == len(all_imported_refs)
2334 _LOG.verbose("Imported %d datasets into destination butler", n_to_import)
2335 return all_imported_refs
2337 def transfer_from(
2338 self,
2339 source_butler: LimitedButler,
2340 source_refs: Iterable[DatasetRef],
2341 transfer: str = "auto",
2342 skip_missing: bool = True,
2343 register_dataset_types: bool = False,
2344 transfer_dimensions: bool = False,
2345 dry_run: bool = False,
2346 ) -> collections.abc.Collection[DatasetRef]:
2347 # Docstring inherited.
2348 source_refs = list(source_refs)
2349 if not self.isWriteable() and not dry_run:
2350 raise TypeError("Butler is read-only.")
2352 progress = Progress("lsst.daf.butler.Butler.transfer_from", level=VERBOSE)
2354 artifact_existence: dict[ResourcePath, bool] = {}
2355 file_transfer_source = source_butler._file_transfer_source
2356 transfer_records = retrieve_file_transfer_records(
2357 file_transfer_source, source_refs, artifact_existence
2358 )
2359 # In some situations the datastore artifact may be missing and we do
2360 # not want that registry entry to be imported. For example, this can
2361 # happen if a file was removed but the dataset was left in the registry
2362 # for provenance, or if a pipeline task didn't create all of the
2363 # possible files in a QuantumBackedButler.
2364 if skip_missing:
2365 original_ids = {ref.id for ref in source_refs}
2366 missing_ids = original_ids - transfer_records.keys()
2367 if missing_ids:
2368 original_count = len(source_refs)
2369 source_refs = [ref for ref in source_refs if ref.id not in missing_ids]
2370 filtered_count = len(source_refs)
2371 n_missing = original_count - filtered_count
2372 _LOG.verbose(
2373 "%d dataset%s removed because the artifact does not exist. Now have %d.",
2374 n_missing,
2375 "" if n_missing == 1 else "s",
2376 filtered_count,
2377 )
2379 import_info = self._prepare_for_import_refs(
2380 source_butler,
2381 source_refs,
2382 register_dataset_types=register_dataset_types,
2383 dry_run=dry_run,
2384 transfer_dimensions=transfer_dimensions,
2385 )
2387 # Do all the importing in a single transaction.
2388 with self.transaction():
2389 self._import_dimension_records(import_info.dimension_records, dry_run=dry_run)
2390 imported_refs = self._import_grouped_refs(
2391 import_info.grouped_refs, source_butler, progress, dry_run=dry_run
2392 )
2394 # Ask the datastore to transfer. The datastore has to check that
2395 # the source datastore is compatible with the target datastore.
2396 _LOG.verbose("Transferring %d datasets from %s", len(transfer_records), file_transfer_source.name)
2397 accepted, rejected = self._datastore.transfer_from(
2398 transfer_records,
2399 imported_refs,
2400 transfer=transfer,
2401 artifact_existence=artifact_existence,
2402 dry_run=dry_run,
2403 )
2404 if rejected:
2405 # For now, accept the registry entries but not the files.
2406 _LOG.warning(
2407 "%d datasets were rejected and %d accepted for transfer.",
2408 len(rejected),
2409 len(accepted),
2410 )
2412 return imported_refs
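# Illustrative usage sketch (assumes ``refs`` already retrieved from the source
# butler; the butler variable names are hypothetical):
#
#     transferred = dest_butler.transfer_from(
#         source_butler, refs, transfer="copy",
#         register_dataset_types=True, transfer_dimensions=True,
#     )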
2414 def validateConfiguration(
2415 self,
2416 logFailures: bool = False,
2417 datasetTypeNames: Iterable[str] | None = None,
2418 ignore: Iterable[str] | None = None,
2419 ) -> None:
2420 # Docstring inherited.
2421 if datasetTypeNames:
2422 datasetTypes = [self.get_dataset_type(name) for name in datasetTypeNames]
2423 else:
2424 datasetTypes = list(self._registry.queryDatasetTypes())
2426 # filter out anything from the ignore list
2427 if ignore:
2428 ignore = set(ignore)
2429 datasetTypes = [
2430 e for e in datasetTypes if e.name not in ignore and e.nameAndComponent()[0] not in ignore
2431 ]
2432 else:
2433 ignore = set()
2435 # For each datasetType that has an instrument dimension, create
2436 # a DatasetRef for each defined instrument
2437 datasetRefs = []
2439 # Find all the registered instruments (if "instrument" is in the
2440 # universe).
2441 instruments: set[str] = set()
2442 if "instrument" in self.dimensions:
2443 instruments = {rec.name for rec in self.query_dimension_records("instrument", explain=False)}
2445 for datasetType in datasetTypes:
2446 if "instrument" in datasetType.dimensions:
2447 # In order to create a conforming dataset ref, create
2448 # fake DataCoordinate values for the non-instrument
2449 # dimensions. The type of the value does not matter here.
2450 dataId = {dim: 1 for dim in datasetType.dimensions.names if dim != "instrument"}
2452 for instrument in instruments:
2453 datasetRef = DatasetRef(
2454 datasetType,
2455 DataCoordinate.standardize(
2456 dataId, instrument=instrument, dimensions=datasetType.dimensions
2457 ),
2458 run="validate",
2459 )
2460 datasetRefs.append(datasetRef)
2462 entities: list[DatasetType | DatasetRef] = []
2463 entities.extend(datasetTypes)
2464 entities.extend(datasetRefs)
2466 datastoreErrorStr = None
2467 try:
2468 self._datastore.validateConfiguration(entities, logFailures=logFailures)
2469 except ValidationError as e:
2470 datastoreErrorStr = str(e)
2472 # Also check that the LookupKeys used by the datastores match
2473 # registry and storage class definitions
2474 keys = self._datastore.getLookupKeys()
2476 failedNames = set()
2477 failedDataId = set()
2478 for key in keys:
2479 if key.name is not None:
2480 if key.name in ignore:
2481 continue
2483 # skip if specific datasetType names were requested and this
2484 # name does not match
2485 if datasetTypeNames and key.name not in datasetTypeNames:
2486 continue
2488 # See if it is a StorageClass or a DatasetType
2489 if key.name in self.storageClasses:
2490 pass
2491 else:
2492 try:
2493 self.get_dataset_type(key.name)
2494 except KeyError:
2495 if logFailures:
2496 _LOG.critical(
2497 "Key '%s' does not correspond to a DatasetType or StorageClass", key
2498 )
2499 failedNames.add(key)
2500 else:
2501 # Dimensions are checked for consistency when the Butler
2502 # is created and rendezvoused with a universe.
2503 pass
2505 # Check that the instrument is a valid instrument.
2506 # Currently only "instrument" is supported as a dataId key, so check for that.
2507 if key.dataId:
2508 dataIdKeys = set(key.dataId)
2509 if {"instrument"} != dataIdKeys:
2510 if logFailures:
2511 _LOG.critical("Key '%s' has unsupported DataId override", key)
2512 failedDataId.add(key)
2513 elif key.dataId["instrument"] not in instruments:
2514 if logFailures:
2515 _LOG.critical("Key '%s' has unknown instrument", key)
2516 failedDataId.add(key)
2518 messages = []
2520 if datastoreErrorStr:
2521 messages.append(datastoreErrorStr)
2523 for failed, msg in (
2524 (failedNames, "Keys without corresponding DatasetType or StorageClass entry: "),
2525 (failedDataId, "Keys with bad DataId entries: "),
2526 ):
2527 if failed:
2528 msg += ", ".join(str(k) for k in failed)
2529 messages.append(msg)
2531 if messages:
2532 raise ValidationError(";\n".join(messages))
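# Illustrative usage sketch (the ignored dataset type name is hypothetical):
#
#     butler.validateConfiguration(logFailures=True, ignore=["raw"])
#
# A single ValidationError aggregating the datastore and lookup-key problems
# is raised if any check fails.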
2534 @property
2535 @deprecated(
2536 "Please use 'collections' instead. collection_chains will be removed after v28.",
2537 version="v28",
2538 category=FutureWarning,
2539 )
2540 def collection_chains(self) -> DirectButlerCollections:
2541 """Object with methods for modifying collection chains."""
2542 return DirectButlerCollections(self._registry)
2544 @property
2545 def collections(self) -> DirectButlerCollections:
2546 """Object with methods for modifying and inspecting collections."""
2547 return DirectButlerCollections(self._registry)
2549 @property
2550 def run(self) -> str | None:
2551 """Name of the run this butler writes outputs to by default (`str` or
2552 `None`).
2554 This is an alias for ``self.registry.defaults.run``. It cannot be set
2555 directly in isolation, but all defaults may be changed together by
2556 assigning a new `RegistryDefaults` instance to
2557 ``self.registry.defaults``.
2558 """
2559 return self._registry.defaults.run
2561 @property
2562 def registry(self) -> Registry:
2563 """The object that manages dataset metadata and relationships
2564 (`Registry`).
2566 Many operations that don't involve reading or writing butler datasets
2567 are accessible only via `Registry` methods. Eventually these methods
2568 will be replaced by equivalent `Butler` methods.
2569 """
2570 return RegistryShim(self)
2572 @property
2573 def dimensions(self) -> DimensionUniverse:
2574 # Docstring inherited.
2575 return self._registry.dimensions
2577 def query(self) -> contextlib.AbstractContextManager[Query]:
2578 # Docstring inherited.
2579 return self._registry._query()
2581 def _query_driver(
2582 self,
2583 default_collections: Iterable[str],
2584 default_data_id: DataCoordinate,
2585 ) -> contextlib.AbstractContextManager[DirectQueryDriver]:
2586 """Set up a QueryDriver instance for use with this Butler. Although
2587 this is marked as a private method, it is also used by Butler server.
2588 """
2589 return self._registry._query_driver(default_collections, default_data_id)
2591 @contextlib.contextmanager
2592 def _query_all_datasets_by_page(
2593 self, args: QueryAllDatasetsParameters
2594 ) -> Iterator[Iterator[list[DatasetRef]]]:
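# Yield an iterator over pages of DatasetRef lists; the underlying query
# context stays open while the caller consumes the pages.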
2595 with self.query() as query:
2596 pages = query_all_datasets(self, query, args)
2597 yield iter(page.data for page in pages)
2599 def _preload_cache(self, *, load_dimension_record_cache: bool = True) -> None:
2600 """Immediately load caches that are used for common operations."""
2601 self._registry.preload_cache(load_dimension_record_cache=load_dimension_record_cache)
2603 def _expand_data_ids(self, data_ids: Iterable[DataCoordinate]) -> list[DataCoordinate]:
2604 return self._registry.expand_data_ids(data_ids)
2606 _config: ButlerConfig
2607 """Configuration for this Butler instance."""
2609 _registry: SqlRegistry
2610 """The object that manages dataset metadata and relationships
2611 (`SqlRegistry`).
2613 Most operations that don't involve reading or writing butler datasets are
2614 accessible only via `SqlRegistry` methods.
2615 """
2617 storageClasses: StorageClassFactory
2618 """An object that maps known storage class names to objects that fully
2619 describe them (`StorageClassFactory`).
2620 """
2622 _closed: bool
2623 """`True` if close() has already been called on this instance; `False`
2624 otherwise.
2625 """
2628class _RefGroup(NamedTuple):
2629 """Key identifying a batch of DatasetRefs to be inserted in
2630 `Butler.transfer_from`.
2631 """
2633 dimensions: DimensionGroup
2634 run: str
2637class _ImportDatasetsInfo(NamedTuple):
2638 """Information extracted from datasets to be imported."""
2640 grouped_refs: defaultdict[_RefGroup, list[DatasetRef]]
2641 dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]]
2644def _to_uuid(id: DatasetId | str) -> uuid.UUID:
2645 if isinstance(id, uuid.UUID):
2646 return id
2647 else:
2648 return uuid.UUID(id)
2651class _ButlerClosed:
2652 def __getattr__(self, name: str) -> Any:
2653 raise RuntimeError("Attempted to use a Butler instance which has been closed.")
2656_BUTLER_CLOSED_INSTANCE: Any = _ButlerClosed()
2659def _retrieve_dataset_type(registry: SqlRegistry, name: str) -> DatasetType | None:
2660 """Return DatasetType defined in registry given dataset type name."""
2661 try:
2662 return registry.getDatasetType(name)
2663 except MissingDatasetTypeError:
2664 return None