Coverage for python / lsst / daf / butler / datastores / chainedDatastore.py: 0%
541 statements
coverage.py v7.13.5, created at 2026-04-24 08:17 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Chained datastore."""
30from __future__ import annotations
32__all__ = ("ChainedDatastore",)
34import itertools
35import time
36import warnings
37from collections.abc import Callable, Collection, Iterable, Mapping, Sequence
38from typing import TYPE_CHECKING, Any
40from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, DimensionUniverse, FileDataset
41from lsst.daf.butler.datastore import (
42 DatasetRefURIs,
43 Datastore,
44 DatastoreConfig,
45 DatastoreOpaqueTable,
46 DatastoreValidationError,
47)
48from lsst.daf.butler.datastore.constraints import Constraints
49from lsst.daf.butler.datastore.record_data import DatastoreRecordData
50from lsst.daf.butler.datastores.file_datastore.get import DatasetLocationInformation
51from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ArtifactIndexInfo, ZipIndex
52from lsst.resources import ResourcePath
53from lsst.utils import doImportType
54from lsst.utils.introspection import get_full_type_name
55from lsst.utils.logging import getLogger
57from .._dataset_ref import DatasetId
58from ..datastore import FileTransferMap
60if TYPE_CHECKING:
61 from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey, StorageClass
62 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager
63 from lsst.resources import ResourcePathExpression
65log = getLogger(__name__)
68class _IngestPrepData(Datastore.IngestPrepData):
69 """Helper class for ChainedDatastore ingest implementation.
71 Parameters
72 ----------
73 children : `list` of `tuple`
74 Triples of `Datastore`, `IngestPrepData`, and the set of source paths, one per child datastore.
75 """
77 def __init__(self, children: list[tuple[Datastore, Datastore.IngestPrepData, set[ResourcePath]]]):
78 super().__init__(itertools.chain.from_iterable(data.refs.values() for _, data, _ in children))
79 self.children = children
82class ChainedDatastore(Datastore):
83 """Chained Datastores to allow reads and writes from multiple datastores.
85 A ChainedDatastore is configured with multiple datastore configurations.
86 A ``put()`` is always sent to each datastore. A ``get()``
87 operation is sent to each datastore in turn and the first datastore
88 to return a valid dataset is used.
90 Parameters
91 ----------
92 config : `DatastoreConfig` or `str`
93 Configuration. This configuration must include a ``datastores`` field
94 as a sequence of datastore configurations. The order in this sequence
95 indicates the order to use for read operations.
96 bridgeManager : `DatastoreRegistryBridgeManager`
97 Object that manages the interface between `Registry` and datastores.
98 datastores : `list` [`Datastore`]
99 All the child datastores known to this datastore.
101 Notes
102 -----
103 ChainedDatastore never supports `None` or ``"move"`` as an ingest transfer
104 mode. It supports ``"copy"``, ``"symlink"``, ``"relsymlink"``
105 and ``"hardlink"`` if and only if all its child datastores do.
106 """
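    # Illustrative sketch of the behaviour described above (the child
    # datastores and variable names here are hypothetical, not part of this
    # module): a put() fans out to every accepting child, while a get() falls
    # through the chain in configuration order.
    #
    #     chain = ChainedDatastore(config, bridgeManager, [memory_store, file_store])
    #     chain.put(in_memory_dataset, ref)   # written to each child that accepts it
    #     obj = chain.get(ref)                # first child holding the dataset wins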
108 defaultConfigFile = "datastores/chainedDatastore.yaml"
109 """Path to configuration defaults. Accessed within the ``configs`` resource
110 or relative to a search path. Can be None if no defaults specified.
111 """
113 containerKey = "datastores"
114 """Key to specify where child datastores are configured."""
116 datastores: list[Datastore]
117 """All the child datastores known to this datastore."""
119 datastoreConstraints: Sequence[Constraints | None]
120 """Constraints to be applied to each of the child datastores."""
122 universe: DimensionUniverse
123 """Dimension universe associated with the butler."""
125 @classmethod
126 def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
127 """Set any filesystem-dependent config options for child Datastores to
128 be appropriate for a new empty repository with the given root.
130 Parameters
131 ----------
132 root : `str`
133 Filesystem path to the root of the data repository.
134 config : `Config`
135 A `Config` to update. Only the subset understood by
136 this component will be updated. Will not expand
137 defaults.
138 full : `Config`
139 A complete config with all defaults expanded that can be
140 converted to a `DatastoreConfig`. Read-only and will not be
141 modified by this method.
142 Repository-specific options that should not be obtained
143 from defaults when Butler instances are constructed
144 should be copied from ``full`` to ``config``.
145 overwrite : `bool`, optional
146 If `False`, do not modify a value in ``config`` if the value
147 already exists. Default is always to overwrite with the provided
148 ``root``.
150 Notes
151 -----
152 If a keyword is explicitly defined in the supplied ``config`` it
153 will not be overridden by this method if ``overwrite`` is `False`.
154 This allows explicit values set in external configs to be retained.
155 """
156 # Extract the part of the config we care about updating
157 datastoreConfig = DatastoreConfig(config, mergeDefaults=False)
159 # And the subset of the full config that we can use for reference.
160 # Do not bother with defaults because we are told this already has
161 # them.
162 fullDatastoreConfig = DatastoreConfig(full, mergeDefaults=False)
164 # Loop over each datastore config and pass the subsets to the
165 # child datastores to process.
167 containerKey = cls.containerKey
168 for idx, (child, fullChild) in enumerate(
169 zip(datastoreConfig[containerKey], fullDatastoreConfig[containerKey], strict=True)
170 ):
171 childConfig = DatastoreConfig(child, mergeDefaults=False)
172 fullChildConfig = DatastoreConfig(fullChild, mergeDefaults=False)
173 datastoreClass = doImportType(fullChildConfig["cls"])
174 if not issubclass(datastoreClass, Datastore):
175 raise TypeError(f"Imported child class {fullChildConfig['cls']} is not a Datastore")
176 newroot = f"{root}/{datastoreClass.__qualname__}_{idx}"
177 datastoreClass.setConfigRoot(newroot, childConfig, fullChildConfig, overwrite=overwrite)
179 # Reattach to parent
180 datastoreConfig[containerKey, idx] = childConfig
182 # Reattach modified datastore config to parent
183 # If this has a datastore key we attach there, otherwise we assume
184 # this information goes at the top of the config hierarchy.
185 if DatastoreConfig.component in config:
186 config[DatastoreConfig.component] = datastoreConfig
187 else:
188 config.update(datastoreConfig)
190 return
192 def __init__(
193 self,
194 config: DatastoreConfig,
195 bridgeManager: DatastoreRegistryBridgeManager,
196 datastores: list[Datastore],
197 ):
198 super().__init__(config, bridgeManager)
200 self.datastores = list(datastores)
202 # Name ourself based on our children
203 if self.datastores:
204 # We must set the names explicitly
205 self._names = [d.name for d in self.datastores]
206 childNames = ",".join(self.names)
207 else:
208 childNames = f"(empty@{time.time()})"
209 self._names = [childNames]
210 self.name = f"{type(self).__qualname__}[{childNames}]"
212 # We declare we are ephemeral if all our child datastores declare
213 # they are ephemeral
214 self.isEphemeral = all(d.isEphemeral for d in self.datastores)
216 # per-datastore override constraints
217 if "datastore_constraints" in self.config:
218 overrides = self.config["datastore_constraints"]
220 if len(overrides) != len(self.datastores):
221 raise DatastoreValidationError(
222 f"Number of registered datastores ({len(self.datastores)})"
223 " differs from number of constraints overrides"
224 f" {len(overrides)}"
225 )
227 self.datastoreConstraints = [
228 Constraints(c.get("constraints"), universe=bridgeManager.universe) for c in overrides
229 ]
231 else:
232 self.datastoreConstraints = (None,) * len(self.datastores)
234 self.universe = bridgeManager.universe
236 log.debug("Created %s (%s)", self.name, ("ephemeral" if self.isEphemeral else "permanent"))
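    # A minimal sketch of the configuration shape consumed by __init__ and
    # _create_from_config (only keys read in this file are shown; the child
    # "cls" values below are placeholders):
    #
    #     config = DatastoreConfig(
    #         {
    #             "datastores": [
    #                 {"cls": "some.package.ChildDatastoreA"},
    #                 {"cls": "some.package.ChildDatastoreB"},
    #             ],
    #             # Optional; if present, must have one entry per child.
    #             "datastore_constraints": [
    #                 {"constraints": None},
    #                 {"constraints": None},
    #             ],
    #         }
    #     )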
238 @classmethod
239 def _create_from_config(
240 cls,
241 config: DatastoreConfig,
242 bridgeManager: DatastoreRegistryBridgeManager,
243 butlerRoot: ResourcePathExpression | None,
244 ) -> ChainedDatastore:
245 # Scan for child datastores and instantiate them with the same registry
246 datastores = []
247 for c in config["datastores"]:
248 c = DatastoreConfig(c)
249 datastoreType = doImportType(c["cls"])
250 if not issubclass(datastoreType, Datastore):
251 raise TypeError(f"Imported child class {c['cls']} is not a Datastore")
252 datastore = datastoreType._create_from_config(c, bridgeManager, butlerRoot=butlerRoot)
253 log.debug("Creating child datastore %s", datastore.name)
254 datastores.append(datastore)
256 return ChainedDatastore(config, bridgeManager, datastores)
258 def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore:
259 datastores = [ds.clone(bridgeManager) for ds in self.datastores]
260 return ChainedDatastore(self.config, bridgeManager, datastores)
262 @property
263 def names(self) -> tuple[str, ...]:
264 return tuple(self._names)
266 @property
267 def roots(self) -> dict[str, ResourcePath | None]:
268 # Docstring inherited.
269 roots = {}
270 for datastore in self.datastores:
271 roots.update(datastore.roots)
272 return roots
274 def __str__(self) -> str:
275 chainName = ", ".join(str(ds) for ds in self.datastores)
276 return chainName
278 def _set_trust_mode(self, mode: bool) -> None:
279 for datastore in self.datastores:
280 datastore._set_trust_mode(mode)
282 def knows(self, ref: DatasetRef) -> bool:
283 """Check if the dataset is known to any of the datastores.
285 Does not check for existence of any artifact.
287 Parameters
288 ----------
289 ref : `DatasetRef`
290 Reference to the required dataset.
292 Returns
293 -------
294 exists : `bool`
295 `True` if the dataset is known to the datastore.
296 """
297 for datastore in self.datastores:
298 if datastore.knows(ref):
299 log.debug("%s known to datastore %s", ref, datastore.name)
300 return True
301 return False
303 def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
304 # Docstring inherited from the base class.
305 refs_known: dict[DatasetRef, bool] = {}
306 for datastore in self.datastores:
307 refs_known.update(datastore.knows_these(refs))
309 # No need to check in next datastore for refs that are known.
310 # We only update entries that were initially False.
311 refs = [ref for ref, known in refs_known.items() if not known]
313 return refs_known
315 def mexists(
316 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None
317 ) -> dict[DatasetRef, bool]:
318 """Check the existence of multiple datasets at once.
320 Parameters
321 ----------
322 refs : `~collections.abc.Iterable` of `DatasetRef`
323 The datasets to be checked.
324 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
325 Optional mapping of datastore artifact to existence. Updated by
326 this method with details of all artifacts tested. Can be `None`
327 if the caller is not interested.
329 Returns
330 -------
331 existence : `dict` [`DatasetRef`, `bool`]
332 Mapping from dataset to boolean indicating existence in any
333 of the child datastores.
334 """
335 dataset_existence: dict[DatasetRef, bool] = {}
336 for datastore in self.datastores:
337 dataset_existence.update(datastore.mexists(refs, artifact_existence=artifact_existence))
339 # For next datastore no point asking about ones we know
340 # exist already. No special exemption for ephemeral datastores.
341 refs = [ref for ref, exists in dataset_existence.items() if not exists]
343 return dataset_existence
345 def exists(self, ref: DatasetRef) -> bool:
346 """Check if the dataset exists in one of the datastores.
348 Parameters
349 ----------
350 ref : `DatasetRef`
351 Reference to the required dataset.
353 Returns
354 -------
355 exists : `bool`
356 `True` if the entity exists in one of the child datastores.
357 """
358 for datastore in self.datastores:
359 if datastore.exists(ref):
360 log.debug("Found %s in datastore %s", ref, datastore.name)
361 return True
362 return False
364 def get(
365 self,
366 ref: DatasetRef,
367 parameters: Mapping[str, Any] | None = None,
368 storageClass: StorageClass | str | None = None,
369 ) -> Any:
370 """Load an InMemoryDataset from the store.
372 The dataset is returned from the first datastore that has
373 the dataset.
375 Parameters
376 ----------
377 ref : `DatasetRef`
378 Reference to the required Dataset.
379 parameters : `dict`
380 `StorageClass`-specific parameters that specify, for example,
381 a slice of the dataset to be loaded.
382 storageClass : `StorageClass` or `str`, optional
383 The storage class to be used to override the Python type
384 returned by this method. By default the returned type matches
385 the dataset type definition for this dataset. Specifying a
386 read `StorageClass` can force a different type to be returned.
387 This type must be compatible with the original type.
389 Returns
390 -------
391 inMemoryDataset : `object`
392 Requested dataset or slice thereof as an InMemoryDataset.
394 Raises
395 ------
396 FileNotFoundError
397 Requested dataset can not be retrieved.
398 TypeError
399 Return value from formatter has unexpected type.
400 ValueError
401 Formatter failed to process the dataset.
402 """
403 for datastore in self.datastores:
404 try:
405 inMemoryObject = datastore.get(ref, parameters, storageClass=storageClass)
406 log.debug("Found dataset %s in datastore %s", ref, datastore.name)
407 return inMemoryObject
408 except FileNotFoundError:
409 pass
411 raise FileNotFoundError(f"Dataset {ref} could not be found in any of the datastores")
413 def prepare_get_for_external_client(self, ref: DatasetRef) -> list[DatasetLocationInformation] | None:
414 datastore = self._get_matching_datastore(ref)
415 if datastore is None:
416 return None
418 return datastore.prepare_get_for_external_client(ref)
420 def _get_matching_datastore(self, ref: DatasetRef) -> Datastore | None:
421 """Return the first child datastore that owns the specified dataset."""
422 for datastore in self.datastores:
423 if datastore.knows(ref):
424 return datastore
426 return None
428 def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
429 """Write an InMemoryDataset with a given `DatasetRef` to each
430 datastore.
432 The put() to child datastores can fail with
433 `DatasetTypeNotSupportedError`. The put() for this datastore will be
434 deemed to have succeeded so long as at least one child datastore
435 accepted the inMemoryDataset.
437 Parameters
438 ----------
439 inMemoryDataset : `object`
440 The dataset to store.
441 ref : `DatasetRef`
442 Reference to the associated Dataset.
443 provenance : `DatasetProvenance` or `None`, optional
444 Any provenance that should be attached to the serialized dataset.
445 Not supported by all serialization mechanisms.
447 Raises
448 ------
449 TypeError
450 Supplied object and storage class are inconsistent.
451 DatasetTypeNotSupportedError
452 All datastores reported `DatasetTypeNotSupportedError`.
453 """
454 log.debug("Put %s", ref)
456 # Confirm that we can accept this dataset
457 if not self.constraints.isAcceptable(ref):
458 # Raise rather than use boolean return value.
459 raise DatasetTypeNotSupportedError(
460 f"Dataset {ref} has been rejected by this datastore via configuration."
461 )
463 isPermanent = False
464 nsuccess = 0
465 npermanent = 0
466 nephemeral = 0
467 for datastore, constraints in zip(self.datastores, self.datastoreConstraints, strict=True):
468 if (
469 constraints is not None and not constraints.isAcceptable(ref)
470 ) or not datastore.constraints.isAcceptable(ref):
471 log.debug("Datastore %s skipping put via configuration for ref %s", datastore.name, ref)
472 continue
474 if datastore.isEphemeral:
475 nephemeral += 1
476 else:
477 npermanent += 1
478 try:
479 datastore.put(inMemoryDataset, ref, provenance=provenance)
480 nsuccess += 1
481 if not datastore.isEphemeral:
482 isPermanent = True
483 except DatasetTypeNotSupportedError:
484 pass
486 if nsuccess == 0:
487 raise DatasetTypeNotSupportedError(f"None of the chained datastores supported ref {ref}")
489 if not isPermanent and npermanent > 0:
490 warnings.warn(f"Put of {ref} only succeeded in ephemeral datastores", stacklevel=2)
492 if self._transaction is not None:
493 self._transaction.registerUndo("put", self.remove, ref)
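    # Worked example of the accounting above (hypothetical children): with an
    # ephemeral in-memory child plus a permanent file-backed child, if the
    # permanent child's put() raises DatasetTypeNotSupportedError while the
    # ephemeral child accepts the dataset, nsuccess is 1 but isPermanent stays
    # False with npermanent == 1, so the ephemeral-only warning above fires.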
495 def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]:
496 # Docstring inherited from base class.
497 log.debug("Put %s", ref)
499 # Confirm that we can accept this dataset
500 if not self.constraints.isAcceptable(ref):
501 # Raise rather than use boolean return value.
502 raise DatasetTypeNotSupportedError(
503 f"Dataset {ref} has been rejected by this datastore via configuration."
504 )
506 isPermanent = False
507 nsuccess = 0
508 npermanent = 0
509 nephemeral = 0
510 stored_refs: dict[str, DatasetRef] = {}
511 for datastore, constraints in zip(self.datastores, self.datastoreConstraints, strict=True):
512 if (
513 constraints is not None and not constraints.isAcceptable(ref)
514 ) or not datastore.constraints.isAcceptable(ref):
515 log.debug("Datastore %s skipping put via configuration for ref %s", datastore.name, ref)
516 continue
518 if datastore.isEphemeral:
519 nephemeral += 1
520 else:
521 npermanent += 1
522 try:
523 stored_ref_map = datastore.put_new(in_memory_dataset, ref)
524 stored_refs.update(stored_ref_map)
525 nsuccess += 1
526 if not datastore.isEphemeral:
527 isPermanent = True
528 except DatasetTypeNotSupportedError:
529 pass
531 if nsuccess == 0:
532 raise DatasetTypeNotSupportedError(f"None of the chained datastores supported ref {ref}")
534 if not isPermanent and npermanent > 0:
535 warnings.warn(f"Put of {ref} only succeeded in ephemeral datastores", stacklevel=2)
537 if self._transaction is not None:
538 self._transaction.registerUndo("put", self.remove, ref)
540 return stored_refs
542 def _overrideTransferMode(self, *datasets: Any, transfer: str | None = None) -> str | None:
543 # Docstring inherited from base class.
544 if transfer != "auto":
545 return transfer
546 # Ask each datastore what they think auto means
547 transfers = {d._overrideTransferMode(*datasets, transfer=transfer) for d in self.datastores}
549 # Remove any untranslated "auto" values
550 transfers.discard(transfer)
552 if len(transfers) == 1:
553 return transfers.pop()
554 if not transfers:
555 # Everything reported "auto"
556 return transfer
558 raise RuntimeError(
559 "Chained datastore does not yet support different transfer modes"
560 f" from 'auto' in each child datastore (wanted {transfers})"
561 )
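    # For example: if every child maps "auto" to "copy" the chain returns
    # "copy"; if all children leave it as "auto" the original "auto" is
    # returned; if one child resolves to "copy" and another to "link" the
    # RuntimeError above is raised.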
563 def _prepIngest(self, *datasets: FileDataset, transfer: str | None = None) -> _IngestPrepData:
564 # Docstring inherited from Datastore._prepIngest.
565 def isDatasetAcceptable(dataset: FileDataset, *, name: str, constraints: Constraints) -> bool:
566 acceptable = [ref for ref in dataset.refs if constraints.isAcceptable(ref)]
567 if not acceptable:
568 log.debug(
569 "Datastore %s skipping ingest via configuration for refs %s",
570 name,
571 ", ".join(str(ref) for ref in dataset.refs),
572 )
573 return False
574 else:
575 return True
577 # Filter down to just datasets the chained datastore's own
578 # configuration accepts.
579 okForParent: list[FileDataset] = [
580 dataset
581 for dataset in datasets
582 if isDatasetAcceptable(dataset, name=self.name, constraints=self.constraints)
583 ]
585 # Iterate over nested datastores and call _prepIngest on each.
586 # Save the results to a list:
587 children: list[tuple[Datastore, Datastore.IngestPrepData, set[ResourcePath]]] = []
588 # ...and remember whether all of the failures are due to
589 # NotImplementedError being raised.
590 allFailuresAreNotImplementedError = True
591 for datastore, constraints in zip(self.datastores, self.datastoreConstraints, strict=True):
592 okForChild: list[FileDataset]
593 if constraints is not None:
594 okForChild = [
595 dataset
596 for dataset in okForParent
597 if isDatasetAcceptable(dataset, name=datastore.name, constraints=constraints)
598 ]
599 else:
600 okForChild = okForParent
601 try:
602 prepDataForChild = datastore._prepIngest(*okForChild, transfer=transfer)
603 except NotImplementedError:
604 log.debug(
605 "Skipping ingest for datastore %s because transfer mode %s is not supported.",
606 datastore.name,
607 transfer,
608 )
609 continue
610 allFailuresAreNotImplementedError = False
611 if okForChild:
612 # Do not store for later if a datastore has rejected
613 # everything.
614 # Include the source paths if this is a "move". It's clearer
615 # to find the paths now rather than try to infer how
616 # each datastore has stored them in the internal prep class.
617 paths = (
618 {ResourcePath(dataset.path, forceDirectory=False) for dataset in okForChild}
619 if transfer == "move"
620 else set()
621 )
622 children.append((datastore, prepDataForChild, paths))
623 if allFailuresAreNotImplementedError:
624 raise NotImplementedError(f"No child datastore supports transfer mode {transfer}.")
625 return _IngestPrepData(children=children)
627 def _finishIngest(
628 self,
629 prepData: _IngestPrepData,
630 *,
631 transfer: str | None = None,
632 record_validation_info: bool = True,
633 ) -> None:
634 # Docstring inherited from Datastore._finishIngest.
635 # For "move" we must use "copy" and then delete the input
636 # data at the end. This has no rollback option if the ingest
637 # subsequently fails. If there is only one active datastore
638 # accepting any files, we can leave it as "move".
639 actual_transfer: str | None
640 if transfer == "move" and len(prepData.children) > 1:
641 actual_transfer = "copy"
642 else:
643 actual_transfer = transfer
644 to_be_deleted: set[ResourcePath] = set()
645 for datastore, prepDataForChild, paths in prepData.children:
646 datastore._finishIngest(
647 prepDataForChild, transfer=actual_transfer, record_validation_info=record_validation_info
648 )
649 to_be_deleted.update(paths)
650 if actual_transfer != transfer:
651 # These datasets were copied but now need to be deleted.
652 # This can not be rolled back.
653 for uri in to_be_deleted:
654 uri.remove()
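    # Net effect of the logic above: a "move" ingest into a chain where more
    # than one child accepts files is performed as per-child copies followed
    # by deletion of the original paths collected in _prepIngest; with a
    # single accepting child the native "move" is used unchanged.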
656 def getManyURIs(
657 self,
658 refs: Iterable[DatasetRef],
659 predict: bool = False,
660 allow_missing: bool = False,
661 ) -> dict[DatasetRef, DatasetRefURIs]:
662 # Docstring inherited
664 uris: dict[DatasetRef, DatasetRefURIs] = {}
665 ephemeral_uris: dict[DatasetRef, DatasetRefURIs] = {}
667 missing_refs = set(refs)
669 # If predict is True we don't want to predict a dataset in the first
670 # datastore if it actually exists in a later datastore, so in that
671 # case check all datastores with predict=False first, and then try
672 # again with predict=True.
673 for p in (False, True) if predict else (False,):
674 if not missing_refs:
675 break
676 for datastore in self.datastores:
677 try:
678 got_uris = datastore.getManyURIs(missing_refs, p, allow_missing=True)
679 except NotImplementedError:
680 # some datastores may not implement generating URIs
681 continue
682 if datastore.isEphemeral:
683 # Only use these as last resort so do not constrain
684 # subsequent queries.
685 ephemeral_uris.update(got_uris)
686 continue
688 missing_refs -= got_uris.keys()
689 uris.update(got_uris)
690 if not missing_refs:
691 break
693 if missing_refs and ephemeral_uris:
694 ephemeral_refs = missing_refs.intersection(ephemeral_uris.keys())
695 for ref in ephemeral_refs:
696 uris[ref] = ephemeral_uris[ref]
697 missing_refs.remove(ref)
699 if missing_refs and not allow_missing:
700 raise FileNotFoundError(f"Dataset(s) {missing_refs} not in this datastore.")
702 return uris
704 def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs:
705 """Return URIs associated with dataset.
707 Parameters
708 ----------
709 ref : `DatasetRef`
710 Reference to the required dataset.
711 predict : `bool`, optional
712 If the datastore does not know about the dataset, controls whether
713 it should return a predicted URI or not.
715 Returns
716 -------
717 uris : `DatasetRefURIs`
718 The URI to the primary artifact associated with this dataset (if
719 the dataset was disassembled within the datastore this may be
720 `None`), and the URIs to any components associated with the dataset
721 artifact (can be empty if there are no components).
723 Notes
724 -----
725 The returned URI is from the first datastore in the list that has
726 the dataset, with preference given to permanent datastores over
727 ephemeral ones. If no datastore has the dataset and prediction
728 is allowed, the predicted URI for the first datastore in the list will
729 be returned.
730 """
731 log.debug("Requesting URIs for %s", ref)
732 predictedUri: DatasetRefURIs | None = None
733 predictedEphemeralUri: DatasetRefURIs | None = None
734 firstEphemeralUri: DatasetRefURIs | None = None
735 for datastore in self.datastores:
736 if datastore.exists(ref):
737 if not datastore.isEphemeral:
738 uri = datastore.getURIs(ref)
739 log.debug("Retrieved non-ephemeral URI: %s", uri)
740 return uri
741 elif not firstEphemeralUri:
742 firstEphemeralUri = datastore.getURIs(ref)
743 elif predict:
744 if not predictedUri and not datastore.isEphemeral:
745 predictedUri = datastore.getURIs(ref, predict)
746 elif not predictedEphemeralUri and datastore.isEphemeral:
747 predictedEphemeralUri = datastore.getURIs(ref, predict)
749 if firstEphemeralUri:
750 log.debug("Retrieved ephemeral URI: %s", firstEphemeralUri)
751 return firstEphemeralUri
753 if predictedUri:
754 log.debug("Retrieved predicted URI: %s", predictedUri)
755 return predictedUri
757 if predictedEphemeralUri:
758 log.debug("Retrieved predicted ephemeral URI: %s", predictedEphemeralUri)
759 return predictedEphemeralUri
761 raise FileNotFoundError(f"Dataset {ref} not in any datastore")
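    # Resolution order implemented above: an existing artifact in a permanent
    # child is returned immediately; otherwise an existing artifact in an
    # ephemeral child is preferred over any prediction; predictions from
    # permanent children are preferred over predictions from ephemeral ones.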
763 def getURI(self, ref: DatasetRef, predict: bool = False) -> ResourcePath:
764 """URI to the Dataset.
766 The returned URI is from the first datastore in the list that has
767 the dataset, with preference given to permanent datastores over
768 ephemeral ones. If no datastore has the dataset and prediction
769 is allowed, the predicted URI for the first datastore in the list will
770 be returned.
772 Parameters
773 ----------
774 ref : `DatasetRef`
775 Reference to the required Dataset.
776 predict : `bool`
777 If `True`, allow URIs to be returned of datasets that have not
778 been written.
780 Returns
781 -------
782 uri : `lsst.resources.ResourcePath`
783 URI pointing to the dataset within the datastore. If the
784 dataset does not exist in the datastore, and if ``predict`` is
785 `True`, the URI will be a prediction and will include a URI
786 fragment "#predicted".
788 Notes
789 -----
790 If the datastore does not have entities that relate well
791 to the concept of a URI the returned URI string will be
792 descriptive. The returned URI is not guaranteed to be obtainable.
794 Raises
795 ------
796 FileNotFoundError
797 A URI has been requested for a dataset that does not exist and
798 guessing is not allowed.
799 RuntimeError
800 Raised if a request is made for a single URI but multiple URIs
801 are associated with this dataset.
802 """
803 log.debug("Requesting URI for %s", ref)
804 primary, components = self.getURIs(ref, predict)
805 if primary is None or components:
806 raise RuntimeError(
807 f"Dataset ({ref}) includes distinct URIs for components. Use Datastore.getURIs() instead."
808 )
809 return primary
811 def retrieveArtifacts(
812 self,
813 refs: Iterable[DatasetRef],
814 destination: ResourcePath,
815 transfer: str = "auto",
816 preserve_path: bool = True,
817 overwrite: bool = False,
818 write_index: bool = True,
819 add_prefix: bool = False,
820 ) -> dict[ResourcePath, ArtifactIndexInfo]:
821 """Retrieve the file artifacts associated with the supplied refs.
823 Parameters
824 ----------
825 refs : `~collections.abc.Iterable` of `DatasetRef`
826 The datasets for which file artifacts are to be retrieved.
827 A single ref can result in multiple files. The refs must
828 be resolved.
829 destination : `lsst.resources.ResourcePath`
830 Location to write the file artifacts.
831 transfer : `str`, optional
832 Method to use to transfer the artifacts. Must be one of the options
833 supported by `lsst.resources.ResourcePath.transfer_from`.
834 "move" is not allowed.
835 preserve_path : `bool`, optional
836 If `True` the full path of the file artifact within the datastore
837 is preserved. If `False` the final file component of the path
838 is used.
839 overwrite : `bool`, optional
840 If `True` allow transfers to overwrite existing files at the
841 destination.
842 write_index : `bool`, optional
843 If `True` write a file at the top level containing a serialization
844 of a `ZipIndex` for the downloaded datasets.
845 add_prefix : `bool`, optional
846 Add a prefix based on the DatasetId. Only used if ``preserve_path``
847 is `False`.
849 Returns
850 -------
851 artifact_map : `dict` [ `lsst.resources.ResourcePath`, \
852 `ArtifactIndexInfo` ]
853 Mapping of retrieved file to associated index information.
854 """
855 if not destination.isdir():
856 raise ValueError(f"Destination location must refer to a directory. Given {destination}")
858 # Using getURIs is not feasible since it becomes difficult to
859 # determine the path within the datastore later on. For now
860 # follow getURIs implementation approach.
862 pending = set(refs)
864 # There is a question as to whether an exception should be raised
865 # early if some of the refs are missing, or whether files should be
866 # transferred until a problem is hit. Prefer to complain up front.
867 # Use the datastore integer as primary key.
868 grouped_by_datastore: dict[int, set[DatasetRef]] = {}
870 for number, datastore in enumerate(self.datastores):
871 if datastore.isEphemeral:
872 # In the future we will want to distinguish in-memory from
873 # caching datastore since using an on-disk local
874 # cache is exactly what we should be doing.
875 continue
876 try:
877 # Checking file existence is expensive. Have the option
878 # of checking whether the datastore knows of these datasets
879 # instead, which is fast but can potentially lead to
880 # retrieveArtifacts failing.
881 knows = datastore.knows_these(pending)
882 datastore_refs = {ref for ref, exists in knows.items() if exists}
883 except NotImplementedError:
884 # Some datastores may not support retrieving artifacts
885 continue
887 if datastore_refs:
888 grouped_by_datastore[number] = datastore_refs
890 # Remove these from the pending list so that we do not bother
891 # looking for them any more.
892 pending = pending - datastore_refs
894 if pending:
895 raise RuntimeError(f"Some datasets were not found in any datastores: {pending}")
897 # Now do the transfer.
898 merged_artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
899 for number, datastore_refs in grouped_by_datastore.items():
900 artifact_map = self.datastores[number].retrieveArtifacts(
901 datastore_refs,
902 destination,
903 transfer=transfer,
904 preserve_path=preserve_path,
905 overwrite=overwrite,
906 write_index=False, # Disable index writing regardless.
907 add_prefix=add_prefix,
908 )
909 merged_artifact_map.update(artifact_map)
911 if write_index:
912 index = ZipIndex.from_artifact_map(refs, merged_artifact_map, destination)
913 index.write_index(destination)
915 return merged_artifact_map
917 def ingest_zip(self, zip_path: ResourcePath, transfer: str | None, *, dry_run: bool = False) -> None:
918 """Ingest an indexed Zip file and contents.
920 The Zip file must have an index file as created by `retrieveArtifacts`.
922 Parameters
923 ----------
924 zip_path : `lsst.resources.ResourcePath`
925 Path to the Zip file.
926 transfer : `str`
927 Method to use for transferring the Zip file into the datastore.
928 dry_run : `bool`, optional
929 If `True` the ingest will be processed without any modifications
930 made to the target datastore and as if the target datastore did not
931 have any of the datasets.
933 Notes
934 -----
935 Datastore constraints are bypassed with Zip ingest. A zip file can
936 contain multiple dataset types. Should the entire Zip be rejected
937 if one dataset type is in the constraints list? If configured to
938 reject everything, ingest should not be attempted.
940 The Zip file is given to each datastore in turn, ignoring datastores
941 where it is not supported. Ingest is deemed successful if any of the
942 datastores accepts the file.
943 """
944 index = ZipIndex.from_zip_file(zip_path)
945 refs = index.refs.to_refs(self.universe)
947 # For now raise if any refs are not supported.
948 # Being selective will require that we return the ingested refs
949 # to the caller so that registry can be modified to remove the
950 # entries.
951 if any(not self.constraints.isAcceptable(ref) for ref in refs):
952 raise DatasetTypeNotSupportedError(
953 "Some of the refs in the given Zip file are not acceptable to this datastore."
954 )
956 n_success = 0
957 final_exception: Exception | None = None
958 for number, (datastore, constraints) in enumerate(
959 zip(self.datastores, self.datastoreConstraints, strict=True)
960 ):
961 if datastore.isEphemeral:
962 continue
964 # There can be constraints for the datastore in the configuration
965 # of the chaining, or constraints in the configuration of the
966 # datastore itself.
967 if any(
968 (constraints is not None and not constraints.isAcceptable(ref))
969 or not datastore.constraints.isAcceptable(ref)
970 for ref in refs
971 ):
972 log.debug("Datastore %s skipping zip ingest due to constraints", datastore.name)
973 continue
974 try:
975 datastore.ingest_zip(zip_path, transfer=transfer, dry_run=dry_run)
976 except NotImplementedError:
977 continue
978 except Exception as e:
979 final_exception = e
980 else:
981 n_success += 1
983 if n_success:
984 return
985 if final_exception:
986 raise final_exception
987 raise RuntimeError("Ingest was not successful in any datastores.")
989 def remove(self, ref: DatasetRef) -> None:
990 """Indicate to the datastore that a dataset can be removed.
992 The dataset will be removed from each datastore. The dataset is
993 not required to exist in every child datastore.
995 Parameters
996 ----------
997 ref : `DatasetRef`
998 Reference to the required dataset.
1000 Raises
1001 ------
1002 FileNotFoundError
1003 Attempt to remove a dataset that does not exist. Raised if none
1004 of the child datastores removed the dataset.
1005 """
1006 log.debug("Removing %s", ref)
1007 self.trash(ref, ignore_errors=False)
1008 self.emptyTrash(ignore_errors=False, refs=[ref])
1010 def forget(self, refs: Iterable[DatasetRef]) -> None:
1011 for datastore in tuple(self.datastores):
1012 datastore.forget(refs)
1014 def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
1015 if isinstance(ref, DatasetRef):
1016 ref_label = str(ref)
1017 else:
1018 ref_label = "bulk datasets"
1020 log.debug("Trashing %s", ref_label)
1022 counter = 0
1023 for datastore in self.datastores:
1024 try:
1025 datastore.trash(ref, ignore_errors=ignore_errors)
1026 counter += 1
1027 except FileNotFoundError:
1028 pass
1030 if counter == 0:
1031 err_msg = f"Could not mark for removal from any child datastore: {ref_label}"
1032 if ignore_errors:
1033 log.warning(err_msg)
1034 else:
1035 raise FileNotFoundError(err_msg)
1037 def emptyTrash(
1038 self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False
1039 ) -> set[ResourcePath]:
1040 removed = set()
1041 for datastore in self.datastores:
1042 removed.update(datastore.emptyTrash(ignore_errors=ignore_errors, refs=refs, dry_run=dry_run))
1043 return removed
1045 def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None:
1046 """Retrieve a dataset from an input `Datastore`,
1047 and store the result in this `Datastore`.
1049 Parameters
1050 ----------
1051 inputDatastore : `Datastore`
1052 The external `Datastore` from which to retrieve the Dataset.
1053 ref : `DatasetRef`
1054 Reference to the required dataset in the input data store.
1056 Notes
1057 -----
1058 The retrieved dataset is written to this chain via ``put()``;
1059 nothing is returned.
1061 """
1062 assert inputDatastore is not self # unless we want it for renames?
1063 inMemoryDataset = inputDatastore.get(ref)
1064 self.put(inMemoryDataset, ref)
1066 def validateConfiguration(
1067 self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False
1068 ) -> None:
1069 """Validate some of the configuration for this datastore.
1071 Parameters
1072 ----------
1073 entities : `~collections.abc.Iterable` [`DatasetRef` | `DatasetType` \
1074 | `StorageClass`]
1075 Entities to test against this configuration. Can be differing
1076 types.
1077 logFailures : `bool`, optional
1078 If `True`, output a log message for every validation error
1079 detected.
1081 Returns
1082 -------
1083 None
1085 Raises
1086 ------
1087 DatastoreValidationError
1088 Raised if there is a validation problem with a configuration.
1089 All the problems are reported in a single exception.
1091 Notes
1092 -----
1093 This method checks each datastore in turn.
1094 """
1095 # Need to catch each of the datastore outputs and ensure that
1096 # all are tested.
1097 failures = []
1098 for datastore in self.datastores:
1099 try:
1100 datastore.validateConfiguration(entities, logFailures=logFailures)
1101 except DatastoreValidationError as e:
1102 if logFailures:
1103 log.critical("Datastore %s failed validation", datastore.name)
1104 failures.append(f"Datastore {self.name}: {e}")
1106 if failures:
1107 msg = ";\n".join(failures)
1108 raise DatastoreValidationError(msg)
1110 def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None:
1111 # Docstring is inherited from base class
1112 failures = []
1113 for datastore in self.datastores:
1114 try:
1115 datastore.validateKey(lookupKey, entity)
1116 except DatastoreValidationError as e:
1117 failures.append(f"Datastore {self.name}: {e}")
1119 if failures:
1120 msg = ";\n".join(failures)
1121 raise DatastoreValidationError(msg)
1123 def getLookupKeys(self) -> set[LookupKey]:
1124 # Docstring is inherited from base class
1125 keys = set()
1126 for datastore in self.datastores:
1127 keys.update(datastore.getLookupKeys())
1129 keys.update(self.constraints.getLookupKeys())
1130 for p in self.datastoreConstraints:
1131 if p is not None:
1132 keys.update(p.getLookupKeys())
1134 return keys
1136 def needs_expanded_data_ids(
1137 self,
1138 transfer: str | None,
1139 entity: DatasetRef | DatasetType | StorageClass | None = None,
1140 ) -> bool:
1141 # Docstring inherited.
1142 # We can't safely use `self.datastoreConstraints` with `entity` to
1143 # check whether a child datastore would even want to ingest this
1144 # dataset, because we don't want to filter out datastores that might
1145 need an expanded data ID based on incomplete information (e.g. we
1146 # pass a StorageClass, but the constraint dispatches on DatasetType).
1147 # So we pessimistically check if any datastore would need an expanded
1148 # data ID for this transfer mode.
1149 return any(datastore.needs_expanded_data_ids(transfer, entity) for datastore in self.datastores)
1151 def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None:
1152 # Docstring inherited from the base class.
1154 for datastore in self.datastores:
1155 datastore.import_records(data)
1157 def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]:
1158 # Docstring inherited from the base class.
1160 all_records: dict[str, DatastoreRecordData] = {}
1162 # Merge all sub-datastore records into one structure
1163 for datastore in self.datastores:
1164 sub_records = datastore.export_records(refs)
1165 for name, record_data in sub_records.items():
1166 # All datastore names must be unique in a chain.
1167 if name in all_records:
1168 raise ValueError(f"Non-unique datastore name found in datastore {datastore}")
1169 all_records[name] = record_data
1171 return all_records
1173 def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]:
1174 # Docstring inherited from the base class.
1176 all_records: dict[str, DatastoreRecordData] = {}
1178 # Filter out datasets that this datastore is not allowed to contain.
1179 refs = [ref for ref in refs if self.constraints.isAcceptable(ref)]
1181 # Merge all sub-datastore records into one structure
1182 for datastore in self.datastores:
1183 sub_records = datastore.export_predicted_records(refs)
1184 for name, record_data in sub_records.items():
1185 # All datastore names must be unique in a chain.
1186 if name in all_records:
1187 raise ValueError(f"Non-unique datastore name found in datastore {datastore}")
1188 all_records[name] = record_data
1190 return all_records
1192 def export(
1193 self,
1194 refs: Iterable[DatasetRef],
1195 *,
1196 directory: ResourcePathExpression | None = None,
1197 transfer: str | None = "auto",
1198 ) -> Iterable[FileDataset]:
1199 # Docstring inherited from Datastore.export.
1200 if transfer == "auto" and directory is None:
1201 transfer = None
1203 if transfer is not None and directory is None:
1204 raise TypeError(f"Cannot export using transfer mode {transfer} with no export directory given")
1206 if transfer == "move":
1207 raise TypeError("Can not export by moving files out of datastore.")
1209 # Exporting from a chain has the potential for a dataset to be
1210 # in one or more of the datastores in the chain. We only need one
1211 # of them since we assume the datasets are the same in all (but
1212 # the file format could be different of course since that is a
1213 # per-datastore configuration).
1214 # We also do not know whether any of the datastores in the chain
1215 # support file export.
1217 # Ensure we have an ordered sequence that is not an iterator or set.
1218 if not isinstance(refs, Sequence):
1219 refs = list(refs)
1221 # If any of the datasets are missing entirely we need to raise early
1222 # before we try to run the export. This can be a little messy but is
1223 # better than exporting files from the first datastore and then finding
1224 # that one is missing but is not in the second datastore either.
1225 known = [datastore.knows_these(refs) for datastore in self.datastores]
1226 refs_known: set[DatasetRef] = set()
1227 for known_to_this in known:
1228 refs_known.update({ref for ref, knows_this in known_to_this.items() if knows_this})
1229 missing_count = len(refs) - len(refs_known)
1230 if missing_count:
1231 raise FileNotFoundError(f"Not all datasets known to this datastore. Missing {missing_count}")
1233 # To allow us to slot each result into the right place after
1234 # asking each datastore, create a dict with the index.
1235 ref_positions = {ref: i for i, ref in enumerate(refs)}
1237 # Presize the final export list.
1238 exported: list[FileDataset | None] = [None] * len(refs)
1240 # The order of the returned dataset has to match the order of the
1241 # given refs, even if they are all from different datastores.
1242 for i, datastore in enumerate(self.datastores):
1243 known_to_this = known[i]
1244 filtered = [ref for ref, knows in known_to_this.items() if knows and ref in ref_positions]
1246 try:
1247 this_export = datastore.export(filtered, directory=directory, transfer=transfer)
1248 except NotImplementedError:
1249 # Try the next datastore.
1250 continue
1252 for ref, export in zip(filtered, this_export, strict=True):
1253 # Get the position and also delete it from the list.
1254 exported[ref_positions.pop(ref)] = export
1256 # Every dataset should be accounted for because of the earlier checks
1257 # but make sure that we did fill all the slots to appease mypy.
1258 for i, dataset in enumerate(exported):
1259 if dataset is None:
1260 raise FileNotFoundError(f"Failed to export dataset {refs[i]}.")
1261 yield dataset
1263 def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap:
1264 unassigned_ids = set(dataset_ids)
1265 output: FileTransferMap = {}
1266 found_acceptable_datastore = False
1267 for datastore in self.datastores:
1268 try:
1269 found = datastore.get_file_info_for_transfer(unassigned_ids)
1270 found_acceptable_datastore = True
1271 output.update(found)
1272 unassigned_ids -= found.keys()
1273 except NotImplementedError:
1274 pass
1276 if not found_acceptable_datastore:
1277 types = {get_full_type_name(d) for d in self.datastores}
1278 raise TypeError(
1279 "ChainedDatastore had no datastores able to provide file transfer information."
1280 f" Had {','.join(types)}"
1281 )
1283 return output
1285 def locate_missing_files_for_transfer(
1286 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool]
1287 ) -> FileTransferMap:
1288 missing_refs = {ref.id: ref for ref in refs}
1289 output: FileTransferMap = {}
1290 for datastore in self.datastores:
1291 # Have to check each datastore in turn. If we do not do
1292 # this warnings will be issued further down for datasets
1293 # that are in one and not the other. The existence cache
1294 # will prevent repeat checks.
1296 found = datastore.locate_missing_files_for_transfer(missing_refs.values(), artifact_existence)
1297 output.update(found)
1298 for id in found.keys():
1299 missing_refs.pop(id)
1300 log.debug("Adding %d missing refs to list for transfer from %s", len(found), datastore.name)
1302 return output
1304 def transfer_from(
1305 self,
1306 source_records: FileTransferMap,
1307 refs: Collection[DatasetRef],
1308 transfer: str = "auto",
1309 artifact_existence: dict[ResourcePath, bool] | None = None,
1310 dry_run: bool = False,
1311 ) -> tuple[set[DatasetRef], set[DatasetRef]]:
1312 # Docstring inherited
1313 if not refs:
1314 return set(), set()
1316 # Assume that each child datastore knows how to look inside a chained
1317 # datastore for compatible datastores (and so there is no need to
1318 # unpack the source datastores here).
1319 # Need to decide if a ref accepted by one datastore should be sent to
1320 # later datastores (as is done in put()). More efficient to filter out
1321 # accepted datasets.
1322 if artifact_existence is None:
1323 artifact_existence = {}
1324 available_refs = set(refs)
1325 accepted: set[DatasetRef] = set()
1326 rejected: set[DatasetRef] = set()
1327 nsuccess = 0
1329 log.debug("Initiating transfer to chained datastore %s", self.name)
1330 for datastore in self.datastores:
1331 # Rejections from this datastore might be acceptances in the next.
1332 # We add them all up but then recalculate at the end.
1333 if not available_refs:
1334 break
1335 log.verbose("Transferring %d datasets to %s from chain", len(available_refs), datastore.name)
1337 try:
1338 current_accepted, current_rejected = datastore.transfer_from(
1339 source_records,
1340 available_refs,
1341 transfer=transfer,
1342 artifact_existence=artifact_existence,
1343 dry_run=dry_run,
1344 )
1345 except (TypeError, NotImplementedError):
1346 # The datastores were incompatible.
1347 continue
1348 else:
1349 nsuccess += 1
1351 # Do not send accepted refs to later datastores.
1352 available_refs -= current_accepted
1354 accepted.update(current_accepted)
1355 rejected.update(current_rejected)
1357 if nsuccess == 0:
1358 raise TypeError("None of the child datastores could accept file transfers")
1360 # It's not rejected if some other datastore accepted it.
1361 rejected -= accepted
1362 log.verbose(
1363 "Finished transfer_from to %s with %d accepted, %d rejected from %d requested.",
1364 self.name,
1365 len(accepted),
1366 len(rejected),
1367 len(refs),
1368 )
1370 return accepted, rejected
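    # Example of the bookkeeping above (hypothetical children): a ref rejected
    # by the first child but accepted by the second ends up only in
    # ``accepted`` after the final ``rejected -= accepted`` adjustment; a ref
    # that no child accepts remains in ``rejected``.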
1372 def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
1373 # Docstring inherited from the base class.
1374 tables: dict[str, DatastoreOpaqueTable] = {}
1375 for datastore in self.datastores:
1376 tables.update(datastore.get_opaque_table_definitions())
1377 return tables
1379 def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None:
1380 # Docstring inherited from the base class.
1381 for datastore in self.datastores:
1382 datastore.set_retrieve_dataset_type_method(method)