Coverage for python / lsst / daf / butler / datastores / fileDatastore.py: 8%
1063 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:49 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Generic file-based datastore code."""
30from __future__ import annotations
32__all__ = ("FileDatastore",)
34import contextlib
35import hashlib
36import logging
37import math
38from collections import defaultdict
39from collections.abc import Callable, Collection, Iterable, Mapping, Sequence
40from typing import TYPE_CHECKING, Any, ClassVar, cast
42from sqlalchemy import BigInteger, String
44from lsst.daf.butler import (
45 Config,
46 DatasetDatastoreRecords,
47 DatasetId,
48 DatasetRef,
49 DatasetType,
50 DatasetTypeNotSupportedError,
51 FileDataset,
52 FileDescriptor,
53 Formatter,
54 FormatterFactory,
55 FormatterV1inV2,
56 FormatterV2,
57 Location,
58 LocationFactory,
59 Progress,
60 StorageClass,
61 ddl,
62)
63from lsst.daf.butler.datastore import (
64 DatasetRefURIs,
65 Datastore,
66 DatastoreConfig,
67 DatastoreOpaqueTable,
68 DatastoreValidationError,
69)
70from lsst.daf.butler.datastore.cache_manager import (
71 AbstractDatastoreCacheManager,
72 DatastoreCacheManager,
73 DatastoreDisabledCacheManager,
74)
75from lsst.daf.butler.datastore.composites import CompositesMap
76from lsst.daf.butler.datastore.file_templates import FileTemplates, FileTemplateValidationError
77from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore
78from lsst.daf.butler.datastore.record_data import DatastoreRecordData
79from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo, StoredFileInfo
80from lsst.daf.butler.datastores.file_datastore.get import (
81 DatasetLocationInformation,
82 DatastoreFileGetInformation,
83 generate_datastore_get_information,
84 get_dataset_as_python_object_from_get_info,
85)
86from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import (
87 ArtifactIndexInfo,
88 ZipIndex,
89 determine_destination_for_retrieved_artifact,
90 unpack_zips,
91)
92from lsst.daf.butler.registry.interfaces import (
93 DatabaseInsertMode,
94 DatastoreRegistryBridge,
95 FakeDatasetRef,
96 ReadOnlyDatabaseError,
97)
98from lsst.daf.butler.repo_relocation import replaceRoot
99from lsst.daf.butler.utils import transactional
100from lsst.resources import ResourcePath, ResourcePathExpression
101from lsst.utils.introspection import get_class_of, get_full_type_name
102from lsst.utils.iteration import chunk_iterable
104# For VERBOSE logging usage.
105from lsst.utils.logging import VERBOSE, getLogger
106from lsst.utils.timer import time_this
108from ..datastore import FileTransferMap, FileTransferRecord
110if TYPE_CHECKING:
111 from lsst.daf.butler import DatasetProvenance, LookupKey
112 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager
114log = getLogger(__name__)
117class _IngestPrepData(Datastore.IngestPrepData):
118 """Helper class for FileDatastore ingest implementation.
120 Parameters
121 ----------
122 datasets : `~collections.abc.Iterable` of `FileDataset`
123 Files to be ingested by this datastore.
124 """
126 def __init__(self, datasets: Iterable[FileDataset]):
127 super().__init__(ref for dataset in datasets for ref in dataset.refs)
128 self.datasets = datasets
131class FileDatastore(GenericBaseDatastore[StoredFileInfo]):
132 """Generic Datastore for file-based implementations.
134 Should always be sub-classed since key abstract methods are missing.
136 Parameters
137 ----------
138 config : `DatastoreConfig` or `str`
139 Configuration as either a `Config` object or URI to file.
140 bridgeManager : `DatastoreRegistryBridgeManager`
141 Object that manages the interface between `Registry` and datastores.
142 root : `lsst.resources.ResourcePath`
143 Root directory URI of this `Datastore`.
144 formatterFactory : `FormatterFactory`
145 Factory for creating instances of formatters.
146 templates : `FileTemplates`
147 File templates that can be used by this `Datastore`.
148 composites : `CompositesMap`
149 Determines whether a dataset should be disassembled on put.
150 trustGetRequest : `bool`
151 Determine whether we can fall back to configuration if a requested
152 dataset is not known to registry.
154 Raises
155 ------
156 ValueError
157 If root location does not exist and ``create`` is `False` in the
158 configuration.
159 """
161 defaultConfigFile: ClassVar[str | None] = None
162 """Path to configuration defaults. Accessed within the ``config`` resource
163 or relative to a search path. Can be None if no defaults specified.
164 """
166 root: ResourcePath
167 """Root directory URI of this `Datastore`."""
169 locationFactory: LocationFactory
170 """Factory for creating locations relative to the datastore root."""
172 formatterFactory: FormatterFactory
173 """Factory for creating instances of formatters."""
175 templates: FileTemplates
176 """File templates that can be used by this `Datastore`."""
178 composites: CompositesMap
179 """Determines whether a dataset should be disassembled on put."""
181 defaultConfigFile = "datastores/fileDatastore.yaml"
182 """Path to configuration defaults. Accessed within the ``config`` resource
183 or relative to a search path. Can be None if no defaults specified.
184 """
186 _retrieve_dataset_method: Callable[[str], DatasetType | None] | None = None
187 """Callable that is used in trusted mode to retrieve registry definition
188 of a named dataset type.
189 """
191 @classmethod
192 def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
193 """Set any filesystem-dependent config options for this Datastore to
194 be appropriate for a new empty repository with the given root.
196 Parameters
197 ----------
198 root : `str`
199 URI to the root of the data repository.
200 config : `Config`
201 A `Config` to update. Only the subset understood by
202 this component will be updated. Will not expand
203 defaults.
204 full : `Config`
205 A complete config with all defaults expanded that can be
206 converted to a `DatastoreConfig`. Read-only and will not be
207 modified by this method.
208 Repository-specific options that should not be obtained
209 from defaults when Butler instances are constructed
210 should be copied from ``full`` to ``config``.
211 overwrite : `bool`, optional
212 If `False`, do not modify a value in ``config`` if the value
213 already exists. Default is always to overwrite with the provided
214 ``root``.
216 Notes
217 -----
218 If a keyword is explicitly defined in the supplied ``config`` it
219 will not be overridden by this method if ``overwrite`` is `False`.
220 This allows explicit values set in external configs to be retained.
221 """
222 Config.updateParameters(
223 DatastoreConfig,
224 config,
225 full,
226 toUpdate={"root": root},
227 toCopy=("cls", ("records", "table")),
228 overwrite=overwrite,
229 )
231 @classmethod
232 def makeTableSpec(cls) -> ddl.TableSpec:
233 return ddl.TableSpec(
234 fields=[
235 ddl.FieldSpec(name="dataset_id", dtype=ddl.GUID, primaryKey=True),
236 ddl.FieldSpec(name="path", dtype=String, length=256, nullable=False),
237 ddl.FieldSpec(name="formatter", dtype=String, length=128, nullable=False),
238 ddl.FieldSpec(name="storage_class", dtype=String, length=64, nullable=False),
239 # Use empty string to indicate no component
240 ddl.FieldSpec(name="component", dtype=String, length=32, primaryKey=True),
241 # TODO: should checksum be Base64Bytes instead?
242 ddl.FieldSpec(name="checksum", dtype=String, length=128, nullable=True),
243 ddl.FieldSpec(name="file_size", dtype=BigInteger, nullable=True),
244 ],
245 unique=frozenset(),
246 indexes=[ddl.IndexSpec("path")],
247 )
249 def __init__(
250 self,
251 config: DatastoreConfig,
252 bridgeManager: DatastoreRegistryBridgeManager,
253 root: ResourcePath,
254 formatterFactory: FormatterFactory,
255 templates: FileTemplates,
256 composites: CompositesMap,
257 trustGetRequest: bool,
258 ):
259 super().__init__(config, bridgeManager)
260 self.root = ResourcePath(root)
261 self.formatterFactory = formatterFactory
262 self.templates = templates
263 self.composites = composites
264 self.trustGetRequest = trustGetRequest
266 # Name ourselves either using an explicit name or a name
267 # derived from the (unexpanded) root
268 if "name" in self.config:
269 self.name = self.config["name"]
270 else:
271 # We use the unexpanded root in the name to indicate that this
272 # datastore can be moved without having to update registry.
273 self.name = "{}@{}".format(type(self).__name__, self.config["root"])
275 self.locationFactory = LocationFactory(self.root)
277 self._opaque_table_name = self.config["records", "table"]
278 try:
279 # Storage of paths and formatters, keyed by dataset_id
280 self._table = bridgeManager.opaque.register(self._opaque_table_name, self.makeTableSpec())
281 # Interface to Registry.
282 self._bridge = bridgeManager.register(self.name)
283 except ReadOnlyDatabaseError:
284 # If the database is read only and we just tried and failed to
285 # create a table, it means someone is trying to create a read-only
286 # butler client for an empty repo. That should be okay, as long
287 # as they then try to get any datasets before some other client
288 # creates the table. Chances are they're just validating
289 # configuration.
290 pass
292 # Determine whether checksums should be used - default to False
293 self.useChecksum = self.config.get("checksum", False)
295 # Create a cache manager
296 self.cacheManager: AbstractDatastoreCacheManager
297 if "cached" in self.config:
298 self.cacheManager = DatastoreCacheManager(self.config["cached"], universe=bridgeManager.universe)
299 else:
300 self.cacheManager = DatastoreDisabledCacheManager("", universe=bridgeManager.universe)
302 self.universe = bridgeManager.universe
304 @classmethod
305 def _create_from_config(
306 cls,
307 config: DatastoreConfig,
308 bridgeManager: DatastoreRegistryBridgeManager,
309 butlerRoot: ResourcePathExpression | None,
310 ) -> FileDatastore:
311 if "root" not in config:
312 raise ValueError("No root directory specified in configuration")
314 # Support repository relocation in config
315 # Existence of self.root is checked in subclass
316 root = ResourcePath(replaceRoot(config["root"], butlerRoot), forceDirectory=True, forceAbsolute=True)
318 # Now associate formatters with storage classes
319 formatterFactory = FormatterFactory()
320 formatterFactory.registerFormatters(config["formatters"], universe=bridgeManager.universe)
322 # Read the file naming templates
323 templates = FileTemplates(config["templates"], universe=bridgeManager.universe)
325 # See if composites should be disassembled
326 composites = CompositesMap(config["composites"], universe=bridgeManager.universe)
328 # Determine whether we can fall back to configuration if a
329 # requested dataset is not known to registry
330 trustGetRequest = config.get("trust_get_request", False)
332 self = FileDatastore(
333 config, bridgeManager, root, formatterFactory, templates, composites, trustGetRequest
334 )
336 # Check existence and create directory structure if necessary.
337 #
338 # The concept of a 'root directory' is problematic for some resource
339 # path types that don't necessarily support the concept of a directory
340 # (http, s3, gs... basically anything that isn't a local filesystem or
341 # WebDAV.)
342 # On these resource paths an object representing the
343 # "root" directory may not exist even though files under the root do,
344 # and in a read-only repository we will be unable to create it.
345 # So we only immediately verify the root for local filesystems,
346 # the only case where this check will definitely not give a false
347 # negative.
348 if self.root.isLocal and not self.root.exists():
349 if "create" not in self.config or not self.config["create"]:
350 raise ValueError(f"No valid root and not allowed to create one at: {self.root}")
351 try:
352 self.root.mkdir()
353 except Exception as e:
354 raise ValueError(
355 f"Can not create datastore root '{self.root}', check permissions. Got error: {e}"
356 ) from e
358 return self
360 def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore:
361 return FileDatastore(
362 self.config,
363 bridgeManager,
364 self.root,
365 self.formatterFactory,
366 self.templates,
367 self.composites,
368 self.trustGetRequest,
369 )
371 def __str__(self) -> str:
372 return str(self.root)
374 @property
375 def bridge(self) -> DatastoreRegistryBridge:
376 return self._bridge
378 @property
379 def roots(self) -> dict[str, ResourcePath | None]:
380 # Docstring inherited.
381 return {self.name: self.root}
383 def _set_trust_mode(self, mode: bool) -> None:
384 self.trustGetRequest = mode
386 def _artifact_exists(self, location: Location) -> bool:
387 """Check that an artifact exists in this datastore at the specified
388 location.
390 Parameters
391 ----------
392 location : `Location`
393 Expected location of the artifact associated with this datastore.
395 Returns
396 -------
397 exists : `bool`
398 True if the location can be found, false otherwise.
399 """
400 log.debug("Checking if resource exists: %s", location.uri)
401 return location.uri.exists()
403 def addStoredItemInfo(
404 self,
405 refs: Iterable[DatasetRef],
406 infos: Iterable[StoredFileInfo],
407 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
408 ) -> None:
409 """Record internal storage information associated with one or more
410 datasets.
412 Parameters
413 ----------
414 refs : sequence of `DatasetRef`
415 The datasets that have been stored.
416 infos : sequence of `StoredDatastoreItemInfo`
417 Metadata associated with the stored datasets.
418 insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`
419 Mode to use to insert the new records into the table. The
420 options are ``INSERT`` (error if pre-existing), ``REPLACE``
421 (replace content with new values), and ``ENSURE`` (skip if the row
422 already exists).
423 """
424 records = [
425 info.rebase(ref).to_record(dataset_id=ref.id) for ref, info in zip(refs, infos, strict=True)
426 ]
427 match insert_mode:
428 case DatabaseInsertMode.INSERT:
429 self._table.insert(*records, transaction=self._transaction)
430 case DatabaseInsertMode.ENSURE:
431 self._table.ensure(*records, transaction=self._transaction)
432 case DatabaseInsertMode.REPLACE:
433 self._table.replace(*records, transaction=self._transaction)
434 case _:
435 raise ValueError(f"Unknown insert mode of '{insert_mode}'")
437 def getStoredItemsInfo(
438 self, ref: DatasetIdRef, ignore_datastore_records: bool = False
439 ) -> list[StoredFileInfo]:
440 """Retrieve information associated with files stored in this
441 `Datastore` associated with this dataset ref.
443 Parameters
444 ----------
445 ref : `DatasetRef`
446 The dataset that is to be queried.
447 ignore_datastore_records : `bool`
448 If `True` then do not use datastore records stored in refs.
450 Returns
451 -------
452 items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`]
453 Stored information about the files and associated formatters
454 associated with this dataset. Only one file will be returned
455 if the dataset has not been disassembled. Can return an empty
456 list if no matching datasets can be found.
457 """
458 # Try to get them from the ref first.
459 if ref._datastore_records is not None and not ignore_datastore_records:
460 ref_records = ref._datastore_records.get(self._table.name, [])
461 # Need to make sure they have correct type.
462 for record in ref_records:
463 if not isinstance(record, StoredFileInfo):
464 raise TypeError(f"Datastore record has unexpected type {record.__class__.__name__}")
465 return cast(list[StoredFileInfo], ref_records)
467 # Look for the dataset_id -- there might be multiple matches
468 # if we have disassembled the dataset.
469 records = self._table.fetch(dataset_id=ref.id)
470 return [StoredFileInfo.from_record(record) for record in records]
472 def _register_datasets(
473 self,
474 refsAndInfos: Iterable[tuple[DatasetRef, StoredFileInfo]],
475 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT,
476 ) -> None:
477 """Update registry to indicate that one or more datasets have been
478 stored.
480 Parameters
481 ----------
482 refsAndInfos : sequence `tuple` [`DatasetRef`,
483 `StoredDatastoreItemInfo`]
484 Datasets to register and the internal datastore metadata associated
485 with them.
486 insert_mode : `str`, optional
487 Indicate whether the new records should be new ("insert", default),
488 or allowed to exists ("ensure") or be replaced if already present
489 ("replace").
490 """
491 expandedRefs: list[DatasetRef] = []
492 expandedItemInfos: list[StoredFileInfo] = []
494 for ref, itemInfo in refsAndInfos:
495 expandedRefs.append(ref)
496 expandedItemInfos.append(itemInfo)
498 # Dataset location only cares about registry ID so if we have
499 # disassembled in datastore we have to deduplicate. Since they
500 # will have different datasetTypes we can't use a set
501 registryRefs = {r.id: r for r in expandedRefs}
502 if insert_mode == DatabaseInsertMode.INSERT:
503 self.bridge.insert(registryRefs.values())
504 else:
505 # There are only two columns and all that matters is the
506 # dataset ID.
507 self.bridge.ensure(registryRefs.values())
508 self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode)
510 def _get_stored_records_associated_with_refs(
511 self, refs: Iterable[DatasetIdRef], ignore_datastore_records: bool = False
512 ) -> dict[DatasetId, list[StoredFileInfo]]:
513 """Retrieve all records associated with the provided refs.
515 Parameters
516 ----------
517 refs : `~collections.abc.Iterable` of `DatasetIdRef`
518 The refs for which records are to be retrieved.
519 ignore_datastore_records : `bool`
520 If `True` then do not use datastore records stored in refs.
522 Returns
523 -------
524 records : `dict` of [`DatasetId`, `list` of `StoredFileInfo`]
525 The matching records indexed by the ref ID. The number of entries
526 in the dict can be smaller than the number of requested refs.
527 """
528 # Check datastore records in refs first.
529 records_by_ref: defaultdict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
530 refs_with_no_records = []
531 for ref in refs:
532 if ignore_datastore_records or ref._datastore_records is None:
533 refs_with_no_records.append(ref)
534 else:
535 if (ref_records := ref._datastore_records.get(self._table.name)) is not None:
536 # Need to make sure they have correct type.
537 for ref_record in ref_records:
538 if not isinstance(ref_record, StoredFileInfo):
539 raise TypeError(
540 f"Datastore record has unexpected type {ref_record.__class__.__name__}"
541 )
542 records_by_ref[ref.id].append(ref_record)
544 # If there were any refs without datastore records, check opaque table.
545 records = self._table.fetch(dataset_id=[ref.id for ref in refs_with_no_records])
547 # Uniqueness is dataset_id + component so can have multiple records
548 # per ref.
549 for record in records:
550 records_by_ref[record["dataset_id"]].append(StoredFileInfo.from_record(record))
551 return records_by_ref
553 def _refs_associated_with_artifacts(
554 self, paths: Iterable[str | ResourcePath]
555 ) -> dict[str, set[DatasetId]]:
556 """Return paths and associated dataset refs.
558 Parameters
559 ----------
560 paths : `list` of `str` or `lsst.resources.ResourcePath`
561 All the paths to include in search. These are exact matches
562 to the entries in the records table and can include fragments.
564 Returns
565 -------
566 mapping : `dict` of [`str`, `set` [`DatasetId`]]
567 Mapping of each path to a set of associated database IDs.
568 These are artifacts and so any fragments are stripped from the
569 keys.
570 """
571 # Group paths by those that have fragments and those that do not.
572 with_fragment = set()
573 without_fragment = set()
574 for rpath in paths:
575 spath = str(rpath) # Typing says can be ResourcePath so must force to string.
576 if "#" in spath:
577 spath, fragment = spath.rsplit("#", 1)
578 with_fragment.add(spath)
579 else:
580 without_fragment.add(spath)
582 result: dict[str, set[DatasetId]] = defaultdict(set)
583 if without_fragment:
584 records = self._table.fetch(path=without_fragment)
585 for row in records:
586 path = row["path"]
587 result[path].add(row["dataset_id"])
588 if with_fragment:
589 # Do a query per prefix.
590 for path in with_fragment:
591 records = self._table.fetch(path=f"{path}#%")
592 for row in records:
593 # Need to strip fragments before adding to dict.
594 row_path = row["path"]
595 artifact_path = row_path[: row_path.rfind("#")]
596 result[artifact_path].add(row["dataset_id"])
597 return result
599 def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[DatasetId]:
600 """Return all dataset refs associated with the supplied path.
602 Parameters
603 ----------
604 pathInStore : `lsst.resources.ResourcePath`
605 Path of interest in the data store.
607 Returns
608 -------
609 ids : `set` of `int`
610 All `DatasetRef` IDs associated with this path.
611 """
612 records = list(self._table.fetch(path=str(pathInStore)))
613 ids = {r["dataset_id"] for r in records}
614 return ids
616 def removeStoredItemInfo(self, ref: DatasetIdRef) -> None:
617 """Remove information about the file associated with this dataset.
619 Parameters
620 ----------
621 ref : `DatasetRef`
622 The dataset that has been removed.
623 """
624 # Note that this method is actually not used by this implementation,
625 # we depend on bridge to delete opaque records. But there are some
626 # tests that check that this method works, so we keep it for now.
627 self._table.delete(["dataset_id"], {"dataset_id": ref.id})
629 def _get_dataset_locations_info(
630 self, ref: DatasetIdRef, ignore_datastore_records: bool = False
631 ) -> list[DatasetLocationInformation]:
632 r"""Find all the `Location`\ s of the requested dataset in the
633 `Datastore` and the associated stored file information.
635 Parameters
636 ----------
637 ref : `DatasetRef`
638 Reference to the required `Dataset`.
639 ignore_datastore_records : `bool`
640 If `True` then do not use datastore records stored in refs.
642 Returns
643 -------
644 results : `list` [`tuple` [`Location`, `StoredFileInfo` ]]
645 Location of the dataset within the datastore and
646 stored information about each file and its formatter.
647 """
648 # Get the file information (this will fail if no file)
649 records = self.getStoredItemsInfo(ref, ignore_datastore_records)
651 # Use the path to determine the location -- we need to take
652 # into account absolute URIs in the datastore record
653 return [(r.file_location(self.locationFactory), r) for r in records]
655 def _can_remove_dataset_artifact(self, ref: DatasetIdRef, location: Location) -> bool:
656 """Check that there is only one dataset associated with the
657 specified artifact.
659 Parameters
660 ----------
661 ref : `DatasetRef` or `FakeDatasetRef`
662 Dataset to be removed.
663 location : `Location`
664 The location of the artifact to be removed.
666 Returns
667 -------
668 can_remove : `Bool`
669 True if the artifact can be safely removed.
670 """
671 # Can't ever delete absolute URIs.
672 if location.pathInStore.isabs():
673 return False
675 # Get all entries associated with this path
676 allRefs = self._registered_refs_per_artifact(location.pathInStore)
677 if not allRefs:
678 raise RuntimeError(f"Datastore inconsistency error. {location.pathInStore} not in registry")
680 # Remove these refs from all the refs and if there is nothing left
681 # then we can delete
682 remainingRefs = allRefs - {ref.id}
684 if remainingRefs:
685 return False
686 return True
688 def _get_expected_dataset_locations_info(self, ref: DatasetRef) -> list[tuple[Location, StoredFileInfo]]:
689 """Predict the location and related file information of the requested
690 dataset in this datastore.
692 Parameters
693 ----------
694 ref : `DatasetRef`
695 Reference to the required `Dataset`.
697 Returns
698 -------
699 results : `list` [`tuple` [`Location`, `StoredFileInfo` ]]
700 Expected Location of the dataset within the datastore and
701 placeholder information about each file and its formatter.
703 Notes
704 -----
705 Uses the current configuration to determine how we would expect the
706 datastore files to have been written if we couldn't ask registry.
707 This is safe so long as there has been no change to datastore
708 configuration between writing the dataset and wanting to read it.
709 Will not work for files that have been ingested without using the
710 standard file template or default formatter.
711 """
712 # If we have a component ref we always need to ask the questions
713 # of the composite. If the composite is disassembled this routine
714 # should return all components. If the composite was not
715 # disassembled the composite is what is stored regardless of
716 # component request. Note that if the caller has disassembled
717 # a composite there is no way for this guess to know that
718 # without trying both the composite and component ref and seeing
719 # if there is something at the component Location even without
720 # disassembly being enabled.
721 if ref.datasetType.isComponent():
722 ref = ref.makeCompositeRef()
724 # See if the ref is a composite that should be disassembled
725 doDisassembly = self.composites.shouldBeDisassembled(ref)
727 all_info: list[tuple[Location, Formatter | FormatterV2, StorageClass, str | None]] = []
729 if doDisassembly:
730 for component, componentStorage in ref.datasetType.storageClass.components.items():
731 compRef = ref.makeComponentRef(component)
732 location, formatter = self._determine_put_formatter_location(compRef)
733 all_info.append((location, formatter, componentStorage, component))
735 else:
736 # Always use the composite ref if no disassembly
737 location, formatter = self._determine_put_formatter_location(ref)
738 all_info.append((location, formatter, ref.datasetType.storageClass, None))
740 # Convert the list of tuples to have StoredFileInfo as second element
741 return [
742 (
743 location,
744 StoredFileInfo(
745 formatter=formatter,
746 path=location.pathInStore.path,
747 storageClass=storageClass,
748 component=component,
749 checksum=None,
750 file_size=-1,
751 ),
752 )
753 for location, formatter, storageClass, component in all_info
754 ]
756 def _prepare_for_direct_get(
757 self, ref: DatasetRef, parameters: Mapping[str, Any] | None = None
758 ) -> list[DatastoreFileGetInformation]:
759 """Check parameters for ``get`` and obtain formatter and
760 location.
762 Parameters
763 ----------
764 ref : `DatasetRef`
765 Reference to the required Dataset.
766 parameters : `dict`
767 `StorageClass`-specific parameters that specify, for example,
768 a slice of the dataset to be loaded.
770 Returns
771 -------
772 getInfo : `list` [`DatastoreFileGetInformation`]
773 Parameters needed to retrieve each file.
774 """
775 log.debug("Retrieve %s from %s with parameters %s", ref, self.name, parameters)
777 # The storage class we want to use eventually
778 refStorageClass = ref.datasetType.storageClass
780 # For trusted mode need to reset storage class.
781 ref = self._cast_storage_class(ref)
783 # Get file metadata and internal metadata
784 fileLocations = self._get_dataset_locations_info(ref)
785 if not fileLocations:
786 if not self.trustGetRequest:
787 raise FileNotFoundError(f"Could not retrieve dataset {ref}.")
788 # Assume the dataset is where we think it should be
789 fileLocations = self._get_expected_dataset_locations_info(ref)
791 if len(fileLocations) > 1:
792 # If trust is involved it is possible that there will be
793 # components listed here that do not exist in the datastore.
794 # Explicitly check for file artifact existence and filter out any
795 # that are missing.
796 if self.trustGetRequest:
797 fileLocations = [loc for loc in fileLocations if loc[0].uri.exists()]
799 # For now complain only if we have no components at all. One
800 # component is probably a problem but we can punt that to the
801 # assembler.
802 if not fileLocations:
803 raise FileNotFoundError(f"None of the component files for dataset {ref} exist.")
805 return generate_datastore_get_information(
806 fileLocations,
807 readStorageClass=refStorageClass,
808 ref=ref,
809 parameters=parameters,
810 )
812 def _determine_put_formatter_location(
813 self, ref: DatasetRef, provenance: DatasetProvenance | None = None
814 ) -> tuple[Location, Formatter | FormatterV2]:
815 """Calculate the formatter and output location to use for put.
817 Parameters
818 ----------
819 ref : `DatasetRef`
820 Reference to the associated Dataset.
821 provenance : `DatasetProvenance`
822 Any provenance that should be attached to the serialized dataset.
824 Returns
825 -------
826 location : `Location`
827 The location to write the dataset.
828 formatter : `Formatter`
829 The `Formatter` to use to write the dataset.
830 """
831 # Work out output file name
832 try:
833 template = self.templates.getTemplate(ref)
834 except KeyError as e:
835 raise DatasetTypeNotSupportedError(f"Unable to find template for {ref}") from e
837 # Validate the template to protect against filenames from different
838 # dataIds returning the same and causing overwrite confusion.
839 template.validateTemplate(ref)
841 location = self.locationFactory.fromPath(template.format(ref), trusted_path=True)
843 # Get the formatter based on the storage class
844 storageClass = ref.datasetType.storageClass
845 try:
846 formatter = self.formatterFactory.getFormatter(
847 ref,
848 FileDescriptor(location, storageClass=storageClass, component=ref.datasetType.component()),
849 dataId=ref.dataId,
850 ref=ref,
851 provenance=provenance,
852 )
853 except KeyError as e:
854 raise DatasetTypeNotSupportedError(
855 f"Unable to find formatter for {ref} in datastore {self.name}"
856 ) from e
858 # Now that we know the formatter, update the location
859 location = formatter.make_updated_location(location)
861 return location, formatter
863 def _overrideTransferMode(self, *datasets: FileDataset, transfer: str | None = None) -> str | None:
864 # Docstring inherited from base class
865 if transfer != "auto":
866 return transfer
868 # See if the paths are within the datastore or not
869 inside = [self._pathInStore(d.path) is not None for d in datasets]
871 if all(inside):
872 transfer = None
873 elif not any(inside):
874 # Allow ResourcePath to use its own knowledge
875 transfer = "auto"
876 else:
877 # This can happen when importing from a datastore that
878 # has had some datasets ingested using "direct" mode.
879 # Also allow ResourcePath to sort it out but warn about it.
880 # This can happen if you are importing from a datastore
881 # that had some direct transfer datasets.
882 log.warning(
883 "Some datasets are inside the datastore and some are outside. Using 'split' "
884 "transfer mode. This assumes that the files outside the datastore are "
885 "still accessible to the new butler since they will not be copied into "
886 "the target datastore."
887 )
888 transfer = "split"
890 return transfer
892 def _pathInStore(self, path: ResourcePathExpression) -> str | None:
893 """Return path relative to datastore root.
895 Parameters
896 ----------
897 path : `lsst.resources.ResourcePathExpression`
898 Path to dataset. Can be absolute URI. If relative assumed to
899 be relative to the datastore. Returns path in datastore
900 or raises an exception if the path it outside.
902 Returns
903 -------
904 inStore : `str`
905 Path relative to datastore root. Returns `None` if the file is
906 outside the root.
907 """
908 # Relative path will always be relative to datastore
909 pathUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
910 return pathUri.relative_to(self.root)
912 def _standardizeIngestPath(
913 self,
914 path: str | ResourcePath,
915 *,
916 transfer: str | None = None,
917 check_existence: bool = False,
918 ) -> str | ResourcePath:
919 """Standardize the path of a to-be-ingested file.
921 Parameters
922 ----------
923 path : `str` or `lsst.resources.ResourcePath`
924 Path of a file to be ingested. This parameter is not expected
925 to be all the types that can be used to construct a
926 `~lsst.resources.ResourcePath`.
927 transfer : `str`, optional
928 How (and whether) the dataset should be added to the datastore.
929 See `ingest` for details of transfer modes.
930 This implementation is provided only so
931 `NotImplementedError` can be raised if the mode is not supported;
932 actual transfers are deferred to `_extractIngestInfo`.
933 check_existence : `bool`, optional
934 If `True` the existence of the file will be checked, otherwise
935 no check will be made.
937 Returns
938 -------
939 path : `str` or `lsst.resources.ResourcePath`
940 New path in what the datastore considers standard form. If an
941 absolute URI was given that will be returned unchanged.
943 Notes
944 -----
945 Subclasses of `FileDatastore` can implement this method instead
946 of `_prepIngest`. It should not modify the data repository or given
947 file in any way.
949 Raises
950 ------
951 NotImplementedError
952 Raised if the datastore does not support the given transfer mode
953 (including the case where ingest is not supported at all).
954 """
955 if transfer not in (None, "direct", "split") + self.root.transferModes:
956 raise NotImplementedError(f"Transfer mode {transfer} not supported.")
958 # A relative URI indicates relative to datastore root
959 srcUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
960 if not srcUri.isabs():
961 srcUri = self.root.join(path)
963 if check_existence and not srcUri.exists():
964 raise FileNotFoundError(
965 f"Resource at {srcUri} does not exist; note that paths to ingest "
966 f"are assumed to be relative to {self.root} unless they are absolute."
967 )
969 if transfer is None:
970 relpath = srcUri.relative_to(self.root)
971 if not relpath:
972 raise RuntimeError(
973 f"Transfer is none but source file ({srcUri}) is not within datastore ({self.root})"
974 )
976 # Return the relative path within the datastore for internal
977 # transfer
978 path = relpath
980 return path
982 def _extractIngestInfo(
983 self,
984 path: ResourcePathExpression,
985 ref: DatasetRef,
986 *,
987 formatter: Formatter | FormatterV2 | type[Formatter | FormatterV2],
988 transfer: str | None = None,
989 record_validation_info: bool = True,
990 ) -> StoredFileInfo:
991 """Relocate (if necessary) and extract `StoredFileInfo` from a
992 to-be-ingested file.
994 Parameters
995 ----------
996 path : `lsst.resources.ResourcePathExpression`
997 URI or path of a file to be ingested.
998 ref : `DatasetRef`
999 Reference for the dataset being ingested. Guaranteed to have
1000 ``dataset_id not None`.
1001 formatter : `type` or `Formatter`
1002 `Formatter` subclass to use for this dataset or an instance.
1003 transfer : `str`, optional
1004 How (and whether) the dataset should be added to the datastore.
1005 See `ingest` for details of transfer modes.
1006 record_validation_info : `bool`, optional
1007 If `True`, the default, the datastore can record validation
1008 information associated with the file. If `False` the datastore
1009 will not attempt to track any information such as checksums
1010 or file sizes. This can be useful if such information is tracked
1011 in an external system or if the file is to be compressed in place.
1012 It is up to the datastore whether this parameter is relevant.
1014 Returns
1015 -------
1016 info : `StoredFileInfo`
1017 Internal datastore record for this file. This will be inserted by
1018 the caller; the `_extractIngestInfo` is only responsible for
1019 creating and populating the struct.
1021 Raises
1022 ------
1023 FileNotFoundError
1024 Raised if one of the given files does not exist.
1025 FileExistsError
1026 Raised if transfer is not `None` but the (internal) location the
1027 file would be moved to is already occupied.
1028 """
1029 if self._transaction is None:
1030 raise RuntimeError("Ingest called without transaction enabled")
1032 # Create URI of the source path, do not need to force a relative
1033 # path to absolute.
1034 srcUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False)
1036 # Track whether we have read the size of the source yet
1037 have_sized = False
1039 tgtLocation: Location | None
1040 if transfer is None or transfer == "split":
1041 # A relative path is assumed to be relative to the datastore
1042 # in this context
1043 if not srcUri.isabs():
1044 tgtLocation = self.locationFactory.fromPath(srcUri.ospath, trusted_path=False)
1045 else:
1046 # Work out the path in the datastore from an absolute URI
1047 # This is required to be within the datastore.
1048 pathInStore = srcUri.relative_to(self.root)
1049 if pathInStore is None and transfer is None:
1050 raise RuntimeError(
1051 f"Unexpectedly learned that {srcUri} is not within datastore {self.root}"
1052 )
1053 if pathInStore:
1054 tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True)
1055 elif transfer == "split":
1056 # Outside the datastore but treat that as a direct ingest
1057 # instead.
1058 tgtLocation = None
1059 else:
1060 raise RuntimeError(f"Unexpected transfer mode encountered: {transfer} for URI {srcUri}")
1061 elif transfer == "direct":
1062 # Want to store the full URI to the resource directly in
1063 # datastore. This is useful for referring to permanent archive
1064 # storage for raw data.
1065 # Trust that people know what they are doing.
1066 tgtLocation = None
1067 else:
1068 # Work out the name we want this ingested file to have
1069 # inside the datastore
1070 tgtLocation = self._calculate_ingested_datastore_name(srcUri, ref, formatter)
1072 # if we are transferring from a local file to a remote location
1073 # it may be more efficient to get the size and checksum of the
1074 # local file rather than the transferred one
1075 if record_validation_info and srcUri.isLocal:
1076 size = srcUri.size()
1077 checksum = self.computeChecksum(srcUri) if self.useChecksum else None
1078 have_sized = True
1080 # Transfer the resource to the destination.
1081 # Allow overwrite of an existing file. This matches the behavior
1082 # of datastore.put() in that it trusts that registry would not
1083 # be asking to overwrite unless registry thought that the
1084 # overwrite was allowed.
1085 tgtLocation.uri.transfer_from(
1086 srcUri, transfer=transfer, transaction=self._transaction, overwrite=True
1087 )
1089 if tgtLocation is None:
1090 # This means we are using direct mode
1091 targetUri = srcUri
1092 targetPath = str(srcUri)
1093 else:
1094 targetUri = tgtLocation.uri
1095 targetPath = tgtLocation.pathInStore.path
1097 # the file should exist in the datastore now
1098 if record_validation_info:
1099 if not have_sized:
1100 size = targetUri.size()
1101 checksum = self.computeChecksum(targetUri) if self.useChecksum else None
1102 else:
1103 # Not recording any file information.
1104 size = -1
1105 checksum = None
1107 return StoredFileInfo(
1108 formatter=formatter,
1109 path=targetPath,
1110 storageClass=ref.datasetType.storageClass,
1111 component=ref.datasetType.component(),
1112 file_size=size,
1113 checksum=checksum,
1114 )
1116 def _prepIngest(self, *datasets: FileDataset, transfer: str | None = None) -> _IngestPrepData:
1117 # Docstring inherited from Datastore._prepIngest.
1118 filtered = []
1120 # Ingest could be given tens of thousands of files. It is not efficient
1121 # to check for the existence of every single file (especially if they
1122 # are remote URIs) but in some transfer modes the files will be checked
1123 # anyhow when they are relocated. For direct or None transfer modes
1124 # it is possible to not know if the file is accessible at all.
1125 # Therefore limit number of files that will be checked (but always
1126 # include the first one).
1127 max_checks = 200
1128 n_datasets = len(datasets)
1129 if n_datasets <= max_checks:
1130 check_every_n = 1
1131 elif transfer in ("direct", None):
1132 check_every_n = int(n_datasets / max_checks + 1) # +1 so that if n < max_checks the answer is 1.
1133 else:
1134 check_every_n = 0
1136 for count, dataset in enumerate(datasets):
1137 acceptable = [ref for ref in dataset.refs if self.constraints.isAcceptable(ref)]
1138 if not acceptable:
1139 continue
1140 else:
1141 dataset.refs = acceptable
1142 if dataset.formatter is None:
1143 dataset.formatter = self.formatterFactory.getFormatterClass(dataset.refs[0])
1144 else:
1145 assert isinstance(dataset.formatter, type | str)
1146 formatter_class = get_class_of(dataset.formatter)
1147 if not issubclass(formatter_class, Formatter | FormatterV2):
1148 raise TypeError(f"Requested formatter {dataset.formatter} is not a Formatter class.")
1149 dataset.formatter = formatter_class
1151 # Decide whether the file should be checked.
1152 check_existence = False
1153 if check_every_n != 0:
1154 # First time through count is 0 so we guarantee to check
1155 # the first file but not necessarily the final one.
1156 check_existence = count % check_every_n == 0
1158 if check_existence:
1159 log.debug(
1160 "Checking file existence: %s (%d/%d) [%s]",
1161 check_existence,
1162 count + 1,
1163 n_datasets,
1164 transfer,
1165 )
1167 dataset.path = self._standardizeIngestPath(
1168 dataset.path, transfer=transfer, check_existence=check_existence
1169 )
1170 filtered.append(dataset)
1171 return _IngestPrepData(filtered)
1173 @transactional
1174 def _finishIngest(
1175 self,
1176 prepData: Datastore.IngestPrepData,
1177 *,
1178 transfer: str | None = None,
1179 record_validation_info: bool = True,
1180 ) -> None:
1181 # Docstring inherited from Datastore._finishIngest.
1182 refsAndInfos = []
1183 progress = Progress("lsst.daf.butler.datastores.FileDatastore.ingest", level=logging.DEBUG)
1184 for dataset in progress.wrap(prepData.datasets, desc="Ingesting dataset files"):
1185 # Do ingest as if the first dataset ref is associated with the file
1186 info = self._extractIngestInfo(
1187 dataset.path,
1188 dataset.refs[0],
1189 formatter=dataset.formatter,
1190 transfer=transfer,
1191 record_validation_info=record_validation_info,
1192 )
1193 refsAndInfos.extend([(ref, info) for ref in dataset.refs])
1195 # In direct mode we can allow repeated ingests of the same thing
1196 # if we are sure that the external dataset is immutable. We use
1197 # UUIDv5 to indicate this. If there is a mix of v4 and v5 they are
1198 # separated.
1199 refs_and_infos_replace = []
1200 refs_and_infos_insert = []
1201 if transfer == "direct":
1202 for entry in refsAndInfos:
1203 if entry[0].id.version == 5:
1204 refs_and_infos_replace.append(entry)
1205 else:
1206 refs_and_infos_insert.append(entry)
1207 else:
1208 refs_and_infos_insert = refsAndInfos
1210 if refs_and_infos_insert:
1211 self._register_datasets(refs_and_infos_insert, insert_mode=DatabaseInsertMode.INSERT)
1212 if refs_and_infos_replace:
1213 self._register_datasets(refs_and_infos_replace, insert_mode=DatabaseInsertMode.REPLACE)
1215 def _calculate_ingested_datastore_name(
1216 self,
1217 srcUri: ResourcePath,
1218 ref: DatasetRef,
1219 formatter: Formatter | FormatterV2 | type[Formatter | FormatterV2] | None = None,
1220 ) -> Location:
1221 """Given a source URI and a DatasetRef, determine the name the
1222 dataset will have inside datastore.
1224 Parameters
1225 ----------
1226 srcUri : `lsst.resources.ResourcePath`
1227 URI to the source dataset file.
1228 ref : `DatasetRef`
1229 Ref associated with the newly-ingested dataset artifact. This
1230 is used to determine the name within the datastore.
1231 formatter : `Formatter` or Formatter class.
1232 Formatter to use for validation. Can be a class or an instance.
1233 No validation of the file extension is performed if the
1234 ``formatter`` is `None`. This can be used if the caller knows
1235 that the source URI and target URI will use the same formatter.
1237 Returns
1238 -------
1239 location : `Location`
1240 Target location for the newly-ingested dataset.
1241 """
1242 # Ingesting a file from outside the datastore.
1243 # This involves a new name.
1244 template = self.templates.getTemplate(ref)
1245 location = self.locationFactory.fromPath(template.format(ref), trusted_path=True)
1247 # Get the extension
1248 ext = srcUri.getExtension()
1250 # Update the destination to include that extension
1251 location.updateExtension(ext)
1253 # Ask the formatter to validate this extension
1254 if formatter is not None:
1255 formatter.validate_extension(location)
1257 return location
1259 def _write_in_memory_to_artifact(
1260 self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
1261 ) -> StoredFileInfo:
1262 """Write out in memory dataset to datastore.
1264 Parameters
1265 ----------
1266 inMemoryDataset : `object`
1267 Dataset to write to datastore.
1268 ref : `DatasetRef`
1269 Registry information associated with this dataset.
1270 provenance : `DatasetProvenance` or `None`, optional
1271 Any provenance that should be attached to the serialized dataset.
1272 Not supported by all formatters.
1274 Returns
1275 -------
1276 info : `StoredFileInfo`
1277 Information describing the artifact written to the datastore.
1278 """
1279 # May need to coerce the in memory dataset to the correct
1280 # python type, but first we need to make sure the storage class
1281 # reflects the one defined in the data repository.
1282 ref = self._cast_storage_class(ref)
1284 # Confirm that we can accept this dataset
1285 if not self.constraints.isAcceptable(ref):
1286 # Raise rather than use boolean return value.
1287 raise DatasetTypeNotSupportedError(
1288 f"Dataset {ref} has been rejected by this datastore via configuration."
1289 )
1291 location, formatter = self._determine_put_formatter_location(ref)
1293 # The external storage class can differ from the registry storage
1294 # class AND the given in-memory dataset might not match any of the
1295 # storage class definitions.
1296 if formatter.can_accept(inMemoryDataset):
1297 # Do not need to coerce. Must assume that the formatter can handle
1298 # it without further checking of types.
1299 pass
1300 else:
1301 # Coerce to a type that it can accept.
1302 inMemoryDataset = ref.datasetType.storageClass.coerce_type(inMemoryDataset)
1303 required_pytype = ref.datasetType.storageClass.pytype
1305 if not isinstance(inMemoryDataset, required_pytype):
1306 raise TypeError(
1307 f"Inconsistency between supplied object ({type(inMemoryDataset)}) "
1308 f"and storage class type ({required_pytype})"
1309 )
1311 if self._transaction is None:
1312 raise RuntimeError("Attempting to write artifact without transaction enabled")
1314 def _removeFileExists(uri: ResourcePath) -> None:
1315 """Remove a file and do not complain if it is not there.
1317 This is important since a formatter might fail before the file
1318 is written and we should not confuse people by writing spurious
1319 error messages to the log.
1320 """
1321 with contextlib.suppress(FileNotFoundError):
1322 uri.remove()
1324 # Register a callback to try to delete the uploaded data if
1325 # something fails below
1326 uri = location.uri
1327 self._transaction.registerUndo("artifactWrite", _removeFileExists, uri)
1329 # Need to record the specified formatter but if this is a V1 formatter
1330 # we need to convert it to a V2 compatible shim to do the write.
1331 if not isinstance(formatter, Formatter):
1332 formatter_compat = formatter
1333 else:
1334 formatter_compat = FormatterV1inV2(
1335 formatter.file_descriptor,
1336 ref=ref,
1337 formatter=formatter,
1338 write_parameters=formatter.write_parameters,
1339 write_recipes=formatter.write_recipes,
1340 )
1342 assert isinstance(formatter_compat, FormatterV2)
1344 with time_this(log, msg="Writing dataset %s with formatter %s", args=(ref, formatter.name())):
1345 try:
1346 formatter_compat.write(
1347 inMemoryDataset, cache_manager=self.cacheManager, provenance=provenance
1348 )
1349 except Exception as e:
1350 raise RuntimeError(
1351 f"Failed to serialize dataset {ref} of type {get_full_type_name(inMemoryDataset)} "
1352 f"using formatter {formatter.name()}."
1353 ) from e
1355 # URI is needed to resolve what ingest case are we dealing with
1356 return self._extractIngestInfo(uri, ref, formatter=formatter)
1358 def knows(self, ref: DatasetRef) -> bool:
1359 """Check if the dataset is known to the datastore.
1361 Does not check for existence of any artifact.
1363 Parameters
1364 ----------
1365 ref : `DatasetRef`
1366 Reference to the required dataset.
1368 Returns
1369 -------
1370 exists : `bool`
1371 `True` if the dataset is known to the datastore.
1372 """
1373 fileLocations = self._get_dataset_locations_info(ref)
1374 if fileLocations:
1375 return True
1376 return False
1378 def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
1379 # Docstring inherited from the base class.
1380 refs = list(refs)
1382 # The records themselves. Could be missing some entries.
1383 records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
1385 return {ref: ref.id in records for ref in refs}
1387 def _process_mexists_records(
1388 self,
1389 id_to_ref: dict[DatasetId, DatasetRef],
1390 records: dict[DatasetId, list[StoredFileInfo]],
1391 all_required: bool,
1392 artifact_existence: dict[ResourcePath, bool] | None = None,
1393 ) -> dict[DatasetRef, bool]:
1394 """Check given records for existence.
1396 Helper function for `mexists()`.
1398 Parameters
1399 ----------
1400 id_to_ref : `dict` of [`DatasetId`, `DatasetRef`]
1401 Mapping of the dataset ID to the dataset ref itself.
1402 records : `dict` of [`DatasetId`, `list` of `StoredFileInfo`]
1403 Records as generally returned by
1404 ``_get_stored_records_associated_with_refs``.
1405 all_required : `bool`
1406 Flag to indicate whether existence requires all artifacts
1407 associated with a dataset ID to exist or not for existence.
1408 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
1409 Optional mapping of datastore artifact to existence. Updated by
1410 this method with details of all artifacts tested. Can be `None`
1411 if the caller is not interested.
1413 Returns
1414 -------
1415 existence : `dict` of [`DatasetRef`, `bool`]
1416 Mapping from dataset to boolean indicating existence.
1417 """
1418 # The URIs to be checked and a mapping of those URIs to
1419 # the dataset ID.
1420 uris_to_check: list[ResourcePath] = []
1421 location_map: dict[ResourcePath, DatasetId] = {}
1423 location_factory = self.locationFactory
1425 uri_existence: dict[ResourcePath, bool] = {}
1426 for ref_id, infos in records.items():
1427 # Key is the dataset Id, value is list of StoredItemInfo
1428 uris = [info.file_location(location_factory).uri for info in infos]
1429 location_map.update({uri: ref_id for uri in uris})
1431 # Check the local cache directly for a dataset corresponding
1432 # to the remote URI.
1433 if self.cacheManager.file_count > 0:
1434 ref = id_to_ref[ref_id]
1435 for uri, storedFileInfo in zip(uris, infos, strict=True):
1436 check_ref = ref
1437 if not ref.datasetType.isComponent() and (component := storedFileInfo.component):
1438 check_ref = ref.makeComponentRef(component)
1439 if self.cacheManager.known_to_cache(check_ref, uri.getExtension()):
1440 # Proxy for URI existence.
1441 uri_existence[uri] = True
1442 else:
1443 uris_to_check.append(uri)
1444 else:
1445 # Check all of them.
1446 uris_to_check.extend(uris)
1448 if artifact_existence is not None:
1449 # If a URI has already been checked remove it from the list
1450 # and immediately add the status to the output dict.
1451 filtered_uris_to_check = []
1452 for uri in uris_to_check:
1453 if uri in artifact_existence:
1454 uri_existence[uri] = artifact_existence[uri]
1455 else:
1456 filtered_uris_to_check.append(uri)
1457 uris_to_check = filtered_uris_to_check
1459 # Results.
1460 dataset_existence: dict[DatasetRef, bool] = {}
1462 uri_existence.update(ResourcePath.mexists(uris_to_check))
1463 for uri, exists in uri_existence.items():
1464 dataset_id = location_map[uri]
1465 ref = id_to_ref[dataset_id]
1467 # Disassembled composite needs to check all locations.
1468 # all_required indicates whether all need to exist or not.
1469 if ref in dataset_existence:
1470 if all_required:
1471 exists = dataset_existence[ref] and exists
1472 else:
1473 exists = dataset_existence[ref] or exists
1474 dataset_existence[ref] = exists
1476 if artifact_existence is not None:
1477 artifact_existence.update(uri_existence)
1479 return dataset_existence
1481 def mexists(
1482 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None
1483 ) -> dict[DatasetRef, bool]:
1484 """Check the existence of multiple datasets at once.
1486 Parameters
1487 ----------
1488 refs : `~collections.abc.Iterable` of `DatasetRef`
1489 The datasets to be checked.
1490 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
1491 Optional mapping of datastore artifact to existence. Updated by
1492 this method with details of all artifacts tested. Can be `None`
1493 if the caller is not interested.
1495 Returns
1496 -------
1497 existence : `dict` of [`DatasetRef`, `bool`]
1498 Mapping from dataset to boolean indicating existence.
1500 Notes
1501 -----
1502 To minimize potentially costly remote existence checks, the local
1503 cache is checked as a proxy for existence. If a file for this
1504 `DatasetRef` does exist no check is done for the actual URI. This
1505 could result in possibly unexpected behavior if the dataset itself
1506 has been removed from the datastore by another process whilst it is
1507 still in the cache.
1508 """
1509 chunk_size = 50_000
1510 dataset_existence: dict[DatasetRef, bool] = {}
1511 log.debug("Checking for the existence of multiple artifacts in datastore in chunks of %d", chunk_size)
1512 n_found_total = 0
1513 n_checked = 0
1514 n_chunks = 0
1515 for chunk in chunk_iterable(refs, chunk_size=chunk_size):
1516 chunk_result = self._mexists(chunk, artifact_existence)
1518 # The log message level and content depend on how many
1519 # datasets we are processing.
1520 n_results = len(chunk_result)
1522 # Use verbose logging to ensure that messages can be seen
1523 # easily if many refs are being checked.
1524 log_threshold = VERBOSE
1525 n_checked += n_results
1527 # This sum can take some time so only do it if we know the
1528 # result is going to be used.
1529 n_found = 0
1530 if log.isEnabledFor(log_threshold):
1531 # Can treat the booleans as 0, 1 integers and sum them.
1532 n_found = sum(chunk_result.values())
1533 n_found_total += n_found
1535 # We are deliberately not trying to count the number of refs
1536 # provided in case it's in the millions. This means there is a
1537 # situation where the number of refs exactly matches the chunk
1538 # size and we will switch to the multi-chunk path even though
1539 # we only have a single chunk.
1540 if n_results < chunk_size and n_chunks == 0:
1541 # Single chunk will be processed so we can provide more detail.
1542 if n_results == 1:
1543 ref = list(chunk_result)[0]
1544 # Use debug logging to be consistent with `exists()`.
1545 log.debug(
1546 "Calling mexists() with single ref that does%s exist (%s).",
1547 "" if chunk_result[ref] else " not",
1548 ref,
1549 )
1550 else:
1551 # Single chunk but multiple files. Summarize.
1552 log.log(
1553 log_threshold,
1554 "Number of datasets found in datastore %s: %d out of %d datasets checked.",
1555 self.name,
1556 n_found,
1557 n_checked,
1558 )
1560 else:
1561 # Use incremental verbose logging when we have multiple chunks.
1562 log.log(
1563 log_threshold,
1564 "Number of datasets found in datastore for chunk %d: %d out of %d checked "
1565 "(running total from all chunks so far: %d found out of %d checked)",
1566 n_chunks,
1567 n_found,
1568 n_results,
1569 n_found_total,
1570 n_checked,
1571 )
1572 dataset_existence.update(chunk_result)
1573 n_chunks += 1
1575 return dataset_existence
1577 def _mexists(
1578 self, refs: Sequence[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None
1579 ) -> dict[DatasetRef, bool]:
1580 """Check the existence of multiple datasets at once.
1582 Parameters
1583 ----------
1584 refs : `~collections.abc.Iterable` of `DatasetRef`
1585 The datasets to be checked.
1586 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
1587 Optional mapping of datastore artifact to existence. Updated by
1588 this method with details of all artifacts tested. Can be `None`
1589 if the caller is not interested.
1591 Returns
1592 -------
1593 existence : `dict` of [`DatasetRef`, `bool`]
1594 Mapping from dataset to boolean indicating existence.
1595 """
1596 # Make a mapping from refs with the internal storage class to the given
1597 # refs that may have a different one. We'll use the internal refs
1598 # throughout this method and convert back at the very end.
1599 internal_ref_to_input_ref = {self._cast_storage_class(ref): ref for ref in refs}
1601 # Need a mapping of dataset_id to (internal) dataset ref since some
1602 # internal APIs work with dataset_id.
1603 id_to_ref = {ref.id: ref for ref in internal_ref_to_input_ref}
1605 # Set of all IDs we are checking for.
1606 requested_ids = set(id_to_ref.keys())
1608 # The records themselves. Could be missing some entries.
1609 records = self._get_stored_records_associated_with_refs(
1610 id_to_ref.values(), ignore_datastore_records=True
1611 )
1613 dataset_existence = self._process_mexists_records(
1614 id_to_ref, records, True, artifact_existence=artifact_existence
1615 )
1617 # Set of IDs that have been handled.
1618 handled_ids = {ref.id for ref in dataset_existence}
1620 missing_ids = requested_ids - handled_ids
1621 if missing_ids:
1622 dataset_existence.update(
1623 self._mexists_check_expected(
1624 [id_to_ref[missing] for missing in missing_ids], artifact_existence
1625 )
1626 )
1628 return {
1629 internal_ref_to_input_ref[internal_ref]: existence
1630 for internal_ref, existence in dataset_existence.items()
1631 }
1633 def _mexists_check_expected(
1634 self, refs: Sequence[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None
1635 ) -> dict[DatasetRef, bool]:
1636 """Check existence of refs that are not known to datastore.
1638 Parameters
1639 ----------
1640 refs : `~collections.abc.Iterable` of `DatasetRef`
1641 The datasets to be checked. These are assumed not to be known
1642 to datastore.
1643 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`]
1644 Optional mapping of datastore artifact to existence. Updated by
1645 this method with details of all artifacts tested. Can be `None`
1646 if the caller is not interested.
1648 Returns
1649 -------
1650 existence : `dict` of [`DatasetRef`, `bool`]
1651 Mapping from dataset to boolean indicating existence.
1652 """
1653 dataset_existence: dict[DatasetRef, bool] = {}
1654 if not self.trustGetRequest:
1655 # Must assume these do not exist
1656 for ref in refs:
1657 dataset_existence[ref] = False
1658 else:
1659 log.debug(
1660 "%d datasets were not known to datastore during initial existence check.",
1661 len(refs),
1662 )
1664 # Construct data structure identical to that returned
1665 # by _get_stored_records_associated_with_refs() but using
1666 # guessed names.
1667 records = {}
1668 id_to_ref = {}
1669 for missing_ref in refs:
1670 expected = self._get_expected_dataset_locations_info(missing_ref)
1671 dataset_id = missing_ref.id
1672 records[dataset_id] = [info for _, info in expected]
1673 id_to_ref[dataset_id] = missing_ref
1675 dataset_existence.update(
1676 self._process_mexists_records(
1677 id_to_ref,
1678 records,
1679 False,
1680 artifact_existence=artifact_existence,
1681 )
1682 )
1684 return dataset_existence
1686 def exists(self, ref: DatasetRef) -> bool:
1687 """Check if the dataset exists in the datastore.
1689 Parameters
1690 ----------
1691 ref : `DatasetRef`
1692 Reference to the required dataset.
1694 Returns
1695 -------
1696 exists : `bool`
1697 `True` if the entity exists in the `Datastore`.
1699 Notes
1700 -----
1701 The local cache is checked as a proxy for existence in the remote
1702 object store. It is possible that another process on a different
1703 compute node could remove the file from the object store even
1704 though it is present in the local cache.
1705 """
1706 ref = self._cast_storage_class(ref)
1707 # We cannot trust datastore records from ref, as many unit tests delete
1708 # datasets and check their existence.
1709 fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True)
1711 # if we are being asked to trust that registry might not be correct
1712 # we ask for the expected locations and check them explicitly
1713 if not fileLocations:
1714 if not self.trustGetRequest:
1715 return False
1717 # First check the cache. If it is not found we must check
1718 # the datastore itself. Assume that any component in the cache
1719 # means that the dataset does exist somewhere.
1720 if self.cacheManager.known_to_cache(ref):
1721 return True
1723 # When we are guessing a dataset location we can not check
1724 # for the existence of every component since we can not
1725 # know if every component was written. Instead we check
1726 # for the existence of any of the expected locations.
1727 for location, _ in self._get_expected_dataset_locations_info(ref):
1728 if self._artifact_exists(location):
1729 return True
1730 return False
1732 # All listed artifacts must exist.
1733 for location, storedFileInfo in fileLocations:
1734 # Checking in cache needs the component ref.
1735 check_ref = ref
1736 if not ref.datasetType.isComponent() and (component := storedFileInfo.component):
1737 check_ref = ref.makeComponentRef(component)
1738 if self.cacheManager.known_to_cache(check_ref, location.getExtension()):
1739 continue
1741 if not self._artifact_exists(location):
1742 return False
1744 return True
1746 def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs:
1747 """Return URIs associated with dataset.
1749 Parameters
1750 ----------
1751 ref : `DatasetRef`
1752 Reference to the required dataset.
1753 predict : `bool`, optional
1754 If the datastore does not know about the dataset, controls whether
1755 it should return a predicted URI or not.
1757 Returns
1758 -------
1759 uris : `DatasetRefURIs`
1760 The URI to the primary artifact associated with this dataset (if
1761 the dataset was disassembled within the datastore this may be
1762 `None`), and the URIs to any components associated with the dataset
1763 artifact. (can be empty if there are no components).
1764 """
1765 many = self.getManyURIs([ref], predict=predict, allow_missing=False)
1766 return many[ref]
1768 def getURI(self, ref: DatasetRef, predict: bool = False) -> ResourcePath:
1769 """URI to the Dataset.
1771 Parameters
1772 ----------
1773 ref : `DatasetRef`
1774 Reference to the required Dataset.
1775 predict : `bool`
1776 If `True`, allow URIs to be returned of datasets that have not
1777 been written.
1779 Returns
1780 -------
1781 uri : `str`
1782 URI pointing to the dataset within the datastore. If the
1783 dataset does not exist in the datastore, and if ``predict`` is
1784 `True`, the URI will be a prediction and will include a URI
1785 fragment "#predicted".
1786 If the datastore does not have entities that relate well
1787 to the concept of a URI the returned URI will be
1788 descriptive. The returned URI is not guaranteed to be obtainable.
1790 Raises
1791 ------
1792 FileNotFoundError
1793 Raised if a URI has been requested for a dataset that does not
1794 exist and guessing is not allowed.
1795 RuntimeError
1796 Raised if a request is made for a single URI but multiple URIs
1797 are associated with this dataset.
1799 Notes
1800 -----
1801 When a predicted URI is requested an attempt will be made to form
1802 a reasonable URI based on file templates and the expected formatter.
1803 """
1804 primary, components = self.getURIs(ref, predict)
1805 if primary is None or components:
1806 raise RuntimeError(
1807 f"Dataset ({ref}) includes distinct URIs for components. Use Datastore.getURIs() instead."
1808 )
1809 return primary
1811 def _predict_URIs(
1812 self,
1813 ref: DatasetRef,
1814 ) -> DatasetRefURIs:
1815 """Predict the URIs of a dataset ref.
1817 Parameters
1818 ----------
1819 ref : `DatasetRef`
1820 Reference to the required Dataset.
1822 Returns
1823 -------
1824 URI : DatasetRefUris
1825 Primary and component URIs. URIs will contain a URI fragment
1826 "#predicted".
1827 """
1828 uris = DatasetRefURIs()
1830 if self.composites.shouldBeDisassembled(ref):
1831 for component, _ in ref.datasetType.storageClass.components.items():
1832 comp_ref = ref.makeComponentRef(component)
1833 comp_location, _ = self._determine_put_formatter_location(comp_ref)
1835 # Add the "#predicted" URI fragment to indicate this is a
1836 # guess
1837 uris.componentURIs[component] = ResourcePath(
1838 comp_location.uri.geturl() + "#predicted", forceDirectory=comp_location.uri.dirLike
1839 )
1841 else:
1842 location, _ = self._determine_put_formatter_location(ref)
1844 # Add the "#predicted" URI fragment to indicate this is a guess
1845 uris.primaryURI = ResourcePath(
1846 location.uri.geturl() + "#predicted", forceDirectory=location.uri.dirLike
1847 )
1849 return uris
1851 def getManyURIs(
1852 self,
1853 refs: Iterable[DatasetRef],
1854 predict: bool = False,
1855 allow_missing: bool = False,
1856 ) -> dict[DatasetRef, DatasetRefURIs]:
1857 # Docstring inherited
1859 uris: dict[DatasetRef, DatasetRefURIs] = {}
1861 records = self._get_stored_records_associated_with_refs(refs)
1862 records_keys = records.keys()
1864 existing_refs = tuple(ref for ref in refs if ref.id in records_keys)
1865 missing_refs = tuple(ref for ref in refs if ref.id not in records_keys)
1867 # Have to handle trustGetRequest mode by checking for the existence
1868 # of the missing refs on disk.
1869 if missing_refs and not predict:
1870 dataset_existence = self._mexists_check_expected(missing_refs, None)
1871 really_missing = set()
1872 not_missing = set()
1873 for ref, exists in dataset_existence.items():
1874 if exists:
1875 not_missing.add(ref)
1876 else:
1877 really_missing.add(ref)
1879 if not_missing:
1880 # Need to recalculate the missing/existing split.
1881 existing_refs = existing_refs + tuple(not_missing)
1882 missing_refs = tuple(really_missing)
1884 for ref in missing_refs:
1885 # if this has never been written then we have to guess
1886 if not predict:
1887 if not allow_missing:
1888 raise FileNotFoundError(f"Dataset {ref} not in this datastore.")
1889 else:
1890 uris[ref] = self._predict_URIs(ref)
1892 for ref in existing_refs:
1893 file_infos = records[ref.id]
1894 file_locations = [(i.file_location(self.locationFactory), i) for i in file_infos]
1895 uris[ref] = self._locations_to_URI(ref, file_locations)
1897 return uris
1899 def _locations_to_URI(
1900 self,
1901 ref: DatasetRef,
1902 file_locations: Sequence[tuple[Location, StoredFileInfo]],
1903 ) -> DatasetRefURIs:
1904 """Convert one or more file locations associated with a DatasetRef
1905 to a DatasetRefURIs.
1907 Parameters
1908 ----------
1909 ref : `DatasetRef`
1910 Reference to the dataset.
1911 file_locations : Sequence[Tuple[Location, StoredFileInfo]]
1912 Each item in the sequence is the location of the dataset within the
1913 datastore and stored information about the file and its formatter.
1914 If there is only one item in the sequence then it is treated as the
1915 primary URI. If there is more than one item then they are treated
1916 as component URIs. If there are no items then an error is raised
1917 unless ``self.trustGetRequest`` is `True`.
1919 Returns
1920 -------
1921 uris: DatasetRefURIs
1922 Represents the primary URI or component URIs described by the
1923 inputs.
1925 Raises
1926 ------
1927 RuntimeError
1928 If no file locations are passed in and ``self.trustGetRequest`` is
1929 `False`.
1930 FileNotFoundError
1931 If the a passed-in URI does not exist, and ``self.trustGetRequest``
1932 is `False`.
1933 RuntimeError
1934 If a passed in `StoredFileInfo`'s ``component`` is `None` (this is
1935 unexpected).
1936 """
1937 guessing = False
1938 uris = DatasetRefURIs()
1940 if not file_locations:
1941 if not self.trustGetRequest:
1942 raise RuntimeError(f"Unexpectedly got no artifacts for dataset {ref}")
1943 file_locations = self._get_expected_dataset_locations_info(ref)
1944 guessing = True
1946 if len(file_locations) == 1:
1947 # No disassembly so this is the primary URI
1948 uris.primaryURI = file_locations[0][0].uri
1949 if guessing and not uris.primaryURI.exists():
1950 raise FileNotFoundError(f"Expected URI ({uris.primaryURI}) does not exist")
1951 else:
1952 for location, file_info in file_locations:
1953 if file_info.component is None:
1954 raise RuntimeError(f"Unexpectedly got no component name for a component at {location}")
1955 if guessing and not location.uri.exists():
1956 # If we are trusting then it is entirely possible for
1957 # some components to be missing. In that case we skip
1958 # to the next component.
1959 if self.trustGetRequest:
1960 continue
1961 raise FileNotFoundError(f"Expected URI ({location.uri}) does not exist")
1962 uris.componentURIs[file_info.component] = location.uri
1964 return uris
1966 def _find_missing_records(
1967 self,
1968 refs: Iterable[DatasetRef],
1969 missing_ids: set[DatasetId],
1970 artifact_existence: dict[ResourcePath, bool] | None = None,
1971 warn_for_missing: bool = True,
1972 ) -> dict[DatasetId, list[StoredFileInfo]]:
1973 if not missing_ids:
1974 return {}
1976 if artifact_existence is None:
1977 artifact_existence = {}
1979 found_records: dict[DatasetId, list[StoredFileInfo]] = defaultdict(list)
1980 id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids}
1982 # This should be chunked in case we end up having to check
1983 # the file store since we need some log output to show
1984 # progress.
1985 chunk_size = 50_000
1986 for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=chunk_size):
1987 records = {}
1988 for missing in missing_ids_chunk:
1989 # Ask the source datastore where the missing artifacts
1990 # should be. An execution butler might not know about the
1991 # artifacts even if they are there.
1992 expected = self._get_expected_dataset_locations_info(id_to_ref[missing])
1993 records[missing] = [info for _, info in expected]
1995 # Call the mexist helper method in case we have not already
1996 # checked these artifacts such that artifact_existence is
1997 # empty. This allows us to benefit from parallelism.
1998 # datastore.mexists() itself does not give us access to the
1999 # derived datastore record.
2000 log.verbose("Checking existence of %d datasets unknown to datastore", len(records))
2001 ref_exists = self._process_mexists_records(
2002 id_to_ref, records, False, artifact_existence=artifact_existence
2003 )
2005 # Now go through the records and propagate the ones that exist.
2006 location_factory = self.locationFactory
2007 for missing, record_list in records.items():
2008 # Skip completely if the ref does not exist.
2009 ref = id_to_ref[missing]
2010 if not ref_exists[ref]:
2011 if warn_for_missing:
2012 log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref)
2013 continue
2014 # Check for file artifact to decide which parts of a
2015 # disassembled composite do exist. If there is only a
2016 # single record we don't even need to look because it can't
2017 # be a composite and must exist.
2018 if len(record_list) == 1:
2019 dataset_records = record_list
2020 else:
2021 dataset_records = [
2022 record
2023 for record in record_list
2024 if artifact_existence[record.file_location(location_factory).uri]
2025 ]
2026 assert len(dataset_records) > 0, "Disassembled composite should have had some files."
2028 # Rely on source_records being a defaultdict.
2029 found_records[missing].extend(dataset_records)
2030 log.verbose("Completed scan for missing data files")
2031 return found_records
2033 def retrieveArtifacts(
2034 self,
2035 refs: Iterable[DatasetRef],
2036 destination: ResourcePath,
2037 transfer: str = "auto",
2038 preserve_path: bool = True,
2039 overwrite: bool = False,
2040 write_index: bool = True,
2041 add_prefix: bool = False,
2042 ) -> dict[ResourcePath, ArtifactIndexInfo]:
2043 """Retrieve the file artifacts associated with the supplied refs.
2045 Parameters
2046 ----------
2047 refs : `~collections.abc.Iterable` of `DatasetRef`
2048 The datasets for which file artifacts are to be retrieved.
2049 A single ref can result in multiple files. The refs must
2050 be resolved.
2051 destination : `lsst.resources.ResourcePath`
2052 Location to write the file artifacts.
2053 transfer : `str`, optional
2054 Method to use to transfer the artifacts. Must be one of the options
2055 supported by `lsst.resources.ResourcePath.transfer_from`.
2056 "move" is not allowed.
2057 preserve_path : `bool`, optional
2058 If `True` the full path of the file artifact within the datastore
2059 is preserved. If `False` the final file component of the path
2060 is used.
2061 overwrite : `bool`, optional
2062 If `True` allow transfers to overwrite existing files at the
2063 destination.
2064 write_index : `bool`, optional
2065 If `True` write a file at the top level containing a serialization
2066 of a `ZipIndex` for the downloaded datasets.
2067 add_prefix : `bool`, optional
2068 If `True` and if ``preserve_path`` is `False`, apply a prefix to
2069 the filenames corresponding to some part of the dataset ref ID.
2070 This can be used to guarantee uniqueness.
2072 Returns
2073 -------
2074 artifact_map : `dict` [ `lsst.resources.ResourcePath`, \
2075 `ArtifactIndexInfo` ]
2076 Mapping of retrieved file to associated index information.
2077 """
2078 if not destination.isdir():
2079 raise ValueError(f"Destination location must refer to a directory. Given {destination}")
2081 if transfer == "move":
2082 raise ValueError("Can not move artifacts out of datastore. Use copy instead.")
2084 # Source -> Destination
2085 # This also helps filter out duplicate DatasetRef in the request
2086 # that will map to the same underlying file transfer.
2087 to_transfer: dict[ResourcePath, ResourcePath] = {}
2088 zips_to_transfer: set[ResourcePath] = set()
2090 # Retrieve all the records in bulk indexed by ref.id.
2091 records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
2093 # Check for missing records.
2094 known_ids = set(records)
2095 log.debug("Number of datastore records found in database: %d", len(known_ids))
2096 requested_ids = {ref.id for ref in refs}
2097 missing_ids = requested_ids - known_ids
2099 if missing_ids and not self.trustGetRequest:
2100 raise ValueError(f"Number of datasets missing from this datastore: {len(missing_ids)}")
2102 missing_records = self._find_missing_records(refs, missing_ids)
2103 records.update(missing_records)
2105 # One artifact can be used by multiple DatasetRef.
2106 # e.g. DECam.
2107 artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {}
2108 # Sort to ensure that in many refs to one file situation the same
2109 # ref is used for any prefix that might be added.
2110 for ref in sorted(refs):
2111 prefix = str(ref.id)[:8] + "-" if add_prefix else ""
2112 for info in records[ref.id]:
2113 location = info.file_location(self.locationFactory)
2114 source_uri = location.uri
2115 # For DECam/zip we only want to copy once.
2116 # For zip files we need to unpack so that they can be
2117 # zipped up again if needed.
2118 is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment
2119 # We need to remove fragments for consistency.
2120 cleaned_source_uri = source_uri.replace(fragment="", query="", params="")
2121 if is_zip:
2122 # Assume the DatasetRef definitions are within the Zip
2123 # file itself and so can be dropped from loop.
2124 zips_to_transfer.add(cleaned_source_uri)
2125 elif cleaned_source_uri not in to_transfer:
2126 target_uri = determine_destination_for_retrieved_artifact(
2127 destination, location.pathInStore, preserve_path, prefix
2128 )
2129 to_transfer[cleaned_source_uri] = target_uri
2130 artifact_map[target_uri] = ArtifactIndexInfo.from_single(info.to_simple(), ref.id)
2131 else:
2132 target_uri = to_transfer[cleaned_source_uri]
2133 artifact_map[target_uri].append(ref.id)
2135 # Parallelize the transfer. Re-raise as a single exception if
2136 # a FileExistsError is encountered anywhere.
2137 log.debug("Number of artifacts to transfer to %s: %d", str(destination), len(to_transfer))
2138 try:
2139 ResourcePath.mtransfer(transfer, tuple(to_transfer.items()), overwrite=overwrite)
2140 except* FileExistsError as egroup:
2141 raise FileExistsError(
2142 "Some files already exist in destination directory and overwrite is False"
2143 ) from egroup
2145 # Transfer the Zip files and unpack them.
2146 zipped_artifacts = unpack_zips(zips_to_transfer, requested_ids, destination, preserve_path, overwrite)
2147 artifact_map.update(zipped_artifacts)
2149 if write_index:
2150 index = ZipIndex.from_artifact_map(refs, artifact_map, destination)
2151 index.write_index(destination)
2153 return artifact_map
2155 def ingest_zip(
2156 self,
2157 zip_path: ResourcePath,
2158 transfer: str | None,
2159 *,
2160 dry_run: bool = False,
2161 ) -> None:
2162 """Ingest an indexed Zip file and contents.
2164 The Zip file must have an index file as created by `retrieveArtifacts`.
2166 Parameters
2167 ----------
2168 zip_path : `lsst.resources.ResourcePath`
2169 Path to the Zip file.
2170 transfer : `str`
2171 Method to use for transferring the Zip file into the datastore.
2172 dry_run : `bool`, optional
2173 If `True` the ingest will be processed without any modifications
2174 made to the target datastore and as if the target datastore did not
2175 have any of the datasets.
2177 Notes
2178 -----
2179 Datastore constraints are bypassed with Zip ingest. A zip file can
2180 contain multiple dataset types. Should the entire Zip be rejected
2181 if one dataset type is in the constraints list?
2183 If any dataset is already present in the datastore the entire ingest
2184 will fail.
2185 """
2186 index = ZipIndex.from_zip_file(zip_path)
2188 # Refs indexed by UUID.
2189 refs = index.refs.to_refs(universe=self.universe)
2190 id_to_ref = {ref.id: ref for ref in refs}
2192 # Any failing constraints trigger entire failure.
2193 if any(not self.constraints.isAcceptable(ref) for ref in refs):
2194 raise DatasetTypeNotSupportedError(
2195 "Some refs in the Zip file are not supported by this datastore"
2196 )
2198 # Transfer the Zip file into the datastore file system.
2199 # There is no RUN as such to use for naming.
2200 # Potentially could use the RUN from the first ref in the index
2201 # There is no requirement that the contents of the Zip files share
2202 # the same RUN.
2203 # Could use the Zip UUID from the index + special "zips/" prefix.
2204 if transfer is None:
2205 # Indicated that the zip file is already in the right place.
2206 if not zip_path.isabs():
2207 tgtLocation = self.locationFactory.fromPath(zip_path.ospath, trusted_path=False)
2208 else:
2209 pathInStore = zip_path.relative_to(self.root)
2210 if pathInStore is None:
2211 raise RuntimeError(
2212 f"Unexpectedly learned that {zip_path} is not within datastore {self.root}"
2213 )
2214 tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True)
2215 elif transfer == "direct":
2216 # Reference in original location.
2217 tgtLocation = None
2218 else:
2219 # Name the zip file based on index contents.
2220 tgtLocation = self.locationFactory.fromPath(index.calculate_zip_file_path_in_store())
2222 # Transfer the Zip file into the datastore.
2223 if not dry_run:
2224 tgtLocation.uri.transfer_from(
2225 zip_path, transfer=transfer, transaction=self._transaction, overwrite=True
2226 )
2227 else:
2228 log.info("Would be copying Zip from %s to %s", zip_path, tgtLocation)
2230 if tgtLocation is None:
2231 path_in_store = str(zip_path)
2232 else:
2233 path_in_store = tgtLocation.pathInStore.path
2235 # Associate each file with a (DatasetRef, StoredFileInfo) tuple.
2236 artifacts: list[tuple[DatasetRef, StoredFileInfo]] = []
2237 for path_in_zip, index_info in index.artifact_map.items():
2238 # Need to modify the info to include the path to the Zip file
2239 # that was previously written to the datastore.
2240 index_info.info.path = f"{path_in_store}#zip-path={path_in_zip}"
2242 info = StoredFileInfo.from_simple(index_info.info)
2243 for id_ in index_info.ids:
2244 artifacts.append((id_to_ref[id_], info))
2246 if not dry_run:
2247 self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)
2248 else:
2249 log.info("Would be registering %d artifacts from Zip into datastore", len(artifacts))
2251 def get(
2252 self,
2253 ref: DatasetRef,
2254 parameters: Mapping[str, Any] | None = None,
2255 storageClass: StorageClass | str | None = None,
2256 ) -> Any:
2257 """Load an InMemoryDataset from the store.
2259 Parameters
2260 ----------
2261 ref : `DatasetRef`
2262 Reference to the required Dataset.
2263 parameters : `dict`
2264 `StorageClass`-specific parameters that specify, for example,
2265 a slice of the dataset to be loaded.
2266 storageClass : `StorageClass` or `str`, optional
2267 The storage class to be used to override the Python type
2268 returned by this method. By default the returned type matches
2269 the dataset type definition for this dataset. Specifying a
2270 read `StorageClass` can force a different type to be returned.
2271 This type must be compatible with the original type.
2273 Returns
2274 -------
2275 inMemoryDataset : `object`
2276 Requested dataset or slice thereof as an InMemoryDataset.
2278 Raises
2279 ------
2280 FileNotFoundError
2281 Requested dataset can not be retrieved.
2282 TypeError
2283 Return value from formatter has unexpected type.
2284 ValueError
2285 Formatter failed to process the dataset.
2286 """
2287 # Supplied storage class for the component being read is either
2288 # from the ref itself or some an override if we want to force
2289 # type conversion.
2290 if storageClass is not None:
2291 ref = ref.overrideStorageClass(storageClass)
2293 allGetInfo = self._prepare_for_direct_get(ref, parameters)
2294 return get_dataset_as_python_object_from_get_info(
2295 allGetInfo, ref=ref, parameters=parameters, cache_manager=self.cacheManager
2296 )
2298 def prepare_get_for_external_client(self, ref: DatasetRef) -> list[DatasetLocationInformation] | None:
2299 # Docstring inherited
2301 locations = self._get_dataset_locations_info(ref)
2302 if len(locations) == 0:
2303 return None
2305 return locations
2307 @transactional
2308 def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
2309 """Write a InMemoryDataset with a given `DatasetRef` to the store.
2311 Parameters
2312 ----------
2313 inMemoryDataset : `object`
2314 The dataset to store.
2315 ref : `DatasetRef`
2316 Reference to the associated Dataset.
2317 provenance : `DatasetProvenance` or `None`, optional
2318 Any provenance that should be attached to the serialized dataset.
2319 Can be ignored by a formatter or delegate.
2321 Raises
2322 ------
2323 TypeError
2324 Supplied object and storage class are inconsistent.
2325 DatasetTypeNotSupportedError
2326 The associated `DatasetType` is not handled by this datastore.
2328 Notes
2329 -----
2330 If the datastore is configured to reject certain dataset types it
2331 is possible that the put will fail and raise a
2332 `DatasetTypeNotSupportedError`. The main use case for this is to
2333 allow `ChainedDatastore` to put to multiple datastores without
2334 requiring that every datastore accepts the dataset.
2335 """
2336 doDisassembly = self.composites.shouldBeDisassembled(ref)
2337 # doDisassembly = True
2339 artifacts = []
2340 if doDisassembly:
2341 inMemoryDataset = ref.datasetType.storageClass.delegate().add_provenance(
2342 inMemoryDataset, ref, provenance=provenance
2343 )
2344 components = ref.datasetType.storageClass.delegate().disassemble(inMemoryDataset)
2345 if components is None:
2346 raise RuntimeError(
2347 f"Inconsistent configuration: dataset type {ref.datasetType.name} "
2348 f"with storage class {ref.datasetType.storageClass.name} "
2349 "is configured to be disassembled, but cannot be."
2350 )
2351 for component, componentInfo in components.items():
2352 # Don't recurse because we want to take advantage of
2353 # bulk insert -- need a new DatasetRef that refers to the
2354 # same dataset_id but has the component DatasetType
2355 # DatasetType does not refer to the types of components
2356 # So we construct one ourselves.
2357 compRef = ref.makeComponentRef(component)
2358 # Provenance has already been attached above.
2359 storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef)
2360 artifacts.append((compRef, storedInfo))
2361 else:
2362 # Write the entire thing out
2363 storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref, provenance=provenance)
2364 artifacts.append((ref, storedInfo))
2366 self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)
2368 @transactional
2369 def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]:
2370 doDisassembly = self.composites.shouldBeDisassembled(ref)
2371 # doDisassembly = True
2373 artifacts = []
2374 if doDisassembly:
2375 components = ref.datasetType.storageClass.delegate().disassemble(in_memory_dataset)
2376 if components is None:
2377 raise RuntimeError(
2378 f"Inconsistent configuration: dataset type {ref.datasetType.name} "
2379 f"with storage class {ref.datasetType.storageClass.name} "
2380 "is configured to be disassembled, but cannot be."
2381 )
2382 for component, componentInfo in components.items():
2383 # Don't recurse because we want to take advantage of
2384 # bulk insert -- need a new DatasetRef that refers to the
2385 # same dataset_id but has the component DatasetType
2386 # DatasetType does not refer to the types of components
2387 # So we construct one ourselves.
2388 compRef = ref.makeComponentRef(component)
2389 storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef)
2390 artifacts.append((compRef, storedInfo))
2391 else:
2392 # Write the entire thing out
2393 storedInfo = self._write_in_memory_to_artifact(in_memory_dataset, ref)
2394 artifacts.append((ref, storedInfo))
2396 ref_records: DatasetDatastoreRecords = {self._opaque_table_name: [info for _, info in artifacts]}
2397 ref = ref.replace(datastore_records=ref_records)
2398 return {self.name: ref}
2400 @transactional
2401 def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None:
2402 # At this point can safely remove these datasets from the cache
2403 # to avoid confusion later on. If they are not trashed later
2404 # the cache will simply be refilled.
2405 self.cacheManager.remove_from_cache(ref)
2407 # If we are in trust mode there will be nothing to move to
2408 # the trash table and we will have to try to delete the file
2409 # immediately.
2410 if self.trustGetRequest:
2411 # Try to keep the logic below for a single file trash.
2412 if isinstance(ref, DatasetRef):
2413 refs = {ref}
2414 else:
2415 # Will recreate ref at the end of this branch.
2416 refs = set(ref)
2418 # Determine which datasets are known to datastore directly.
2419 id_to_ref = {ref.id: ref for ref in refs}
2420 existing_ids = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
2421 existing_refs = {id_to_ref[ref_id] for ref_id in existing_ids}
2423 missing = refs - existing_refs
2424 if missing:
2425 # Do an explicit existence check on these refs.
2426 # We only care about the artifacts at this point and not
2427 # the dataset existence.
2428 artifact_existence: dict[ResourcePath, bool] = {}
2429 _ = self.mexists(missing, artifact_existence)
2430 uris = [uri for uri, exists in artifact_existence.items() if exists]
2432 # FUTURE UPGRADE: Implement a parallelized bulk remove.
2433 log.debug("Removing %d artifacts from datastore that are unknown to datastore", len(uris))
2434 for uri in uris:
2435 try:
2436 uri.remove()
2437 except Exception as e:
2438 if ignore_errors:
2439 log.debug("Artifact %s could not be removed: %s", uri, e)
2440 continue
2441 raise
2443 # There is no point asking the code below to remove refs we
2444 # know are missing so update it with the list of existing
2445 # records. Try to retain one vs many logic.
2446 if not existing_refs:
2447 # Nothing more to do since none of the datasets were
2448 # known to the datastore record table.
2449 return
2450 ref = list(existing_refs)
2451 if len(ref) == 1:
2452 ref = ref[0]
2454 # Get file metadata and internal metadata
2455 if not isinstance(ref, DatasetRef):
2456 log.debug("Doing multi-dataset trash in datastore %s", self.name)
2457 # Assumed to be an iterable of refs so bulk mode enabled.
2458 try:
2459 self.bridge.moveToTrash(ref, transaction=self._transaction)
2460 except Exception as e:
2461 if ignore_errors:
2462 log.warning("Unexpected issue moving multiple datasets to trash: %s", e)
2463 else:
2464 raise
2465 return
2467 log.debug("Trashing dataset %s in datastore %s", ref, self.name)
2469 fileLocations = self._get_dataset_locations_info(ref)
2471 if not fileLocations:
2472 err_msg = f"Requested dataset to trash ({ref}) is not known to datastore {self.name}"
2473 if ignore_errors:
2474 log.warning(err_msg)
2475 return
2476 else:
2477 raise FileNotFoundError(err_msg)
2479 for location, _ in fileLocations:
2480 if not self._artifact_exists(location):
2481 err_msg = (
2482 f"Dataset is known to datastore {self.name} but "
2483 f"associated artifact ({location.uri}) is missing"
2484 )
2485 if ignore_errors:
2486 log.warning(err_msg)
2487 return
2488 else:
2489 raise FileNotFoundError(err_msg)
2491 # Mark dataset as trashed
2492 try:
2493 self.bridge.moveToTrash([ref], transaction=self._transaction)
2494 except Exception as e:
2495 if ignore_errors:
2496 log.warning(
2497 "Attempted to mark dataset (%s) to be trashed in datastore %s "
2498 "but encountered an error: %s",
2499 ref,
2500 self.name,
2501 e,
2502 )
2503 pass
2504 else:
2505 raise
2507 def emptyTrash(
2508 self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False
2509 ) -> set[ResourcePath]:
2510 """Remove all datasets from the trash.
2512 Parameters
2513 ----------
2514 ignore_errors : `bool`
2515 If `True` return without error even if something went wrong.
2516 Problems could occur if another process is simultaneously trying
2517 to delete.
2518 refs : `collections.abc.Collection` [ `DatasetRef` ] or `None`
2519 Explicit list of datasets that can be removed from trash. If listed
2520 datasets are not already stored in the trash table they will be
2521 ignored. If `None` every entry in the trash table will be
2522 processed.
2523 dry_run : `bool`, optional
2524 If `True`, the trash table will be queried and results reported
2525 but no artifacts will be removed.
2527 Returns
2528 -------
2529 removed : `set` [ `lsst.resources.ResourcePath` ]
2530 List of artifacts that were removed.
2532 Notes
2533 -----
2534 Will empty the records from the trash tables only if this call finishes
2535 without raising.
2536 """
2537 removed = set()
2538 if refs:
2539 selected_ids = {ref.id for ref in refs}
2540 chunk_size = 50_000
2541 n_chunks = math.ceil(len(selected_ids) / chunk_size)
2542 chunk_num = 0
2543 for chunk in chunk_iterable(selected_ids, chunk_size=chunk_size):
2544 chunk_num += 1
2545 if n_chunks == 1:
2546 log.verbose(
2547 "Emptying datastore trash for %d dataset%s",
2548 len(chunk),
2549 "s" if len(chunk) != 1 else "",
2550 )
2551 else:
2552 log.verbose(
2553 "Emptying datastore trash for chunk %d out of %d of size %d",
2554 chunk_num,
2555 n_chunks,
2556 len(chunk),
2557 )
2558 removed.update(
2559 self._empty_trash_subset(ignore_errors=ignore_errors, selected_ids=chunk, dry_run=dry_run)
2560 )
2561 else:
2562 log.verbose("Emptying all trash in datastore %s", self.name)
2563 removed = self._empty_trash_subset(ignore_errors=ignore_errors, dry_run=dry_run)
2564 log.info(
2565 "%sRemoved %d file artifact%s from datastore %s",
2566 "Would have " if dry_run else "",
2567 len(removed),
2568 "s" if len(removed) != 1 else "",
2569 self.name,
2570 )
2571 return removed
2573 @transactional
2574 def _empty_trash_subset(
2575 self,
2576 *,
2577 ignore_errors: bool = True,
2578 selected_ids: Collection[DatasetId] | None = None,
2579 dry_run: bool = False,
2580 ) -> set[ResourcePath]:
2581 """Empty trash table in transaction.
2583 Parameters
2584 ----------
2585 ignore_errors : `bool`
2586 If `True` return without error even if something went wrong.
2587 Problems could occur if another process is simultaneously trying
2588 to delete.
2589 selected_ids : `collections.abc.collection` [`DatasetId`] or `None`
2590 Explicit list of dataset IDs that can be removed from the trash.
2591 If listed datasets are not already included in the trash table
2592 they will be ignored. If `None` every entry in the trash table
2593 will be processed.
2594 dry_run : `bool`, optional
2595 If `True`, the trash table will be queried and results reported
2596 but no artifacts will be removed.
2598 Returns
2599 -------
2600 removed : `set` [ `lsst.resources.ResourcePath` ]
2601 Artifacts successfully removed.
2603 Notes
2604 -----
2605 Will empty the records from the trash tables only if this call finishes
2606 without raising.
2607 """
2608 # Context manager will empty trash iff we finish it without raising.
2609 # It will also automatically delete the relevant rows from the
2610 # trash table and the records table.
2611 with self.bridge.emptyTrash(
2612 self._table,
2613 record_class=StoredFileInfo,
2614 record_column="path",
2615 selected_ids=selected_ids,
2616 dry_run=dry_run,
2617 ) as trash_data:
2618 # Removing the artifacts themselves requires that the files are
2619 # not also associated with refs that are not to be trashed.
2620 # Therefore need to do a query with the file paths themselves
2621 # and return all the refs associated with them. Can only delete
2622 # a file if the refs to be trashed are the only refs associated
2623 # with the file.
2624 # This requires multiple copies of the trashed items
2625 trashed, artifacts_to_keep = trash_data
2627 # Assume that # in path means there are fragments involved. The
2628 # fragments can not be handled by the emptyTrash bridge call
2629 # so need to be processed independently.
2630 # The generator has to be converted to a list for multiple
2631 # iterations. Clean up the typing so that multiple isinstance
2632 # tests aren't needed later.
2633 trashed_list = [(ref, ninfo) for ref, ninfo in trashed if isinstance(ninfo, StoredFileInfo)]
2635 if artifacts_to_keep is None or any("#" in info[1].path for info in trashed_list):
2636 # The bridge is not helping us so have to work it out
2637 # ourselves. This is not going to be as efficient.
2638 # This mapping does not include the fragments.
2639 if artifacts_to_keep is not None:
2640 # This means we have already checked for non-fragment
2641 # examples so can filter.
2642 paths_to_check = {info.path for _, info in trashed_list if "#" in info.path}
2643 else:
2644 paths_to_check = {info.path for _, info in trashed_list}
2646 path_map = self._refs_associated_with_artifacts(paths_to_check)
2648 for ref, info in trashed_list:
2649 path = info.artifact_path
2650 # For disassembled composites in a Zip it is possible
2651 # for the same path to correspond to the same dataset ref
2652 # multiple times so trap for that.
2653 if ref.id in path_map[path]:
2654 path_map[path].remove(ref.id)
2655 if not path_map[path]:
2656 del path_map[path]
2658 slow_artifacts_to_keep = set(path_map)
2659 if artifacts_to_keep is not None:
2660 artifacts_to_keep.update(slow_artifacts_to_keep)
2661 else:
2662 artifacts_to_keep = slow_artifacts_to_keep
2664 n_direct = 0
2665 artifacts_to_delete: set[ResourcePath] = set()
2666 for ref, info in trashed_list:
2667 # Should not happen for this implementation but need
2668 # to keep mypy happy.
2669 assert info is not None, f"Internal logic error in emptyTrash with ref {ref}."
2671 if info.artifact_path in artifacts_to_keep:
2672 # This is a multi-dataset artifact and we are not
2673 # removing all associated refs.
2674 continue
2676 # Only trashed refs still known to datastore will be returned.
2677 location = info.file_location(self.locationFactory)
2679 if location.pathInStore.isabs():
2680 n_direct += 1
2681 continue
2683 # Strip fragment before storing since it is the artifact
2684 # we are deleting and we do not want repeats for every member
2685 # in a zip.
2686 artifacts_to_delete.add(location.uri.replace(fragment=""))
2688 if n_direct > 0:
2689 s = "s" if n_direct != 1 else ""
2690 log.verbose("Not deleting %d artifact%s using absolute URI%s", n_direct, s, s)
2692 if artifacts_to_keep:
2693 log.verbose(
2694 "%d artifact%s %s not deleted because of association with other datasets",
2695 len(artifacts_to_keep),
2696 "s" if len(artifacts_to_keep) != 1 else "",
2697 "were" if len(artifacts_to_keep) != 1 else "was",
2698 )
2700 if not artifacts_to_delete:
2701 return set()
2703 # Now do the deleting. Special case the log message for a single
2704 # artifact.
2705 if len(artifacts_to_delete) == 1:
2706 log.verbose(
2707 "%s removing file artifact %s from datastore %s",
2708 "Would be" if dry_run else "Now",
2709 list(artifacts_to_delete)[0],
2710 self.name,
2711 )
2712 else:
2713 log.verbose(
2714 "%s removing %d file artifacts from datastore %s",
2715 "Would be" if dry_run else "Now",
2716 len(artifacts_to_delete),
2717 self.name,
2718 )
2720 # For dry-run mode do not attempt to search the file store for
2721 # the artifacts to determine whether they exist or not. Simply
2722 # report that an attempt would be made to delete them. Never
2723 # report direct imports.
2724 if dry_run:
2725 return artifacts_to_delete
2727 # Now remove the actual file artifacts.
2728 remove_result = ResourcePath.mremove(artifacts_to_delete, do_raise=False)
2730 removed: set[ResourcePath] = set()
2731 exceptions: list[Exception] = []
2732 for uri, result in remove_result.items():
2733 if result.exception is None or isinstance(result.exception, FileNotFoundError):
2734 # File not existing is not an error since some other
2735 # process might have been trying to clean it and we do not
2736 # want to raise an error for a situation where the file
2737 # is not there and we do not want it to be there.
2738 removed.add(uri)
2739 else:
2740 exceptions.append(result.exception)
2742 if exceptions:
2743 s_err = "s" if len(exceptions) != 1 else ""
2744 e = ExceptionGroup(f"Error{s_err} removing {len(exceptions)} artifact{s_err}", exceptions)
2745 if ignore_errors:
2746 # Use a debug message here even though it's not
2747 # a good situation. In some cases this can be
2748 # caused by a race between user A and user B
2749 # and neither of them has permissions for the
2750 # other's files. Butler does not know about users
2751 # and trash has no idea what collections these
2752 # files were in (without guessing from a path).
2753 log.debug(
2754 "Encountered %d error%s removing %d artifact%s from datastore %s: %s",
2755 len(exceptions),
2756 s_err,
2757 len(artifacts_to_delete),
2758 "s" if len(artifacts_to_delete) != 1 else "",
2759 self.name,
2760 e,
2761 )
2762 else:
2763 raise e
2764 return removed
2766 @transactional
2767 def transfer_from(
2768 self,
2769 source_records: FileTransferMap,
2770 refs: Collection[DatasetRef],
2771 transfer: str = "auto",
2772 artifact_existence: dict[ResourcePath, bool] | None = None,
2773 dry_run: bool = False,
2774 ) -> tuple[set[DatasetRef], set[DatasetRef]]:
2775 log.verbose("Transferring %d datasets to %s", len(refs), self.name)
2777 # Stop early if "direct" transfer mode is requested. That would
2778 # require that the URI inside the source datastore should be stored
2779 # directly in the target datastore, which seems unlikely to be useful
2780 # since at any moment the source datastore could delete the file.
2781 if transfer in ("direct", "split"):
2782 raise ValueError(
2783 f"Can not transfer from a source datastore using {transfer} mode since"
2784 " those files are controlled by the other datastore."
2785 )
2787 if not refs:
2788 return set(), set()
2790 # Empty existence lookup if none given.
2791 if artifact_existence is None:
2792 artifact_existence = {}
2794 # In order to handle disassembled composites the code works
2795 # at the records level since it can assume that internal APIs
2796 # can be used.
2797 # - If the record already exists in the destination this is assumed
2798 # to be okay.
2799 # - If there is no record but the source and destination URIs are
2800 # identical no transfer is done but the record is added.
2801 # - If the source record refers to an absolute URI currently assume
2802 # that that URI should remain absolute and will be visible to the
2803 # destination butler. May need to have a flag to indicate whether
2804 # the dataset should be transferred. This will only happen if
2805 # the detached Butler has had a local ingest.
2807 # See if we already have these records
2808 log.verbose("Looking up existing datastore records in target %s for %d refs", self.name, len(refs))
2809 target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True)
2811 # The artifacts to register
2812 artifacts = []
2814 # Refs that already exist
2815 already_present = []
2817 # Refs that were rejected by this datastore.
2818 rejected = set()
2820 # Refs that were transferred successfully.
2821 accepted = set()
2823 # Record each time we have done a "direct" transfer.
2824 direct_transfers = []
2826 # Keep track of all the file transfers that are required.
2827 from_to: list[tuple[ResourcePath, ResourcePath]] = []
2829 # Now can transfer the artifacts
2830 log.verbose("Transferring artifacts")
2831 for ref in refs:
2832 if not self.constraints.isAcceptable(ref):
2833 # This datastore should not be accepting this dataset.
2834 rejected.add(ref)
2835 continue
2837 accepted.add(ref)
2839 if ref.id in target_records:
2840 # Already have an artifact for this.
2841 already_present.append(ref)
2842 continue
2844 # mypy needs to know these are always resolved refs
2845 for transfer_info in source_records.get(ref.id, []):
2846 info = transfer_info.file_info
2847 source_location = transfer_info.location
2848 target_location = info.file_location(self.locationFactory)
2849 if transfer == "unsafe_direct":
2850 # Use the existing file from the source location in place,
2851 # by recording the absolute URI in the target DB. This is
2852 # "unsafe" because the file could be deleted from the
2853 # source Butler at any time, leaving a dangling reference.
2854 source_location = source_location.toAbsolute()
2855 direct_transfers.append(source_location)
2856 info = info.update(path=str(source_location.uri))
2857 elif source_location == target_location and not source_location.pathInStore.isabs():
2858 # Artifact is already in the target location.
2859 # (which is how execution butler currently runs)
2860 pass
2861 else:
2862 if target_location.pathInStore.isabs():
2863 # Just because we can see the artifact when running
2864 # the transfer doesn't mean it will be generally
2865 # accessible to a user of this butler. Need to decide
2866 # what to do about an absolute path.
2867 if transfer == "auto":
2868 # For "auto" transfers we allow the absolute URI
2869 # to be recorded in the target datastore.
2870 direct_transfers.append(source_location)
2871 else:
2872 # The user is explicitly requesting a transfer
2873 # even for an absolute URI. This requires us to
2874 # calculate the target path.
2875 template_ref = ref
2876 if info.component:
2877 template_ref = ref.makeComponentRef(info.component)
2878 target_location = self._calculate_ingested_datastore_name(
2879 source_location.uri,
2880 template_ref,
2881 )
2883 info = info.update(path=target_location.pathInStore.path)
2885 # Need to transfer it to the new location.
2886 from_to.append((source_location.uri, target_location.uri))
2888 artifacts.append((ref, info))
2890 # Do the file transfers in bulk.
2891 # Assume we should always overwrite. If the artifact
2892 # is there this might indicate that a previous transfer
2893 # was interrupted but was not able to be rolled back
2894 # completely (eg pre-emption) so follow Datastore default
2895 # and overwrite. Do not copy if we are in dry-run mode.
2896 if dry_run:
2897 log.info("Would be copying %d file artifacts", len(from_to))
2898 else:
2899 log.verbose("Copying %d file artifacts", len(from_to))
2900 with time_this(log, msg="Transferring datasets into datastore", level=VERBOSE):
2901 ResourcePath.mtransfer(
2902 transfer,
2903 from_to,
2904 overwrite=True,
2905 transaction=self._transaction,
2906 )
2908 if direct_transfers:
2909 log.info(
2910 "Transfer request for an outside-datastore artifact with absolute URI done %d time%s",
2911 len(direct_transfers),
2912 "" if len(direct_transfers) == 1 else "s",
2913 )
2915 # We are overwriting previous datasets that may have already
2916 # existed. We therefore should ensure that we force the
2917 # datastore records to agree. Note that this can potentially lead
2918 # to difficulties if the dataset has previously been ingested
2919 # disassembled and is somehow now assembled, or vice versa.
2920 if not dry_run:
2921 log.verbose("Registering datastore records in database")
2922 self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.REPLACE)
2924 if already_present:
2925 n_skipped = len(already_present)
2926 log.info(
2927 "Skipped transfer of %d dataset%s already present in datastore",
2928 n_skipped,
2929 "" if n_skipped == 1 else "s",
2930 )
2932 log.verbose(
2933 "Finished transfer_from to %s with %d accepted, %d rejected",
2934 self.name,
2935 len(accepted),
2936 len(rejected),
2937 )
2938 return accepted, rejected
2940 def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap:
2941 source_records = self._get_stored_records_associated_with_refs(
2942 [FakeDatasetRef(id) for id in dataset_ids], ignore_datastore_records=True
2943 )
2944 return self._convert_stored_file_info_to_file_transfer_record(source_records)
2946 def locate_missing_files_for_transfer(
2947 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool]
2948 ) -> FileTransferMap:
2949 missing_ids = {ref.id for ref in refs}
2950 # Missing IDs can be okay if that datastore has allowed
2951 # gets based on file existence. Should we transfer what we can
2952 # or complain about it and warn?
2953 if not self.trustGetRequest:
2954 return {}
2956 found_records = self._find_missing_records(
2957 refs, missing_ids, artifact_existence, warn_for_missing=False
2958 )
2959 return self._convert_stored_file_info_to_file_transfer_record(found_records)
2961 def _convert_stored_file_info_to_file_transfer_record(
2962 self, info_map: dict[DatasetId, list[StoredFileInfo]]
2963 ) -> FileTransferMap:
2964 output: dict[DatasetId, list[FileTransferRecord]] = {}
2965 for k, file_info_list in info_map.items():
2966 output[k] = [
2967 FileTransferRecord(file_info=info, location=info.file_location(self.locationFactory))
2968 for info in file_info_list
2969 ]
2970 return output
2972 @transactional
2973 def forget(self, refs: Iterable[DatasetRef]) -> None:
2974 # Docstring inherited.
2975 refs = list(refs)
2976 self.bridge.forget(refs)
2977 self._table.delete(["dataset_id"], *[{"dataset_id": ref.id} for ref in refs])
2979 def validateConfiguration(
2980 self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False
2981 ) -> None:
2982 """Validate some of the configuration for this datastore.
2984 Parameters
2985 ----------
2986 entities : `~collections.abc.Iterable` [`DatasetRef` | `DatasetType` \
2987 | `StorageClass`]
2988 Entities to test against this configuration. Can be differing
2989 types.
2990 logFailures : `bool`, optional
2991 If `True`, output a log message for every validation error
2992 detected.
2994 Returns
2995 -------
2996 None
2998 Raises
2999 ------
3000 DatastoreValidationError
3001 Raised if there is a validation problem with a configuration.
3002 All the problems are reported in a single exception.
3004 Notes
3005 -----
3006 This method checks that all the supplied entities have valid file
3007 templates and also have formatters defined.
3008 """
3009 templateFailed = None
3010 try:
3011 self.templates.validateTemplates(entities, logFailures=logFailures)
3012 except FileTemplateValidationError as e:
3013 templateFailed = str(e)
3015 formatterFailed = []
3016 for entity in entities:
3017 try:
3018 self.formatterFactory.getFormatterClass(entity)
3019 except KeyError as e:
3020 formatterFailed.append(str(e))
3021 if logFailures:
3022 log.critical("Formatter failure: %s", e)
3024 if templateFailed or formatterFailed:
3025 messages = []
3026 if templateFailed:
3027 messages.append(templateFailed)
3028 if formatterFailed:
3029 messages.append(",".join(formatterFailed))
3030 msg = ";\n".join(messages)
3031 raise DatastoreValidationError(msg)
3033 def getLookupKeys(self) -> set[LookupKey]:
3034 # Docstring is inherited from base class
3035 return (
3036 self.templates.getLookupKeys()
3037 | self.formatterFactory.getLookupKeys()
3038 | self.constraints.getLookupKeys()
3039 )
3041 def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None:
3042 # Docstring is inherited from base class
3043 # The key can be valid in either formatters or templates so we can
3044 # only check the template if it exists
3045 if lookupKey in self.templates:
3046 try:
3047 self.templates[lookupKey].validateTemplate(entity)
3048 except FileTemplateValidationError as e:
3049 raise DatastoreValidationError(e) from e
3051 def export(
3052 self,
3053 refs: Iterable[DatasetRef],
3054 *,
3055 directory: ResourcePathExpression | None = None,
3056 transfer: str | None = "auto",
3057 ) -> Iterable[FileDataset]:
3058 # Docstring inherited from Datastore.export.
3059 if transfer == "auto" and directory is None:
3060 transfer = None
3062 if transfer is not None and transfer != "direct" and directory is None:
3063 raise TypeError(f"Cannot export using transfer mode {transfer} with no export directory given")
3065 if transfer == "move":
3066 raise TypeError("Can not export by moving files out of datastore.")
3068 # Force the directory to be a URI object
3069 directoryUri: ResourcePath | None = None
3070 if directory is not None:
3071 directoryUri = ResourcePath(directory, forceDirectory=True)
3073 if transfer is not None and directoryUri is not None and not directoryUri.exists():
3074 # mypy needs the second test
3075 raise FileNotFoundError(f"Export location {directory} does not exist")
3077 progress = Progress("lsst.daf.butler.datastores.FileDatastore.export", level=logging.DEBUG)
3078 for ref in progress.wrap(refs, "Exporting dataset files"):
3079 fileLocations = self._get_dataset_locations_info(ref)
3080 if not fileLocations:
3081 raise FileNotFoundError(f"Could not retrieve dataset {ref}.")
3082 # For now we can not export disassembled datasets
3083 if len(fileLocations) > 1:
3084 raise NotImplementedError(f"Can not export disassembled datasets such as {ref}")
3085 location, storedFileInfo = fileLocations[0]
3087 pathInStore = location.pathInStore.path
3088 if transfer is None:
3089 # TODO: do we also need to return the readStorageClass somehow?
3090 # We will use the path in store directly. If this is an
3091 # absolute URI, preserve it.
3092 if location.pathInStore.isabs():
3093 pathInStore = str(location.uri)
3094 elif transfer == "direct":
3095 # Use full URIs to the remote store in the export
3096 pathInStore = str(location.uri)
3097 else:
3098 # mypy needs help
3099 assert directoryUri is not None, "directoryUri must be defined to get here"
3100 storeUri = ResourcePath(location.uri, forceDirectory=False)
3102 # if the datastore has an absolute URI to a resource, we
3103 # have two options:
3104 # 1. Keep the absolute URI in the exported YAML
3105 # 2. Allocate a new name in the local datastore and transfer
3106 # it.
3107 # For now go with option 2
3108 if location.pathInStore.isabs():
3109 template = self.templates.getTemplate(ref)
3110 newURI = ResourcePath(template.format(ref), forceAbsolute=False, forceDirectory=False)
3111 pathInStore = str(newURI.updatedExtension(location.pathInStore.getExtension()))
3113 exportUri = directoryUri.join(pathInStore)
3114 exportUri.transfer_from(storeUri, transfer=transfer)
3116 yield FileDataset(refs=[ref], path=pathInStore, formatter=storedFileInfo.formatter)
3118 @staticmethod
3119 def computeChecksum(uri: ResourcePath, algorithm: str = "blake2b", block_size: int = 8192) -> str | None:
3120 """Compute the checksum of the supplied file.
3122 Parameters
3123 ----------
3124 uri : `lsst.resources.ResourcePath`
3125 Name of resource to calculate checksum from.
3126 algorithm : `str`, optional
3127 Name of algorithm to use. Must be one of the algorithms supported
3128 by :py:class`hashlib`.
3129 block_size : `int`
3130 Number of bytes to read from file at one time.
3132 Returns
3133 -------
3134 hexdigest : `str`
3135 Hex digest of the file.
3137 Notes
3138 -----
3139 Currently returns None if the URI is for a remote resource.
3140 """
3141 if algorithm not in hashlib.algorithms_guaranteed:
3142 raise NameError(f"The specified algorithm '{algorithm}' is not supported by hashlib")
3144 if not uri.isLocal:
3145 return None
3147 hasher = hashlib.new(algorithm)
3149 with uri.as_local() as local_uri, open(local_uri.ospath, "rb") as f:
3150 for chunk in iter(lambda: f.read(block_size), b""):
3151 hasher.update(chunk)
3153 return hasher.hexdigest()
3155 def needs_expanded_data_ids(
3156 self,
3157 transfer: str | None,
3158 entity: DatasetRef | DatasetType | StorageClass | None = None,
3159 ) -> bool:
3160 # Docstring inherited.
3161 # This _could_ also use entity to inspect whether the filename template
3162 # involves placeholders other than the required dimensions for its
3163 # dataset type, but that's not necessary for correctness; it just
3164 # enables more optimizations (perhaps only in theory).
3165 return transfer not in ("direct", None)
3167 def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None:
3168 # Docstring inherited from the base class.
3169 record_data = data.get(self.name)
3170 if not record_data:
3171 return
3173 self._bridge.insert(FakeDatasetRef(dataset_id) for dataset_id in record_data.records)
3175 # TODO: Verify that there are no unexpected table names in the dict?
3176 unpacked_records = []
3177 for dataset_id, dataset_data in record_data.records.items():
3178 records = dataset_data.get(self._table.name)
3179 if records:
3180 for info in records:
3181 assert isinstance(info, StoredFileInfo), "Expecting StoredFileInfo records"
3182 unpacked_records.append(info.to_record(dataset_id=dataset_id))
3183 if unpacked_records:
3184 self._table.insert(*unpacked_records, transaction=self._transaction)
3186 def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]:
3187 # Docstring inherited from the base class.
3189 # This call to 'bridge.check' filters out "partially deleted" datasets.
3190 # Specifically, ones in the unusual edge state that:
3191 # 1. They have an entry in the registry dataset tables
3192 # 2. They were "trashed" from the datastore, so they are not
3193 # present in the "dataset_location" table.)
3194 # 3. But the trash has not been "emptied", so there are still entries
3195 # in the "opaque" datastore records table.
3196 #
3197 # As far as I can tell, this can only occur in the case of a concurrent
3198 # or aborted call to `Butler.pruneDatasets(unstore=True, purge=False)`.
3199 # Datasets (with or without files existing on disk) can persist in
3200 # this zombie state indefinitely, until someone manually empties
3201 # the trash.
3202 exported_refs = list(self._bridge.check(refs))
3203 ids = {ref.id for ref in exported_refs}
3204 records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {id: {} for id in ids}
3205 for row in self._table.fetch(dataset_id=ids):
3206 info: StoredDatastoreItemInfo = StoredFileInfo.from_record(row)
3207 dataset_records = records.setdefault(row["dataset_id"], {})
3208 dataset_records.setdefault(self._table.name, []).append(info)
3210 record_data = DatastoreRecordData(records=records)
3211 return {self.name: record_data}
3213 def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]:
3214 # Docstring inherited from the base class.
3215 refs = [self._cast_storage_class(ref) for ref in refs]
3216 records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
3217 for ref in refs:
3218 if not self.constraints.isAcceptable(ref):
3219 continue
3220 fileLocations = self._get_expected_dataset_locations_info(ref)
3221 if not fileLocations:
3222 continue
3223 dataset_records = records.setdefault(ref.id, {})
3224 dataset_records.setdefault(self._table.name, [])
3225 for _, storedFileInfo in fileLocations:
3226 dataset_records[self._table.name].append(storedFileInfo)
3228 record_data = DatastoreRecordData(records=records)
3229 return {self.name: record_data}
3231 def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None:
3232 # Docstring inherited from the base class.
3233 self._retrieve_dataset_method = method
3235 def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef:
3236 """Update dataset reference to use the storage class from registry."""
3237 if self._retrieve_dataset_method is None:
3238 # We could raise an exception here but unit tests do not define
3239 # this method.
3240 return ref
3241 dataset_type = self._retrieve_dataset_method(ref.datasetType.name)
3242 if dataset_type is not None:
3243 ref = ref.overrideStorageClass(dataset_type.storageClass_name)
3244 return ref
3246 def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
3247 # Docstring inherited from the base class.
3248 return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(), StoredFileInfo)}