Coverage for python / lsst / daf / butler / _quantum_backed.py: 28%
188 statements
coverage.py v7.13.5, created at 2026-05-06 08:30 +0000
1# This file is part of daf_butler.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("QuantumBackedButler", "QuantumProvenanceData")
32import itertools
33import logging
34import uuid
35from collections import defaultdict
36from collections.abc import Iterable, Mapping
37from typing import TYPE_CHECKING, Any
39import pydantic
41from lsst.resources import ResourcePath, ResourcePathExpression
43from ._butler_config import ButlerConfig
44from ._butler_metrics import ButlerMetrics
45from ._config import Config
46from ._dataset_provenance import DatasetProvenance
47from ._dataset_ref import DatasetId, DatasetRef
48from ._dataset_type import DatasetType
49from ._deferredDatasetHandle import DeferredDatasetHandle
50from ._limited_butler import LimitedButler
51from ._quantum import Quantum
52from ._standalone_datastore import instantiate_standalone_datastore
53from ._storage_class import StorageClass, StorageClassFactory
54from .datastore import Datastore
55from .datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
56from .datastores.file_datastore.retrieve_artifacts import retrieve_and_zip
57from .dimensions import DimensionUniverse
58from .registry.interfaces import Database, DatastoreRegistryBridgeManager, OpaqueTableStorageManager
60if TYPE_CHECKING:
61 from ._butler import Butler
63_LOG = logging.getLogger(__name__)
66class QuantumBackedButler(LimitedButler):
67 """An implementation of `LimitedButler` intended to back execution of a
68 single `Quantum`.
70 Parameters
71 ----------
72 predicted_inputs : `~collections.abc.Iterable` [`DatasetId`]
73 Dataset IDs for datasets that can be read from this butler.
74 predicted_outputs : `~collections.abc.Iterable` [`DatasetId`]
75 Dataset IDs for datasets that can be stored in this butler.
76 dimensions : `DimensionUniverse`
77 Object managing all dimension definitions.
78 datastore : `Datastore`
79 Datastore to use for all dataset I/O and existence checks.
80 storageClasses : `StorageClassFactory`
81 Object managing all storage class definitions.
82 dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`]
83 The registry dataset type definitions, indexed by name.
84 metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
85 Metrics object for tracking butler statistics.
86 database : `Database`, optional
87 Database instance used by the datastore. Not required -- only provided
88 to allow database connections to be closed during cleanup.
90 Notes
91 -----
92 Most callers should use the `initialize` `classmethod` to construct new
93 instances instead of calling the constructor directly.
95 `QuantumBackedButler` uses a SQLite database internally, in order to reuse
96 existing `DatastoreRegistryBridge` and `OpaqueTableStorage`
97 implementations that rely on SQLAlchemy. If implementations are added in the
98 future that don't rely on SQLAlchemy, it should be possible to swap them
99 in by overriding the type arguments to `initialize` (though at present,
100 `QuantumBackedButler` would still create at least an in-memory SQLite
101 database that would then go unused).
103 We imagine `QuantumBackedButler` being used during (at least) batch
104 execution to capture `Datastore` records and save them to per-quantum
105 files, which are also a convenient place to store provenance for eventual
106 upload to a SQL-backed `Registry` (once `Registry` has tables to store
107 provenance, that is).
108 These per-quantum files can be written in two ways:
110 - The SQLite file used internally by `QuantumBackedButler` can be used
111 directly by customizing the ``filename`` argument to ``initialize``, and
112 then transferring that file to the object store after execution completes
113 (or fails; a ``try/finally`` pattern probably makes sense here).
115 - A JSON or YAML file can be written by calling `extract_provenance_data`,
116 and using ``pydantic`` methods to write the returned
117 `QuantumProvenanceData` to a file.
119 Note that at present, the SQLite file only contains datastore records, not
120 provenance, but that should be easy to address (if desired) after we
121 actually design a `Registry` schema for provenance. I also suspect that
122 we'll want to explicitly close the SQLite file somehow before trying to
123 transfer it. But I'm guessing we'd prefer to write the per-quantum files
124 as JSON anyway.
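A concrete (illustrative) sketch of the JSON route, assuming ``qbb`` is an
existing `QuantumBackedButler` and the output path is arbitrary::

    provenance = qbb.extract_provenance_data()
    with open("quantum_provenance.json", "w") as stream:
        # Serialize with pydantic v2; parse_raw() cannot be used to read
        # this back later; see QuantumProvenanceData.direct().
        stream.write(provenance.model_dump_json())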
125 """
127 def __init__(
128 self,
129 predicted_inputs: Iterable[DatasetId],
130 predicted_outputs: Iterable[DatasetId],
131 dimensions: DimensionUniverse,
132 datastore: Datastore,
133 storageClasses: StorageClassFactory,
134 dataset_types: Mapping[str, DatasetType] | None = None,
135 metrics: ButlerMetrics | None = None,
136 database: Database | None = None,
137 ):
138 self._dimensions = dimensions
139 self._predicted_inputs = set(predicted_inputs)
140 self._predicted_outputs = set(predicted_outputs)
141 self._available_inputs: set[DatasetId] = set()
142 self._unavailable_inputs: set[DatasetId] = set()
143 self._actual_inputs: set[DatasetId] = set()
144 self._actual_output_refs: set[DatasetRef] = set()
145 self._datastore = datastore
146 self.storageClasses = storageClasses
147 self._dataset_types: Mapping[str, DatasetType] = {}
148 self._metrics = metrics if metrics is not None else ButlerMetrics()
149 self._database = database
150 if dataset_types is not None:
151 self._dataset_types = dataset_types
152 self._datastore.set_retrieve_dataset_type_method(self._retrieve_dataset_type)
154 @classmethod
155 def initialize(
156 cls,
157 config: Config | ResourcePathExpression,
158 quantum: Quantum,
159 dimensions: DimensionUniverse,
160 filename: str | None = None,
161 OpaqueManagerClass: type[OpaqueTableStorageManager] | None = None,
162 BridgeManagerClass: type[DatastoreRegistryBridgeManager] | None = None,
163 search_paths: list[str] | None = None,
164 dataset_types: Mapping[str, DatasetType] | None = None,
165 metrics: ButlerMetrics | None = None,
166 ) -> QuantumBackedButler:
167 """Construct a new `QuantumBackedButler` from repository configuration
168 and helper types.
170 Parameters
171 ----------
172 config : `Config` or `~lsst.resources.ResourcePathExpression`
173 A butler repository root, configuration filename, or configuration
174 instance.
175 quantum : `Quantum`
176 Object describing the predicted input and output datasets relevant
177 to this butler. This must have resolved `DatasetRef` instances for
178 all inputs and outputs.
179 dimensions : `DimensionUniverse`
180 Object managing all dimension definitions.
181 filename : `str`, optional
182 Name for the SQLite database that will back this butler; defaults
183 to an in-memory database.
184 OpaqueManagerClass : `type`, optional
185 A subclass of `OpaqueTableStorageManager` to use for datastore
186 opaque records. Default is a SQL-backed implementation.
187 BridgeManagerClass : `type`, optional
188 A subclass of `DatastoreRegistryBridgeManager` to use for datastore
189 location records. Default is a SQL-backed implementation.
190 search_paths : `list` of `str`, optional
191 Additional search paths for butler configuration.
192 dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`], \
193 optional
194 Mapping of the dataset type name to its registry definition.
195 metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
196 Metrics object for gathering butler statistics.
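Notes
-----
A minimal sketch of typical use, assuming ``quantum`` carries fully resolved
refs and ``universe`` is the repository's `DimensionUniverse` (all names here
are illustrative)::

    qbb = QuantumBackedButler.initialize(
        config="/path/to/repo",   # repo root or config file (hypothetical)
        quantum=quantum,
        dimensions=universe,
    )
    try:
        ...  # task execution: qbb.get(...) / qbb.put(...)
    finally:
        qbb.close()  # release the internal database connection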
197 """
198 predicted_inputs = [ref.id for ref in itertools.chain.from_iterable(quantum.inputs.values())]
199 predicted_inputs += [ref.id for ref in quantum.initInputs.values()]
200 predicted_outputs = [ref.id for ref in itertools.chain.from_iterable(quantum.outputs.values())]
201 return cls._initialize(
202 config=config,
203 predicted_inputs=predicted_inputs,
204 predicted_outputs=predicted_outputs,
205 dimensions=dimensions,
206 filename=filename,
207 datastore_records=quantum.datastore_records,
208 OpaqueManagerClass=OpaqueManagerClass,
209 BridgeManagerClass=BridgeManagerClass,
210 search_paths=search_paths,
211 dataset_types=dataset_types,
212 metrics=metrics,
213 )
215 @classmethod
216 def from_predicted(
217 cls,
218 config: Config | ResourcePathExpression,
219 predicted_inputs: Iterable[DatasetId],
220 predicted_outputs: Iterable[DatasetId],
221 dimensions: DimensionUniverse,
222 datastore_records: Mapping[str, DatastoreRecordData],
223 filename: str | None = None,
224 OpaqueManagerClass: type[OpaqueTableStorageManager] | None = None,
225 BridgeManagerClass: type[DatastoreRegistryBridgeManager] | None = None,
226 search_paths: list[str] | None = None,
227 dataset_types: Mapping[str, DatasetType] | None = None,
228 metrics: ButlerMetrics | None = None,
229 ) -> QuantumBackedButler:
230 """Construct a new `QuantumBackedButler` from sets of input and output
231 dataset IDs.
233 Parameters
234 ----------
235 config : `Config` or `~lsst.resources.ResourcePathExpression`
236 A butler repository root, configuration filename, or configuration
237 instance.
238 predicted_inputs : `~collections.abc.Iterable` [`DatasetId`]
239 Dataset IDs for datasets that can be read from this butler.
240 predicted_outputs : `~collections.abc.Iterable` [`DatasetId`]
241 Dataset IDs for datasets that can be stored in this butler; these must be
242 fully resolved.
243 dimensions : `DimensionUniverse`
244 Object managing all dimension definitions.
245 datastore_records : `dict` [`str`, `DatastoreRecordData`] or `None`
246 Datastore records to import into a datastore.
247 filename : `str`, optional
248 Name for the SQLite database that will back this butler; defaults
249 to an in-memory database.
250 OpaqueManagerClass : `type`, optional
251 A subclass of `OpaqueTableStorageManager` to use for datastore
252 opaque records. Default is a SQL-backed implementation.
253 BridgeManagerClass : `type`, optional
254 A subclass of `DatastoreRegistryBridgeManager` to use for datastore
255 location records. Default is a SQL-backed implementation.
256 search_paths : `list` of `str`, optional
257 Additional search paths for butler configuration.
258 dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`], \
259 optional
260 Mapping of the dataset type name to its registry definition.
261 metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
262 Metrics object for gathering butler statistics.
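Notes
-----
A brief sketch, assuming the dataset IDs and ``records`` were previously
extracted (for example via `export_predicted_datastore_records` on another
butler); all names are illustrative::

    qbb = QuantumBackedButler.from_predicted(
        config="/path/to/repo",
        predicted_inputs=input_ids,
        predicted_outputs=output_ids,
        dimensions=universe,
        datastore_records=records,
    )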
263 """
264 return cls._initialize(
265 config=config,
266 predicted_inputs=predicted_inputs,
267 predicted_outputs=predicted_outputs,
268 dimensions=dimensions,
269 filename=filename,
270 datastore_records=datastore_records,
271 OpaqueManagerClass=OpaqueManagerClass,
272 BridgeManagerClass=BridgeManagerClass,
273 search_paths=search_paths,
274 dataset_types=dataset_types,
275 metrics=metrics,
276 )
278 @classmethod
279 def _initialize(
280 cls,
281 *,
282 config: Config | ResourcePathExpression,
283 predicted_inputs: Iterable[DatasetId],
284 predicted_outputs: Iterable[DatasetId],
285 dimensions: DimensionUniverse,
286 filename: str | None = None,
287 datastore_records: Mapping[str, DatastoreRecordData] | None = None,
288 OpaqueManagerClass: type[OpaqueTableStorageManager] | None = None,
289 BridgeManagerClass: type[DatastoreRegistryBridgeManager] | None = None,
290 search_paths: list[str] | None = None,
291 dataset_types: Mapping[str, DatasetType] | None = None,
292 metrics: ButlerMetrics | None = None,
293 ) -> QuantumBackedButler:
294 """Initialize quantum-backed butler.
296 Internal method with common implementation used by `initialize` and
297 `from_predicted`.
299 Parameters
300 ----------
301 config : `Config` or `~lsst.resources.ResourcePathExpression`
302 A butler repository root, configuration filename, or configuration
303 instance.
304 predicted_inputs : `~collections.abc.Iterable` [`DatasetId`]
305 Dataset IDs for datasets that can be read from this butler.
306 predicted_outputs : `~collections.abc.Iterable` [`DatasetId`]
307 Dataset IDs for datasets that can be stored in this butler.
308 dimensions : `DimensionUniverse`
309 Object managing all dimension definitions.
310 filename : `str`, optional
311 Name for the SQLite database that will back this butler; defaults
312 to an in-memory database.
313 datastore_records : `dict` [`str`, `DatastoreRecordData`] or `None`
314 Datastore records to import into a datastore.
315 OpaqueManagerClass : `type`, optional
316 A subclass of `OpaqueTableStorageManager` to use for datastore
317 opaque records. Default is a SQL-backed implementation.
318 BridgeManagerClass : `type`, optional
319 A subclass of `DatastoreRegistryBridgeManager` to use for datastore
320 location records. Default is a SQL-backed implementation.
321 search_paths : `list` of `str`, optional
322 Additional search paths for butler configuration.
323 dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`]
324 Mapping of the dataset type name to its registry definition.
325 metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
326 Metrics object for gathering butler statistics.
327 """
328 butler_config = ButlerConfig(config, searchPaths=search_paths)
329 datastore, database = instantiate_standalone_datastore(
330 butler_config, dimensions, filename, OpaqueManagerClass, BridgeManagerClass
331 )
333 # TODO: We need to inform `Datastore` here that it needs to support
334 # predictive reads. This only really works for the file datastore, but
335 # we need to try everything in case there is a chained datastore.
336 datastore._set_trust_mode(True)
338 if datastore_records is not None:
339 datastore.import_records(datastore_records)
340 storageClasses = StorageClassFactory()
341 storageClasses.addFromConfig(butler_config)
342 return cls(
343 predicted_inputs,
344 predicted_outputs,
345 dimensions,
346 datastore,
347 storageClasses=storageClasses,
348 dataset_types=dataset_types,
349 metrics=metrics,
350 database=database,
351 )
353 def close(self) -> None:
354 if self._database is not None:
355 self._database.dispose()
357 def _retrieve_dataset_type(self, name: str) -> DatasetType | None:
358 """Return DatasetType defined in registry given dataset type name."""
359 return self._dataset_types.get(name)
361 def isWriteable(self) -> bool:
362 # Docstring inherited.
363 return True
365 def get(
366 self,
367 ref: DatasetRef,
368 /,
369 *,
370 parameters: dict[str, Any] | None = None,
371 storageClass: StorageClass | str | None = None,
372 ) -> Any:
373 try:
374 obj = super().get(
375 ref,
376 parameters=parameters,
377 storageClass=storageClass,
378 )
379 except (LookupError, FileNotFoundError, OSError):
380 self._unavailable_inputs.add(ref.id)
381 raise
382 if ref.id in self._predicted_inputs:
383 # do this after delegating to super in case that raises.
384 self._actual_inputs.add(ref.id)
385 self._available_inputs.add(ref.id)
386 return obj
388 def getDeferred(
389 self,
390 ref: DatasetRef,
391 /,
392 *,
393 parameters: dict[str, Any] | None = None,
394 storageClass: str | StorageClass | None = None,
395 ) -> DeferredDatasetHandle:
396 if ref.id in self._predicted_inputs:
397 # Unfortunately, we can't do this after the handle succeeds in
398 # loading, so it's conceivable here that we're marking an input
399 # as "actual" even when it's not even available.
400 self._actual_inputs.add(ref.id)
401 return super().getDeferred(ref, parameters=parameters, storageClass=storageClass)
403 def stored(self, ref: DatasetRef) -> bool:
404 # Docstring inherited.
405 stored = super().stored(ref)
406 if ref.id in self._predicted_inputs:
407 if stored:
408 self._available_inputs.add(ref.id)
409 else:
410 self._unavailable_inputs.add(ref.id)
411 return stored
413 def stored_many(
414 self,
415 refs: Iterable[DatasetRef],
416 ) -> dict[DatasetRef, bool]:
417 # Docstring inherited.
418 existence = super().stored_many(refs)
420 for ref, stored in existence.items():
421 if ref.id in self._predicted_inputs:
422 if stored:
423 self._available_inputs.add(ref.id)
424 else:
425 self._unavailable_inputs.add(ref.id)
426 return existence
428 def markInputUnused(self, ref: DatasetRef) -> None:
429 # Docstring inherited.
430 self._actual_inputs.discard(ref.id)
432 @property
433 def dimensions(self) -> DimensionUniverse:
434 # Docstring inherited.
435 return self._dimensions
437 def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
438 # Docstring inherited.
439 if ref.id not in self._predicted_outputs:
440 raise RuntimeError("Cannot `put` dataset that was not predicted as an output.")
441 with self._metrics.instrument_put(log=_LOG, msg="Put QBB dataset"):
442 self._datastore.put(obj, ref, provenance=provenance)
443 self._actual_output_refs.add(ref)
444 return ref
446 def pruneDatasets(
447 self,
448 refs: Iterable[DatasetRef],
449 *,
450 disassociate: bool = True,
451 unstore: bool = False,
452 tags: Iterable[str] = (),
453 purge: bool = False,
454 ) -> None:
455 # docstring inherited from LimitedButler
457 if purge:
458 if not disassociate:
459 raise TypeError("Cannot pass purge=True without disassociate=True.")
460 if not unstore:
461 raise TypeError("Cannot pass purge=True without unstore=True.")
462 elif disassociate:
463 # No tagged collections for this butler.
464 raise TypeError("Cannot pass disassociate=True without purge=True.")
466 refs = list(refs)
468 # Pruning a component of a DatasetRef makes no sense.
469 for ref in refs:
470 if ref.datasetType.component():
471 raise ValueError(f"Can not prune a component of a dataset (ref={ref})")
473 if unstore:
474 self._datastore.trash(refs)
475 if purge:
476 for ref in refs:
477 # We only care about removing them from actual output refs.
478 self._actual_output_refs.discard(ref)
480 if unstore:
481 # Point of no return for removing artifacts. Only try to remove
482 # refs associated with this pruning.
483 self._datastore.emptyTrash(refs=refs)
485 def retrieve_artifacts_zip(
486 self,
487 refs: Iterable[DatasetRef],
488 destination: ResourcePathExpression,
489 overwrite: bool = True,
490 ) -> ResourcePath:
491 """Retrieve artifacts from the graph and place in ZIP file.
493 Parameters
494 ----------
495 refs : `~collections.abc.Iterable` [ `DatasetRef` ]
496 The datasets to be included in the zip file.
497 destination : `lsst.resources.ResourcePathExpression`
498 Directory to write the new ZIP file. This directory will
499 also be used as a staging area for the datasets being downloaded
500 from the datastore.
501 overwrite : `bool`, optional
502 If `False` the output ZIP will not be written if a file of the
503 same name is already present in ``destination``.
505 Returns
506 -------
507 zip_file : `lsst.resources.ResourcePath`
508 The path to the new ZIP file.
510 Raises
511 ------
512 ValueError
513 Raised if there are no refs to retrieve.
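Notes
-----
A one-line sketch, assuming ``refs`` are resolved refs known to this butler
(the destination path is arbitrary)::

    zip_path = qbb.retrieve_artifacts_zip(refs, destination="/tmp/artifacts")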
514 """
515 return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite)
517 def retrieve_artifacts(
518 self,
519 refs: Iterable[DatasetRef],
520 destination: ResourcePathExpression,
521 transfer: str = "auto",
522 preserve_path: bool = True,
523 overwrite: bool = False,
524 ) -> list[ResourcePath]:
525 """Retrieve the artifacts associated with the supplied refs.
527 Parameters
528 ----------
529 refs : `~collections.abc.Iterable` of `DatasetRef`
530 The datasets for which artifacts are to be retrieved.
531 A single ref can result in multiple artifacts. The refs must
532 be resolved.
533 destination : `lsst.resources.ResourcePath` or `str`
534 Location to write the artifacts.
535 transfer : `str`, optional
536 Method to use to transfer the artifacts. Must be one of the options
537 supported by `~lsst.resources.ResourcePath.transfer_from`.
538 "move" is not allowed.
539 preserve_path : `bool`, optional
540 If `True` the full path of the artifact within the datastore
541 is preserved. If `False` the final file component of the path
542 is used.
543 overwrite : `bool`, optional
544 If `True` allow transfers to overwrite existing files at the
545 destination.
547 Returns
548 -------
549 targets : `list` of `lsst.resources.ResourcePath`
550 URIs of file artifacts in destination location. Order is not
551 preserved.
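Notes
-----
A brief sketch, assuming ``refs`` are resolved refs known to this butler::

    paths = qbb.retrieve_artifacts(refs, "/tmp/artifacts", transfer="copy")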
552 """
553 outdir = ResourcePath(destination)
554 artifact_map = self._datastore.retrieveArtifacts(
555 refs,
556 outdir,
557 transfer=transfer,
558 preserve_path=preserve_path,
559 overwrite=overwrite,
560 write_index=True,
561 )
562 return list(artifact_map)
564 def extract_provenance_data(self) -> QuantumProvenanceData:
565 """Extract provenance information and datastore records from this
566 butler.
568 Returns
569 -------
570 provenance : `QuantumProvenanceData`
571 A serializable struct containing input/output dataset IDs and
572 datastore records. This assumes all dataset IDs are UUIDs (just to
573 make it easier for `pydantic` to reason about the struct's types);
574 the rest of this class makes no such assumption, but the approach
575 to processing in which it's useful effectively requires UUIDs
576 anyway.
578 Notes
579 -----
580 `QuantumBackedButler` records this provenance information when its
581 methods are used, which mostly saves `~lsst.pipe.base.PipelineTask`
582 authors from having to worry about it while still recording very
583 detailed information. But it has two small weaknesses:
585 - Calling `getDeferred` or `get` is enough to mark a
586 dataset as an "actual input", which may mark some datasets that
587 aren't actually used. We rely on task authors to use
588 `markInputUnused` to address this.
590 - We assume that the execution system will call ``stored``
591 on all predicted inputs prior to execution, in order to populate the
592 "available inputs" set. This is what I envision
593 `~lsst.ctrl.mpexec.SingleQuantumExecutor` doing after we update it
594 to use this class, but it feels fragile for this class to make such
595 a strong assumption about how it will be used, even if I can't think
596 of any other executor behavior that would make sense.
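Addressing the first weakness above, a task might explicitly discard an
input it ended up not reading (``needed`` is a hypothetical task-level
decision)::

    handle = qbb.getDeferred(ref)
    if not needed:
        # Remove the ref from the "actual inputs" recorded in provenance.
        qbb.markInputUnused(ref)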
597 """
598 if not self._actual_inputs.isdisjoint(self._unavailable_inputs):
599 _LOG.warning(
600 "Inputs %s were marked as actually used (probably because a DeferredDatasetHandle) "
601 "was obtained, but did not actually exist. This task should be be using markInputUnused "
602 "directly to clarify its provenance.",
603 self._actual_inputs & self._unavailable_inputs,
604 )
605 self._actual_inputs -= self._unavailable_inputs
606 checked_inputs = self._available_inputs | self._unavailable_inputs
607 if self._predicted_inputs != checked_inputs:
608 _LOG.warning(
609 "Execution harness did not check predicted inputs %s for existence; available inputs "
610 "recorded in provenance may be incomplete.",
611 self._predicted_inputs - checked_inputs,
612 )
613 datastore_records = self._datastore.export_records(self._actual_output_refs)
614 provenance_records = {
615 datastore_name: records.to_simple() for datastore_name, records in datastore_records.items()
616 }
618 return QuantumProvenanceData(
619 predicted_inputs=self._predicted_inputs,
620 available_inputs=self._available_inputs,
621 actual_inputs=self._actual_inputs,
622 predicted_outputs=self._predicted_outputs,
623 actual_outputs={ref.id for ref in self._actual_output_refs},
624 datastore_records=provenance_records,
625 )
627 def export_predicted_datastore_records(
628 self, refs: Iterable[DatasetRef]
629 ) -> dict[str, DatastoreRecordData]:
630 """Export datastore records for a set of predicted output dataset
631 references.
633 Parameters
634 ----------
635 refs : `~collections.abc.Iterable` [ `DatasetRef` ]
636 Dataset references for which to export datastore records. These
637 refs must be known to this butler.
639 Returns
640 -------
641 records : `dict` [ `str`, `DatastoreRecordData` ]
642 Predicted datastore records indexed by datastore name. No attempt
643 is made to ensure that the associated datasets exist on disk.
644 """
645 unknowns = [
646 ref
647 for ref in refs
648 if (ref.id not in self._predicted_outputs and ref.id not in self._predicted_inputs)
649 ]
650 if unknowns:
651 raise ValueError(f"Cannot export datastore records for unknown outputs: {unknowns}")
653 return self._datastore.export_predicted_records(refs)
656class QuantumProvenanceData(pydantic.BaseModel):
657 """A serializable struct for per-quantum provenance information and
658 datastore records.
660 Notes
661 -----
662 This class slightly duplicates information from the `Quantum` class itself
663 (the ``predicted_inputs`` and ``predicted_outputs`` sets should have the
664 same IDs present in `Quantum.inputs` and `Quantum.outputs`), but overall it
665 assumes the original `Quantum` is also available to reconstruct the
666 complete provenance (e.g. by associating dataset IDs with data IDs,
667 dataset types, and `~CollectionType.RUN` names).
669 Note that the ``pydantic`` method ``parse_raw()`` does not work
670 correctly for this class; use the `direct` method instead.
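One plausible round trip, assuming ``provenance`` came from
`QuantumBackedButler.extract_provenance_data`::

    import json

    blob = provenance.model_dump_json()  # pydantic v2 serialization
    restored = QuantumProvenanceData.direct(**json.loads(blob))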
671 """
673 # This class probably should have information about its execution
674 # environment (anything not controlled and recorded at the
675 `~CollectionType.RUN` level, such as the compute node ID), but adding it
676 # now is out of scope for this prototype.
678 predicted_inputs: set[uuid.UUID]
679 """Unique IDs of datasets that were predicted as inputs to this quantum
680 when the QuantumGraph was built.
681 """
683 available_inputs: set[uuid.UUID]
684 """Unique IDs of input datasets that were actually present in the datastore
685 when this quantum was executed.
687 This is a subset of ``predicted_inputs``, with the difference generally
688 being datasets that were ``predicted_outputs`` but not ``actual_outputs`` of
689 some upstream task.
690 """
692 actual_inputs: set[uuid.UUID]
693 """Unique IDs of datasets that were actually used as inputs by this task.
695 This is a subset of ``available_inputs``.
697 Notes
698 -----
699 The criterion for marking an input as used is that rerunning the quantum
700 with only these ``actual_inputs`` available must yield identical outputs.
701 This means that (for example) even just using an input to help determine
702 an output rejection criterion and then rejecting it as an outlier qualifies
703 that input as actually used.
704 """
706 predicted_outputs: set[uuid.UUID]
707 """Unique IDs of datasets that were predicted as outputs of this quantum
708 when the QuantumGraph was built.
709 """
711 actual_outputs: set[uuid.UUID]
712 """Unique IDs of datasets that were actually written when this quantum
713 was executed.
714 """
716 datastore_records: dict[str, SerializedDatastoreRecordData]
717 """Datastore records indexed by datastore name."""
719 @staticmethod
720 def collect_and_transfer(
721 butler: Butler, quanta: Iterable[Quantum], provenance: Iterable[QuantumProvenanceData]
722 ) -> None:
723 """Transfer output datasets from multiple quanta to a more permanent
724 `Butler` repository.
726 Parameters
727 ----------
728 butler : `Butler`
729 Full butler representing the data repository to transfer datasets
730 to.
731 quanta : `~collections.abc.Iterable` [ `Quantum` ]
732 Iterable of `Quantum` objects that carry information about
733 predicted outputs. May be a single-pass iterator.
734 provenance : `~collections.abc.Iterable` [ `QuantumProvenanceData` ]
735 Provenance and datastore data for each of the given quanta, in the
736 same order. May be a single-pass iterator.
738 Notes
739 -----
740 Input-output provenance data is not actually transferred yet, because
741 `Registry` has no place to store it.
743 This method probably works most efficiently if run on all quanta for a
744 single task label at once, because this will gather all datasets of
745 a particular type together into a single vectorized `Registry` import.
746 It should still behave correctly if run on smaller groups of quanta
747 or even quanta from multiple tasks.
749 Currently this method transfers datastore record data unchanged, with
750 no possibility of actually moving (e.g.) files. Datastores that are
751 present only in execution or only in the more permanent butler are
752 ignored.
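A sketch of the expected call pattern, assuming ``butler`` is the permanent
repository and ``quanta`` is aligned with the butlers that executed them
(``executed_butlers`` is illustrative)::

    provenance = [qbb.extract_provenance_data() for qbb in executed_butlers]
    QuantumProvenanceData.collect_and_transfer(butler, quanta, provenance)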
753 """
754 grouped_refs = defaultdict(list)
755 summary_records: dict[str, DatastoreRecordData] = {}
756 for quantum, provenance_for_quantum in zip(quanta, provenance, strict=True):
757 quantum_refs_by_id = {
758 ref.id: ref
759 for ref in itertools.chain.from_iterable(quantum.outputs.values())
760 if ref.id in provenance_for_quantum.actual_outputs
761 }
762 for ref in quantum_refs_by_id.values():
763 grouped_refs[ref.datasetType, ref.run].append(ref)
765 # merge datastore records into a summary structure
766 for datastore_name, serialized_records in provenance_for_quantum.datastore_records.items():
767 quantum_records = DatastoreRecordData.from_simple(serialized_records)
768 if (records := summary_records.get(datastore_name)) is not None:
769 records.update(quantum_records)
770 else:
771 summary_records[datastore_name] = quantum_records
773 for refs in grouped_refs.values():
774 butler.registry._importDatasets(refs)
775 butler._datastore.import_records(summary_records)
777 @classmethod
778 def parse_raw(cls, *args: Any, **kwargs: Any) -> QuantumProvenanceData:
779 raise NotImplementedError("parse_raw() is not usable for this class, use direct() instead.")
781 @classmethod
782 def direct(
783 cls,
784 *,
785 predicted_inputs: Iterable[str | uuid.UUID],
786 available_inputs: Iterable[str | uuid.UUID],
787 actual_inputs: Iterable[str | uuid.UUID],
788 predicted_outputs: Iterable[str | uuid.UUID],
789 actual_outputs: Iterable[str | uuid.UUID],
790 datastore_records: Mapping[str, Mapping],
791 ) -> QuantumProvenanceData:
792 """Construct an instance directly without validators.
794 Parameters
795 ----------
796 predicted_inputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
797 The predicted inputs.
798 available_inputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
799 The available inputs.
800 actual_inputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
801 The actual inputs.
802 predicted_outputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
803 The predicted outputs.
804 actual_outputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
805 The actual outputs.
806 datastore_records : `~collections.abc.Mapping` [ `str`, \
807 `~collections.abc.Mapping` ]
808 The datastore records.
810 Returns
811 -------
812 provenance : `QuantumProvenanceData`
813 Serializable model of the quantum provenance.
815 Notes
816 -----
817 This differs from the Pydantic "construct" method in that the
818 arguments are explicitly what the model requires, and it will recurse
819 through members, constructing them from their corresponding `direct`
820 methods.
822 This method should only be called when the inputs are trusted.
823 """
825 def _to_uuid_set(uuids: Iterable[str | uuid.UUID]) -> set[uuid.UUID]:
826 """Convert input UUIDs, which could be in string representation to
827 a set of `UUID` instances.
828 """
829 return {uuid.UUID(id) if isinstance(id, str) else id for id in uuids}
831 data = cls.model_construct(
832 predicted_inputs=_to_uuid_set(predicted_inputs),
833 available_inputs=_to_uuid_set(available_inputs),
834 actual_inputs=_to_uuid_set(actual_inputs),
835 predicted_outputs=_to_uuid_set(predicted_outputs),
836 actual_outputs=_to_uuid_set(actual_outputs),
837 datastore_records={
838 key: SerializedDatastoreRecordData.direct(**records)
839 for key, records in datastore_records.items()
840 },
841 )
843 return data