Coverage for python / lsst / pipe / base / quantum_graph / _provenance.py: 27%
700 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
# Public API of this module; kept alphabetically sorted.  Some listed names
# (e.g. the graph/reader/writer classes) are presumably defined later in the
# file, beyond this section.
__all__ = (
    "ProvenanceDatasetInfo",
    "ProvenanceDatasetModel",
    "ProvenanceInitQuantumInfo",
    "ProvenanceInitQuantumModel",
    "ProvenanceLogRecordsModel",
    "ProvenanceQuantumGraph",
    "ProvenanceQuantumGraphReader",
    "ProvenanceQuantumGraphWriter",
    "ProvenanceQuantumInfo",
    "ProvenanceQuantumModel",
    "ProvenanceQuantumReport",
    "ProvenanceQuantumScanData",
    "ProvenanceQuantumScanModels",
    "ProvenanceQuantumScanStatus",
    "ProvenanceReport",
    "ProvenanceTaskMetadataModel",
)
49import dataclasses
50import enum
51import itertools
52import sys
53import uuid
54from collections import Counter
55from collections.abc import Callable, Iterable, Iterator, Mapping
56from contextlib import ExitStack, contextmanager
57from typing import TYPE_CHECKING, Any, TypedDict
59import astropy.table
60import networkx
61import numpy as np
62import pydantic
64from lsst.daf.butler import Butler, DataCoordinate
65from lsst.daf.butler.logging import ButlerLogRecord, ButlerLogRecords
66from lsst.resources import ResourcePath, ResourcePathExpression
67from lsst.utils.iteration import ensure_iterable
68from lsst.utils.logging import LsstLogAdapter, getLogger
69from lsst.utils.packages import Packages
71from .. import automatic_connection_constants as acc
72from .._status import ExceptionInfo, QuantumAttemptStatus, QuantumSuccessCaveats
73from .._task_metadata import TaskMetadata
74from ..log_capture import _ExecutionLogRecordsExtra
75from ..log_on_close import LogOnClose
76from ..pipeline_graph import PipelineGraph, TaskImportMode, TaskInitNode
77from ..resource_usage import QuantumResourceUsage
78from ._common import (
79 BaseQuantumGraph,
80 BaseQuantumGraphReader,
81 BaseQuantumGraphWriter,
82 ConnectionName,
83 DataCoordinateValues,
84 DatasetInfo,
85 DatasetTypeName,
86 HeaderModel,
87 QuantumInfo,
88 TaskLabel,
89)
90from ._multiblock import Compressor, MultiblockReader, MultiblockWriter
91from ._predicted import (
92 PredictedDatasetModel,
93 PredictedQuantumDatasetsModel,
94 PredictedQuantumGraph,
95 PredictedQuantumGraphComponents,
96)
98# Sphinx needs imports for type annotations of base class members.
99if "sphinx" in sys.modules:
100 import zipfile # noqa: F401
102 from ._multiblock import AddressReader, Decompressor # noqa: F401
# Signature for a callable that wraps an iterable and yields its elements
# (e.g. to add progress reporting); see `pass_through` for the trivial case.
type LoopWrapper[T] = Callable[[Iterable[T]], Iterable[T]]

# NOTE(review): loggers are conventionally named with ``__name__``;
# ``__file__`` yields a filesystem path as the logger name — confirm this is
# intentional.
_LOG = getLogger(__file__)

# Indexes of each per-node address within an address row — presumably used
# with the multiblock address machinery from ._multiblock; confirm against
# the reader/writer code elsewhere in this file.
DATASET_ADDRESS_INDEX = 0
QUANTUM_ADDRESS_INDEX = 1
LOG_ADDRESS_INDEX = 2
METADATA_ADDRESS_INDEX = 3

# Names of the multiblock sections in a provenance quantum graph file.
DATASET_MB_NAME = "datasets"
QUANTUM_MB_NAME = "quanta"
LOG_MB_NAME = "logs"
METADATA_MB_NAME = "metadata"
120def pass_through[T](arg: T) -> T:
121 return arg
class ProvenanceDatasetInfo(DatasetInfo):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a provenance dataset.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: ProvenanceDatasetInfo = xgraph.nodes[dataset_id]

    where ``xgraph`` is `ProvenanceQuantumGraph.bipartite_xgraph`.
    """

    dataset_id: uuid.UUID
    """Unique identifier for the dataset."""

    produced: bool
    """Whether this dataset was produced (vs. only predicted).

    This is always `True` for overall input datasets. It is also `True` for
    datasets that were produced and then removed before/during transfer back to
    the central butler repository, so it may not reflect the continued
    existence of the dataset.
    """
class ProvenanceQuantumInfo(QuantumInfo):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a provenance quantum.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: ProvenanceQuantumInfo = xgraph.nodes[quantum_id]

    where ``xgraph`` is `ProvenanceQuantumGraph.bipartite_xgraph` or
    `ProvenanceQuantumGraph.quantum_only_xgraph`.
    """

    status: QuantumAttemptStatus
    """Enumerated status for the quantum.

    This corresponds to the last attempt to run this quantum, or
    `QuantumAttemptStatus.BLOCKED` if there were no attempts.
    """

    caveats: QuantumSuccessCaveats | None
    """Flags indicating caveats on successful quanta.

    This corresponds to the last attempt to run this quantum.
    """

    exception: ExceptionInfo | None
    """Information about an exception raised when the quantum was executing.

    This corresponds to the last attempt to run this quantum.
    """

    resource_usage: QuantumResourceUsage | None
    """Resource usage information (timing, memory use) for this quantum.

    This corresponds to the last attempt to run this quantum.
    """

    attempts: list[ProvenanceQuantumAttemptModel]
    """Information about each attempt to run this quantum.

    An entry is added merely if the quantum *should* have been attempted; an
    empty `list` is used only for quanta that were blocked by an upstream
    failure.
    """

    metadata_id: uuid.UUID
    """ID of this quantum's metadata dataset."""

    log_id: uuid.UUID
    """ID of this quantum's log dataset."""
class ProvenanceInitQuantumInfo(TypedDict):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a provenance init quantum.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: ProvenanceInitQuantumInfo = xgraph.nodes[quantum_id]

    where ``xgraph`` is `ProvenanceQuantumGraph.bipartite_xgraph`.
    """

    data_id: DataCoordinate
    """Data ID of the quantum.

    This is always an empty ID; this key exists to allow init-quanta and
    regular quanta to be treated more similarly.
    """

    task_label: str
    """Label of the task for this quantum."""

    pipeline_node: TaskInitNode
    """Node in the pipeline graph for this task's init-only step."""

    config_id: uuid.UUID
    """ID of this task's config dataset."""
class ProvenanceDatasetModel(PredictedDatasetModel):
    """Data model for the datasets in a provenance quantum graph file."""

    produced: bool
    """Whether this dataset was produced (vs. only predicted).

    This is always `True` for overall input datasets. It is also `True` for
    datasets that were produced and then removed before/during transfer back to
    the central butler repository, so it may not reflect the continued
    existence of the dataset.
    """

    producer: uuid.UUID | None = None
    """ID of the quantum that produced this dataset.

    This is `None` for overall inputs to the graph.
    """

    consumers: list[uuid.UUID] = pydantic.Field(default_factory=list)
    """IDs of quanta that were predicted to consume this dataset."""

    @property
    def node_id(self) -> uuid.UUID:
        """Alias for the dataset ID."""
        return self.dataset_id

    @classmethod
    def from_predicted(
        cls,
        predicted: PredictedDatasetModel,
        producer: uuid.UUID | None = None,
        consumers: Iterable[uuid.UUID] = (),
    ) -> ProvenanceDatasetModel:
        """Construct from a predicted dataset model.

        Parameters
        ----------
        predicted : `PredictedDatasetModel`
            Information about the dataset from the predicted graph.
        producer : `uuid.UUID` or `None`, optional
            ID of the quantum that was predicted to produce this dataset.
        consumers : `~collections.abc.Iterable` [`uuid.UUID`], optional
            IDs of the quanta that were predicted to consume this dataset.

        Returns
        -------
        provenance : `ProvenanceDatasetModel`
            Provenance dataset model.

        Notes
        -----
        This initializes `produced` to `True` when ``producer is None`` and
        `False` otherwise, on the assumption that it will be updated later.
        """
        return cls.model_construct(
            dataset_id=predicted.dataset_id,
            dataset_type_name=predicted.dataset_type_name,
            data_coordinate=predicted.data_coordinate,
            run=predicted.run,
            produced=(producer is None),  # if it's not produced by this QG, it's an overall input
            producer=producer,
            consumers=list(consumers),
        )

    def _add_to_graph(self, graph: ProvenanceQuantumGraph) -> None:
        """Add this dataset and its edges to quanta to a provenance graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.

        Notes
        -----
        This method adds:

        - a ``bipartite_xgraph`` dataset node with full attributes;
        - ``bipartite_xgraph`` edges to adjacent quanta (which adds quantum
          nodes with no attributes), without populating edge attributes;
        - ``quantum_only_xgraph`` edges for each pair of quanta in which one
          produces this dataset and another consumes it (this also adds quantum
          nodes with no attributes).
        """
        dataset_type_node = graph.pipeline_graph.dataset_types[self.dataset_type_name]
        data_id = DataCoordinate.from_full_values(dataset_type_node.dimensions, tuple(self.data_coordinate))
        graph._bipartite_xgraph.add_node(
            self.dataset_id,
            data_id=data_id,
            dataset_type_name=self.dataset_type_name,
            pipeline_node=dataset_type_node,
            run=self.run,
            produced=self.produced,
        )
        if self.producer is not None:
            graph._bipartite_xgraph.add_edge(self.producer, self.dataset_id)
        for consumer_id in self.consumers:
            graph._bipartite_xgraph.add_edge(self.dataset_id, consumer_id)
            if self.producer is not None:
                graph._quantum_only_xgraph.add_edge(self.producer, consumer_id)
        graph._datasets_by_type[self.dataset_type_name][data_id] = self.dataset_id

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceQuantumAttemptModel(pydantic.BaseModel):
    """Data model for a now-superseded attempt to run a quantum in a
    provenance quantum graph file.
    """

    attempt: int = 0
    """Counter incremented for every attempt to execute this quantum."""

    status: QuantumAttemptStatus = QuantumAttemptStatus.UNKNOWN
    """Enumerated status for the quantum."""

    caveats: QuantumSuccessCaveats | None = None
    """Flags indicating caveats on successful quanta."""

    exception: ExceptionInfo | None = None
    """Information about an exception raised when the quantum was executing."""

    resource_usage: QuantumResourceUsage | None = None
    """Resource usage information (timing, memory use) for this quantum."""

    previous_process_quanta: list[uuid.UUID] = pydantic.Field(default_factory=list)
    """The IDs of other quanta previously executed in the same process as this
    one.
    """

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceLogRecordsModel(pydantic.BaseModel):
    """Data model for storing execution logs in a provenance quantum graph
    file.
    """

    attempts: list[list[ButlerLogRecord] | None] = pydantic.Field(default_factory=list)
    """Logs from attempts to run this task, ordered chronologically from first
    to last.
    """

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceTaskMetadataModel(pydantic.BaseModel):
    """Data model for storing task metadata in a provenance quantum graph
    file.
    """

    # We want to convert infs and nans to constants, not null. Unfortunately
    # the fact that TaskMetadata _also_ sets this is ignored when that model
    # is nested here.
    model_config = pydantic.ConfigDict(ser_json_inf_nan="constants")

    attempts: list[TaskMetadata | None] = pydantic.Field(default_factory=list)
    """Metadata from attempts to run this task, ordered chronologically from
    first to last.
    """

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceQuantumReport(pydantic.BaseModel):
    """A Pydantic model that used to report information about a single
    (generally problematic) quantum.
    """

    quantum_id: uuid.UUID
    """Unique identifier for the quantum."""

    data_id: dict[str, int | str]
    """Data ID of the quantum, as a simple mapping."""

    attempts: list[ProvenanceQuantumAttemptModel]
    """Information about each attempt to run this quantum."""

    @classmethod
    def from_info(cls, quantum_id: uuid.UUID, quantum_info: ProvenanceQuantumInfo) -> ProvenanceQuantumReport:
        """Construct from a provenance quantum graph node.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID for the quantum.
        quantum_info : `ProvenanceQuantumInfo`
            Node attributes for this quantum.

        Returns
        -------
        report : `ProvenanceQuantumReport`
            Report for this quantum.
        """
        return cls(
            quantum_id=quantum_id,
            data_id=dict(quantum_info["data_id"].mapping),
            attempts=quantum_info["attempts"],
        )

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceReport(pydantic.RootModel):
    """A Pydantic model that groups quantum information by task label, then
    status (as a string), and then exception type.
    """

    # Pydantic deep-copies mutable field defaults per instance, so the `{}`
    # default is safe here (unlike in a plain Python class).
    root: dict[TaskLabel, dict[str, dict[str | None, list[ProvenanceQuantumReport]]]] = {}

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceQuantumModel(pydantic.BaseModel):
    """Data model for the quanta in a provenance quantum graph file."""

    quantum_id: uuid.UUID
    """Unique identifier for the quantum."""

    task_label: TaskLabel
    """Label of the task for this quantum."""

    data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list)
    """The full values (required and implied) of this quantum's data ID."""

    inputs: dict[ConnectionName, list[uuid.UUID]] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be consumed by this quantum, grouped by
    connection name.
    """

    outputs: dict[ConnectionName, list[uuid.UUID]] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be produced by this quantum, grouped by
    connection name.
    """

    attempts: list[ProvenanceQuantumAttemptModel] = pydantic.Field(default_factory=list)
    """Provenance for all attempts to execute this quantum, ordered
    chronologically from first to last.

    An entry is added merely if the quantum *should* have been attempted; an
    empty `list` is used only for quanta that were blocked by an upstream
    failure.
    """

    @property
    def node_id(self) -> uuid.UUID:
        """Alias for the quantum ID."""
        return self.quantum_id

    @classmethod
    def from_predicted(cls, predicted: PredictedQuantumDatasetsModel) -> ProvenanceQuantumModel:
        """Construct from a predicted quantum model.

        Parameters
        ----------
        predicted : `PredictedQuantumDatasetsModel`
            Information about the quantum from the predicted graph.

        Returns
        -------
        provenance : `ProvenanceQuantumModel`
            Provenance quantum model.
        """
        inputs = {
            connection_name: [d.dataset_id for d in predicted_inputs]
            for connection_name, predicted_inputs in predicted.inputs.items()
        }
        outputs = {
            connection_name: [d.dataset_id for d in predicted_outputs]
            for connection_name, predicted_outputs in predicted.outputs.items()
        }
        return cls(
            quantum_id=predicted.quantum_id,
            task_label=predicted.task_label,
            data_coordinate=predicted.data_coordinate,
            inputs=inputs,
            outputs=outputs,
        )

    def _add_to_graph(self, graph: ProvenanceQuantumGraph) -> None:
        """Add this quantum and its edges to datasets to a provenance graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.

        Notes
        -----
        This method adds:

        - a ``bipartite_xgraph`` quantum node with full attributes;
        - a ``quantum_only_xgraph`` quantum node with full attributes;
        - ``bipartite_xgraph`` edges to adjacent datasets (which adds datasets
          nodes with no attributes), while populating those edge attributes;
        - ``quantum_only_xgraph`` edges to any adjacent quantum that has also
          already been loaded.
        """
        task_node = graph.pipeline_graph.tasks[self.task_label]
        data_id = DataCoordinate.from_full_values(task_node.dimensions, tuple(self.data_coordinate))
        # An empty attempt list means the quantum was blocked upstream; use a
        # synthetic BLOCKED attempt for the node attributes in that case.
        last_attempt = (
            self.attempts[-1]
            if self.attempts
            else ProvenanceQuantumAttemptModel(status=QuantumAttemptStatus.BLOCKED)
        )
        graph._bipartite_xgraph.add_node(
            self.quantum_id,
            data_id=data_id,
            task_label=self.task_label,
            pipeline_node=task_node,
            status=last_attempt.status,
            caveats=last_attempt.caveats,
            exception=last_attempt.exception,
            resource_usage=last_attempt.resource_usage,
            attempts=self.attempts,
        )
        graph._quanta_by_task_label[self.task_label][data_id] = self.quantum_id
        graph._quantum_only_xgraph.add_node(self.quantum_id, **graph._bipartite_xgraph.nodes[self.quantum_id])
        for connection_name, dataset_ids in self.inputs.items():
            read_edge = task_node.get_input_edge(connection_name)
            for dataset_id in dataset_ids:
                graph._bipartite_xgraph.add_edge(dataset_id, self.quantum_id, is_read=True)
                # A dataset may feed the same quantum through multiple
                # connections, so accumulate pipeline edges rather than assign.
                graph._bipartite_xgraph.edges[dataset_id, self.quantum_id].setdefault(
                    "pipeline_edges", []
                ).append(read_edge)
        for connection_name, dataset_ids in self.outputs.items():
            write_edge = task_node.get_output_edge(connection_name)
            if connection_name == acc.METADATA_OUTPUT_CONNECTION_NAME:
                # Metadata datasets get their node attributes filled here,
                # since they are not stored as separate dataset models.
                graph._bipartite_xgraph.add_node(
                    dataset_ids[0],
                    data_id=data_id,
                    dataset_type_name=write_edge.dataset_type_name,
                    pipeline_node=graph.pipeline_graph.dataset_types[write_edge.dataset_type_name],
                    run=graph.header.output_run,
                    produced=last_attempt.status.has_metadata,
                )
                graph._datasets_by_type[write_edge.dataset_type_name][data_id] = dataset_ids[0]
                graph._bipartite_xgraph.nodes[self.quantum_id]["metadata_id"] = dataset_ids[0]
                graph._quantum_only_xgraph.nodes[self.quantum_id]["metadata_id"] = dataset_ids[0]
            if connection_name == acc.LOG_OUTPUT_CONNECTION_NAME:
                # Same treatment for the log dataset.
                graph._bipartite_xgraph.add_node(
                    dataset_ids[0],
                    data_id=data_id,
                    dataset_type_name=write_edge.dataset_type_name,
                    pipeline_node=graph.pipeline_graph.dataset_types[write_edge.dataset_type_name],
                    run=graph.header.output_run,
                    produced=last_attempt.status.has_log,
                )
                graph._datasets_by_type[write_edge.dataset_type_name][data_id] = dataset_ids[0]
                graph._bipartite_xgraph.nodes[self.quantum_id]["log_id"] = dataset_ids[0]
                graph._quantum_only_xgraph.nodes[self.quantum_id]["log_id"] = dataset_ids[0]
            for dataset_id in dataset_ids:
                graph._bipartite_xgraph.add_edge(
                    self.quantum_id,
                    dataset_id,
                    is_read=False,
                    # There can only be one pipeline edge for an output.
                    pipeline_edges=[write_edge],
                )
        # Link to any already-loaded neighboring quanta in the quantum-only
        # projection of the graph.
        for dataset_id in graph._bipartite_xgraph.predecessors(self.quantum_id):
            for upstream_quantum_id in graph._bipartite_xgraph.predecessors(dataset_id):
                graph._quantum_only_xgraph.add_edge(upstream_quantum_id, self.quantum_id)
        for dataset_id in graph._bipartite_xgraph.successors(self.quantum_id):
            for downstream_quantum_id in graph._bipartite_xgraph.successors(dataset_id):
                graph._quantum_only_xgraph.add_edge(self.quantum_id, downstream_quantum_id)

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: previously delegated to model_dump(), which returns a
            # dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceInitQuantumModel(pydantic.BaseModel):
    """Data model for the special "init" quanta in a provenance quantum graph
    file.
    """

    quantum_id: uuid.UUID
    """Unique identifier for the quantum."""

    task_label: TaskLabel
    """Label of the task this init quantum belongs to.

    Note that full task definitions are stored in the pipeline graph.
    """

    inputs: dict[ConnectionName, uuid.UUID] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be consumed by this quantum, grouped by
    connection name.
    """

    outputs: dict[ConnectionName, uuid.UUID] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be produced by this quantum, grouped by
    connection name.
    """

    @classmethod
    def from_predicted(cls, predicted: PredictedQuantumDatasetsModel) -> ProvenanceInitQuantumModel:
        """Construct from a predicted quantum model.

        Parameters
        ----------
        predicted : `PredictedQuantumDatasetsModel`
            Information about the quantum from the predicted graph.

        Returns
        -------
        provenance : `ProvenanceInitQuantumModel`
            Provenance init quantum model.
        """
        # Init quanta have a single dataset per connection, so flatten the
        # predicted per-connection lists down to one dataset ID each.
        inputs = {
            connection_name: predicted_inputs[0].dataset_id
            for connection_name, predicted_inputs in predicted.inputs.items()
        }
        outputs = {
            connection_name: predicted_outputs[0].dataset_id
            for connection_name, predicted_outputs in predicted.outputs.items()
        }
        return cls(
            quantum_id=predicted.quantum_id,
            task_label=predicted.task_label,
            inputs=inputs,
            outputs=outputs,
        )

    def _add_to_graph(self, graph: ProvenanceQuantumGraph, empty_data_id: DataCoordinate) -> None:
        """Add this quantum and its edges to datasets to a provenance graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.
        empty_data_id : `lsst.daf.butler.DataCoordinate`
            The empty data ID for the appropriate dimension universe.

        Notes
        -----
        This method adds:

        - a ``bipartite_xgraph`` quantum node with full attributes;
        - ``bipartite_xgraph`` edges to adjacent datasets (which adds datasets
          nodes with no attributes), while populating those edge attributes;
        """
        task_init_node = graph.pipeline_graph.tasks[self.task_label].init
        graph._bipartite_xgraph.add_node(
            self.quantum_id, data_id=empty_data_id, task_label=self.task_label, pipeline_node=task_init_node
        )
        for connection_name, dataset_id in self.inputs.items():
            read_edge = task_init_node.get_input_edge(connection_name)
            # The same dataset can be read through multiple connections, so
            # accumulate pipeline edges on the (dataset, quantum) graph edge.
            graph._bipartite_xgraph.add_edge(dataset_id, self.quantum_id, is_read=True)
            graph._bipartite_xgraph.edges[dataset_id, self.quantum_id].setdefault(
                "pipeline_edges", []
            ).append(read_edge)
        for connection_name, dataset_id in self.outputs.items():
            write_edge = task_init_node.get_output_edge(connection_name)
            graph._bipartite_xgraph.add_node(
                dataset_id,
                data_id=empty_data_id,
                dataset_type_name=write_edge.dataset_type_name,
                pipeline_node=graph.pipeline_graph.dataset_types[write_edge.dataset_type_name],
                run=graph.header.output_run,
                produced=True,
            )
            graph._datasets_by_type[write_edge.dataset_type_name][empty_data_id] = dataset_id
            graph._bipartite_xgraph.add_edge(
                self.quantum_id,
                dataset_id,
                is_read=False,
                # There can only be one pipeline edge for an output.
                pipeline_edges=[write_edge],
            )
            if write_edge.connection_name == acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME:
                graph._bipartite_xgraph.nodes[self.quantum_id]["config_id"] = dataset_id
        graph._init_quanta[self.task_label] = self.quantum_id

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: this stub previously delegated to model_dump, which
            # returns a dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceInitQuantaModel(pydantic.RootModel):
    """Data model for the init quanta in a provenance graph."""

    root: list[ProvenanceInitQuantumModel] = pydantic.Field(default_factory=list)
    """List of special "init" quanta, one for each task."""

    def _add_to_graph(self, graph: ProvenanceQuantumGraph) -> None:
        """Add all init quanta and their edges to datasets to a provenance
        graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.
        """
        # All init quanta and init datasets share the empty data ID, so
        # construct it once and reuse it for every task.
        empty_data_id = DataCoordinate.make_empty(graph.pipeline_graph.universe)
        for init_quantum in self.root:
            init_quantum._add_to_graph(graph, empty_data_id=empty_data_id)

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Fixed: this stub previously delegated to model_dump, which
            # returns a dict rather than a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class ProvenanceQuantumGraph(BaseQuantumGraph):
    """A quantum graph that represents processing that has already been
    executed.

    Parameters
    ----------
    header : `HeaderModel`
        General metadata shared with other quantum graph types.
    pipeline_graph : `.pipeline_graph.PipelineGraph`
        Graph of tasks and dataset types. May contain a superset of the tasks
        and dataset types that actually have quanta and datasets in the quantum
        graph.

    Notes
    -----
    A provenance quantum graph is generally obtained via the
    `ProvenanceQuantumGraphReader.graph` attribute, which is updated in-place
    as information is read from disk.
    """

    def __init__(self, header: HeaderModel, pipeline_graph: PipelineGraph) -> None:
        super().__init__(header, pipeline_graph)
        # Mapping from task label to the ID of that task's special init
        # quantum; filled in as init quanta are read.
        self._init_quanta: dict[TaskLabel, uuid.UUID] = {}
        # Projection of the graph with quantum nodes only (datasets elided).
        self._quantum_only_xgraph = networkx.DiGraph()
        # Full graph with both quantum and dataset nodes.
        self._bipartite_xgraph = networkx.DiGraph()
        # Pre-seed the nested index mappings with an empty entry for every
        # task and dataset type in the pipeline graph, so lookups never fail
        # even before any nodes are loaded.
        self._quanta_by_task_label: dict[str, dict[DataCoordinate, uuid.UUID]] = {
            task_label: {} for task_label in self.pipeline_graph.tasks.keys()
        }
        self._datasets_by_type: dict[str, dict[DataCoordinate, uuid.UUID]] = {
            dataset_type_name: {} for dataset_type_name in self.pipeline_graph.dataset_types.keys()
        }
1137 @classmethod
1138 @contextmanager
1139 def from_args(
1140 cls,
1141 repo_or_filename: str,
1142 /,
1143 collection: str | None = None,
1144 *,
1145 quanta: Iterable[uuid.UUID] | None = None,
1146 datasets: Iterable[uuid.UUID] | None = None,
1147 writeable: bool = False,
1148 ) -> Iterator[tuple[ProvenanceQuantumGraph, Butler | None]]:
1149 """Construct a `ProvenanceQuantumGraph` fron CLI-friendly arguments for
1150 a file or butler-ingested graph dataset.
1152 Parameters
1153 ----------
1154 repo_or_filename : `str`
1155 Either a provenance quantum graph filename or a butler repository
1156 path or alias.
1157 collection : `str`, optional
1158 Collection to search; presence indicates that the first argument
1159 is a butler repository, not a filename.
1160 quanta : `~collections.abc.Iterable` [ `str` ] or `None`, optional
1161 IDs of the quanta to load, or `None` to load all.
1162 datasets : `~collections.abc.Iterable` [ `str` ], optional
1163 IDs of the datasets to load, or `None` to load all.
1164 writeable : `bool`, optional
1165 Whether the butler should be constructed with write support.
1167 Returns
1168 -------
1169 context : `contextlib.AbstractContextManager`
1170 A context manager that yields a tuple of
1172 - the `ProvenanceQuantumGraph`
1173 - the `Butler` constructed (or `None`)
1175 when entered.
1176 """
1177 exit_stack = ExitStack()
1178 if collection is not None:
1179 try:
1180 butler = exit_stack.enter_context(
1181 Butler.from_config(repo_or_filename, collections=[collection], writeable=writeable)
1182 )
1183 except Exception as err:
1184 err.add_note(
1185 f"Expected {repo_or_filename!r} to be a butler repository path or alias because a "
1186 f"collection ({collection}) was provided."
1187 )
1188 raise
1189 with exit_stack:
1190 graph = butler.get(
1191 acc.PROVENANCE_DATASET_TYPE_NAME, parameters={"quanta": quanta, "datasets": datasets}
1192 )
1193 yield graph, butler
1194 else:
1195 try:
1196 reader = exit_stack.enter_context(ProvenanceQuantumGraphReader.open(repo_or_filename))
1197 except Exception as err:
1198 err.add_note(
1199 f"Expected a {repo_or_filename} to be a provenance quantum graph filename "
1200 f"because no collection was provided."
1201 )
1202 raise
1203 with exit_stack:
1204 if quanta is None:
1205 reader.read_quanta()
1206 elif not quanta:
1207 reader.read_quanta(quanta)
1208 if datasets is None:
1209 reader.read_datasets()
1210 elif not datasets:
1211 reader.read_datasets(datasets)
1212 yield reader.graph, None
1214 @property
1215 def init_quanta(self) -> Mapping[TaskLabel, uuid.UUID]:
1216 """A mapping from task label to the ID of the special init quantum for
1217 that task.
1219 This is populated by the ``init_quanta`` component. Additional
1220 information about each init quantum can be found by using the ID to
1221 look up node attributes in the `bipartite_xgraph`, i.e.::
1223 info: ProvenanceInitQuantumInfo = qg.bipartite_xgraph.nodes[id]
1224 """
1225 return self._init_quanta
1227 @property
1228 def quanta_by_task(self) -> Mapping[TaskLabel, Mapping[DataCoordinate, uuid.UUID]]:
1229 """A nested mapping of all quanta, keyed first by task name and then by
1230 data ID.
1232 Notes
1233 -----
1234 This is populated one quantum at a time as they are read. All tasks in
1235 the pipeline graph are included, even if none of their quanta were
1236 loaded (i.e. nested mappings may be empty).
1238 The returned object may be an internal dictionary; as the type
1239 annotation indicates, it should not be modified in place.
1240 """
1241 return self._quanta_by_task_label
1243 @property
1244 def datasets_by_type(self) -> Mapping[DatasetTypeName, Mapping[DataCoordinate, uuid.UUID]]:
1245 """A nested mapping of all datasets, keyed first by dataset type name
1246 and then by data ID.
1248 Notes
1249 -----
1250 This is populated one dataset at a time as they are read. All dataset
1251 types in the pipeline graph are included, even if none of their
1252 datasets were loaded (i.e. nested mappings may be empty).
1254 Reading a quantum also populates its log and metadata datasets.
1256 The returned object may be an internal dictionary; as the type
1257 annotation indicates, it should not be modified in place.
1258 """
1259 return self._datasets_by_type
1261 @property
1262 def quantum_only_xgraph(self) -> networkx.DiGraph:
1263 """A directed acyclic graph with quanta as nodes (and datasets elided).
1265 Notes
1266 -----
1267 Node keys are quantum UUIDs, and are populated one quantum at a time as
1268 they are loaded. Loading quanta (via
1269 `ProvenanceQuantumGraphReader.read_quanta`) will add the loaded nodes
1270 with full attributes and add edges to adjacent nodes with no
1271 attributes. Loading datasets (via
1272 `ProvenanceQuantumGraphReader.read_datasets`) will also add edges and
1273 nodes with no attributes.
1275 Node attributes are described by the `ProvenanceQuantumInfo` types.
1277 This graph does not include special "init" quanta.
1279 The returned object is a read-only view of an internal one.
1280 """
1281 return self._quantum_only_xgraph.copy(as_view=True)
1283 @property
1284 def bipartite_xgraph(self) -> networkx.DiGraph:
1285 """A directed acyclic graph with quantum and dataset nodes.
1287 Notes
1288 -----
1289 Node keys are quantum or dataset UUIDs, and are populated one quantum
1290 or dataset at a time as they are loaded. Loading quanta (via
1291 `ProvenanceQuantumGraphReader.read_quanta`) or datasets (via
1292 `ProvenanceQuantumGraphReader.read_datasets`) will load those nodes
1293 with full attributes and edges to adjacent nodes with no attributes.
1294 Loading quanta is necessary to populate edge attributes.
1295 Reading a quantum also populates its log and metadata datasets.
1297 Node attributes are described by the
1298 `ProvenanceQuantumInfo`, `ProvenanceInitQuantumInfo`, and
1299 `ProvenanceDatasetInfo` types.
1301 This graph includes init-input and init-output datasets, but it does
1302 *not* reflect the dependency between each task's special "init" quantum
1303 and its runtime quanta (as this would require edges between quanta, and
1304 that would break the "bipartite" property).
1306 The returned object is a read-only view of an internal one.
1307 """
1308 return self._bipartite_xgraph.copy(as_view=True)
    def make_quantum_table(self, drop_unused_columns: bool = True) -> astropy.table.Table:
        """Construct an `astropy.table.Table` with a tabular summary of the
        quanta.

        Parameters
        ----------
        drop_unused_columns : `bool`, optional
            Whether to drop columns for rare states that did not actually
            occur in this run.

        Returns
        -------
        table : `astropy.table.Table`
            A table view of the quantum information.  This only includes
            counts of status categories and caveats, not any per-data-ID
            detail.

        Notes
        -----
        Success caveats in the table are represented by their
        `~QuantumSuccessCaveats.concise` form, so when pretty-printing this
        table for users, the `~QuantumSuccessCaveats.legend` should generally
        be printed as well.
        """
        rows = []
        for task_label, quanta_for_task in self.quanta_by_task.items():
            # Skip tasks that were not expected to have any quanta at all.
            if not self.header.n_task_quanta[task_label]:
                continue
            # Tally loaded quanta by status and by success-caveat flags.
            status_counts = Counter[QuantumAttemptStatus](
                self._quantum_only_xgraph.nodes[q]["status"] for q in quanta_for_task.values()
            )
            caveat_counts = Counter[QuantumSuccessCaveats | None](
                self._quantum_only_xgraph.nodes[q]["caveats"] for q in quanta_for_task.values()
            )
            # Caveat-free successes and quanta with no caveat information are
            # not interesting for the "Caveats" column.
            caveat_counts.pop(QuantumSuccessCaveats.NO_CAVEATS, None)
            caveat_counts.pop(None, None)
            if len(caveat_counts) > 1:
                caveats = "(multiple)"
            elif len(caveat_counts) == 1:
                ((code, count),) = caveat_counts.items()
                # MyPy can't tell that the pop(None, None) above makes None
                # impossible here.
                caveats = f"{code.concise()}({count})"  # type: ignore[union-attr]
            else:
                caveats = ""
            row: dict[str, Any] = {
                "Task": task_label,
                "Caveats": caveats,
            }
            # One column per status; dict insertion order fixes column order.
            for status in QuantumAttemptStatus:
                row[status.title] = status_counts.get(status, 0)
            row.update(
                {
                    "TOTAL": len(quanta_for_task),
                    "EXPECTED": self.header.n_task_quanta[task_label],
                }
            )
            rows.append(row)
        table = astropy.table.Table(rows)
        if drop_unused_columns:
            # Drop all-zero columns for statuses flagged as rare.
            for status in QuantumAttemptStatus:
                if status.is_rare and not table[status.title].any():
                    del table[status.title]
        return table
1375 def make_exception_table(self) -> astropy.table.Table:
1376 """Construct an `astropy.table.Table` with counts for each exception
1377 type raised by each task.
1379 Returns
1380 -------
1381 table : `astropy.table.Table`
1382 A table with columns for task label, exception type, and counts.
1383 """
1384 rows = []
1385 for task_label, quanta_for_task in self.quanta_by_task.items():
1386 success_counts = Counter[str]()
1387 failed_counts = Counter[str]()
1388 for quantum_id in quanta_for_task.values():
1389 quantum_info: ProvenanceQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id]
1390 exc_info = quantum_info["exception"]
1391 if exc_info is not None:
1392 if quantum_info["status"] is QuantumAttemptStatus.SUCCESSFUL:
1393 success_counts[exc_info.type_name] += 1
1394 else:
1395 failed_counts[exc_info.type_name] += 1
1396 for type_name in sorted(success_counts.keys() | failed_counts.keys()):
1397 rows.append(
1398 {
1399 "Task": task_label,
1400 "Exception": type_name,
1401 "Successes": success_counts.get(type_name, 0),
1402 "Failures": failed_counts.get(type_name, 0),
1403 }
1404 )
1405 return astropy.table.Table(rows)
    def make_task_resource_usage_table(
        self, task_label: TaskLabel, include_data_ids: bool = False
    ) -> astropy.table.Table:
        """Make a table of resource usage for a single task.

        Parameters
        ----------
        task_label : `str`
            Label of the task to extract resource usage for.
        include_data_ids : `bool`, optional
            Whether to also include data ID columns.

        Returns
        -------
        table : `astropy.table.Table`
            A table with columns for quantum ID and all fields in
            `QuantumResourceUsage`.  Quanta with no recorded resource usage
            are omitted.
        """
        quanta_for_task = self.quanta_by_task[task_label]
        # Assemble a numpy structured dtype: a 16-byte opaque field for the
        # UUID, then (optionally) one column per data ID dimension, then the
        # resource-usage fields.
        dtype_terms: list[tuple[str, np.dtype]] = [("quantum_id", np.dtype((np.void, 16)))]
        if include_data_ids:
            dimensions = self.pipeline_graph.tasks[task_label].dimensions
            for dimension_name in dimensions.data_coordinate_keys:
                dtype = np.dtype(self.pipeline_graph.universe.dimensions[dimension_name].primary_key.pytype)
                dtype_terms.append((dimension_name, dtype))
        fields = QuantumResourceUsage.get_numpy_fields()
        dtype_terms.extend(fields.items())
        row_dtype = np.dtype(dtype_terms)
        rows: list[object] = []
        for data_id, quantum_id in quanta_for_task.items():
            info: ProvenanceQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id]
            # Skip quanta without recorded resource usage rather than filling
            # placeholder rows.
            if (resource_usage := info["resource_usage"]) is not None:
                row: tuple[object, ...] = (quantum_id.bytes,)
                if include_data_ids:
                    row += data_id.full_values
                row += resource_usage.get_numpy_row()
                rows.append(row)
        array = np.array(rows, dtype=row_dtype)
        return astropy.table.Table(array, units=QuantumResourceUsage.get_units())
    def make_status_report(
        self,
        states: Iterable[QuantumAttemptStatus] = (
            QuantumAttemptStatus.FAILED,
            QuantumAttemptStatus.ABORTED,
            QuantumAttemptStatus.ABORTED_SUCCESS,
        ),
        *,
        also: QuantumAttemptStatus | Iterable[QuantumAttemptStatus] = (),
        with_caveats: QuantumSuccessCaveats | None = QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR,
        data_id_table_dir: ResourcePathExpression | None = None,
    ) -> ProvenanceReport:
        """Make a JSON- or YAML-friendly report of all quanta with the given
        states.

        Parameters
        ----------
        states : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \
                `..QuantumAttemptStatus`, optional
            A quantum is included if it has any of these states.  Defaults to
            states that clearly represent problems.
        also : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \
                `..QuantumAttemptStatus`, optional
            Additional states to consider; unioned with ``states``.  This is
            provided so users can easily request additional states while also
            getting the defaults.
        with_caveats : `..QuantumSuccessCaveats` or `None`, optional
            If `..QuantumAttemptStatus.SUCCESSFUL` is in ``states``, only
            include quanta with these caveat flags.  May be set to `None`
            to report on all successful quanta.
        data_id_table_dir : convertible to `~lsst.resources.ResourcePath`, \
                optional
            If provided, a directory to write data ID tables (in ECSV format)
            with all of the data IDs with the given states, for use with the
            ``--data-id-tables`` argument to the quantum graph builder.
            Subdirectories for each task and status will be created within
            this directory, with one file for each exception type (or
            ``UNKNOWN`` when there is no exception).

        Returns
        -------
        report : `ProvenanceReport`
            A Pydantic model that groups quanta by task label and exception
            type.
        """
        states = set(ensure_iterable(states))
        states.update(ensure_iterable(also))
        result = ProvenanceReport(root={})
        if data_id_table_dir is not None:
            data_id_table_dir = ResourcePath(data_id_table_dir)
        for task_label, quanta_for_task in self.quanta_by_task.items():
            # Nested as {status name: {exception type name or None: [...]}}.
            reports_for_task: dict[str, dict[str | None, list[ProvenanceQuantumReport]]] = {}
            table_rows_for_task: dict[str, dict[str | None, list[tuple[int | str, ...]]]] = {}
            for quantum_id in quanta_for_task.values():
                quantum_info: ProvenanceQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id]
                quantum_status = quantum_info["status"]
                if quantum_status not in states:
                    continue
                # Successes are further filtered by caveat flags (when
                # with_caveats was given); other states always pass.
                if (
                    quantum_status is QuantumAttemptStatus.SUCCESSFUL
                    and with_caveats is not None
                    and (quantum_info["caveats"] is None or not (quantum_info["caveats"] & with_caveats))
                ):
                    continue
                key1 = quantum_status.name
                exc_info = quantum_info["exception"]
                key2 = exc_info.type_name if exc_info is not None else None
                reports_for_task.setdefault(key1, {}).setdefault(key2, []).append(
                    ProvenanceQuantumReport.from_info(quantum_id, quantum_info)
                )
                if data_id_table_dir:
                    table_rows_for_task.setdefault(key1, {}).setdefault(key2, []).append(
                        quantum_info["data_id"].required_values
                    )
            if reports_for_task:
                result.root[task_label] = reports_for_task
            if table_rows_for_task:
                assert data_id_table_dir is not None, "table_rows_for_task should be empty"
                for status_name, table_rows_for_status in table_rows_for_task.items():
                    # Layout: <dir>/<task>/<status>/<exception or UNKNOWN>.ecsv
                    dir_for_task_and_status = data_id_table_dir.join(task_label, forceDirectory=True).join(
                        status_name, forceDirectory=True
                    )
                    if dir_for_task_and_status.isLocal:
                        dir_for_task_and_status.mkdir()
                    for exc_name, data_id_rows in table_rows_for_status.items():
                        table = astropy.table.Table(
                            rows=data_id_rows,
                            names=list(self.pipeline_graph.tasks[task_label].dimensions.required),
                        )
                        filename = f"{exc_name}.ecsv" if exc_name is not None else "UNKNOWN.ecsv"
                        with dir_for_task_and_status.join(filename).open("w") as stream:
                            table.write(stream, format="ecsv")
        return result
    def make_many_reports(
        self,
        states: Iterable[QuantumAttemptStatus] = (
            QuantumAttemptStatus.FAILED,
            QuantumAttemptStatus.ABORTED,
            QuantumAttemptStatus.ABORTED_SUCCESS,
        ),
        *,
        status_report_file: ResourcePathExpression | None = None,
        print_quantum_table: bool = False,
        print_exception_table: bool = False,
        also: QuantumAttemptStatus | Iterable[QuantumAttemptStatus] = (),
        with_caveats: QuantumSuccessCaveats | None = None,
        data_id_table_dir: ResourcePathExpression | None = None,
    ) -> None:
        """Write multiple reports.

        Parameters
        ----------
        states : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \
                `..QuantumAttemptStatus`, optional
            A quantum is included in the status report and data ID tables if it
            has any of these states.  Defaults to states that clearly represent
            problems.
        status_report_file : convertible to `~lsst.resources.ResourcePath`, \
                optional
            Filename for the JSON status report (see `make_status_report`).
        print_quantum_table : `bool`, optional
            If `True`, print a quantum summary table (counts only) to STDOUT.
        print_exception_table : `bool`, optional
            If `True`, print an exception-type summary table (counts only) to
            STDOUT.
        also : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \
                `..QuantumAttemptStatus`, optional
            Additional states to consider in the status report and data ID
            tables; unioned with ``states``.  This is provided so users can
            easily request additional states while also getting the defaults.
        with_caveats : `..QuantumSuccessCaveats` or `None`, optional
            Only include quanta with these caveat flags in the status report
            and data ID tables.  May be set to `None` to report on all
            successful quanta (an empty sequence reports on only quanta with no
            caveats).
        data_id_table_dir : convertible to `~lsst.resources.ResourcePath`, \
                optional
            If provided, a directory to write data ID tables (in ECSV format)
            with all of the data IDs with the given states, for use with the
            ``--data-id-tables`` argument to the quantum graph builder.
            Subdirectories for each task and status will be created within
            this directory, with one file for each exception type (or
            ``UNKNOWN`` when there is no exception).
        """
        # NOTE(review): the original docstring claimed SUCCESSFUL is
        # automatically added to ``states`` when ``with_caveats`` is given,
        # but no such addition happens below — confirm intended behavior.
        if status_report_file is not None or data_id_table_dir is not None:
            status_report = self.make_status_report(
                states, also=also, with_caveats=with_caveats, data_id_table_dir=data_id_table_dir
            )
            if status_report_file is not None:
                status_report_file = ResourcePath(status_report_file)
                if status_report_file.isLocal:
                    status_report_file.dirname().mkdir()
                with ResourcePath(status_report_file).open("w") as stream:
                    stream.write(status_report.model_dump_json(indent=2))
        if print_quantum_table:
            quantum_table = self.make_quantum_table()
            if quantum_table:
                quantum_table.pprint_all()
            print("")
        if print_exception_table:
            exception_table = self.make_exception_table()
            if exception_table:
                exception_table.pprint_all()
            print("")
@dataclasses.dataclass
class ProvenanceQuantumGraphReader(BaseQuantumGraphReader):
    """A helper class for reading provenance quantum graphs.

    Notes
    -----
    The `open` context manager should be used to construct new instances.
    Instances cannot be used after the context manager exits, except to access
    the `graph` attribute.

    The various ``read_*`` methods in this class update the `graph` attribute
    in place.
    """

    graph: ProvenanceQuantumGraph = dataclasses.field(init=False)
    """Loaded provenance graph, populated in place as components are read."""
1632 @classmethod
1633 @contextmanager
1634 def open(
1635 cls,
1636 uri: ResourcePathExpression,
1637 *,
1638 page_size: int | None = None,
1639 import_mode: TaskImportMode = TaskImportMode.DO_NOT_IMPORT,
1640 ) -> Iterator[ProvenanceQuantumGraphReader]:
1641 """Construct a reader from a URI.
1643 Parameters
1644 ----------
1645 uri : convertible to `lsst.resources.ResourcePath`
1646 URI to open. Should have a ``.qg`` extension.
1647 page_size : `int`, optional
1648 Approximate number of bytes to read at once from address files and
1649 multi-block files. Note that this does not set a page size for
1650 *all* reads, but it does affect the smallest, most numerous reads.
1651 Can also be set via the ``LSST_QG_PAGE_SIZE`` environment variable.
1652 import_mode : `.pipeline_graph.TaskImportMode`, optional
1653 How to handle importing the task classes referenced in the pipeline
1654 graph.
1656 Returns
1657 -------
1658 reader : `contextlib.AbstractContextManager` [ \
1659 `ProvenanceQuantumGraphReader` ]
1660 A context manager that returns the reader when entered.
1661 """
1662 with cls._open(
1663 uri,
1664 graph_type="provenance",
1665 address_filename="nodes",
1666 page_size=page_size,
1667 import_mode=import_mode,
1668 n_addresses=4,
1669 ) as self:
1670 yield self
    def __post_init__(self) -> None:
        # Start with an empty graph for this reader's header and pipeline
        # graph; the read_* methods fill it in place.
        self.graph = ProvenanceQuantumGraph(self.header, self.pipeline_graph)
    def read_init_quanta(self) -> None:
        """Read the special per-task "init" quanta and add them (and edges to
        their init-input/init-output datasets) to `graph`.
        """
        init_quanta = self._read_single_block("init_quanta", ProvenanceInitQuantaModel)
        # NOTE(review): _add_to_graph also records each quantum in
        # graph._init_quanta, so this loop looks redundant — confirm.
        for init_quantum in init_quanta.root:
            self.graph._init_quanta[init_quantum.task_label] = init_quantum.quantum_id
        init_quanta._add_to_graph(self.graph)
1684 def read_full_graph(self) -> None:
1685 """Read all bipartite edges and all quantum and dataset node
1686 attributes, fully populating the `graph` attribute.
1688 Notes
1689 -----
1690 This does not read logs, metadata, or packages ; those must always be
1691 fetched explicitly.
1692 """
1693 self.read_init_quanta()
1694 self.read_datasets()
1695 self.read_quanta()
1697 def read_datasets(self, datasets: Iterable[uuid.UUID] | None = None) -> None:
1698 """Read information about the given datasets.
1700 Parameters
1701 ----------
1702 datasets : `~collections.abc.Iterable` [`uuid.UUID`], optional
1703 Iterable of dataset IDs to load. If not provided, all datasets
1704 will be loaded. The UUIDs and indices of quanta will be ignored.
1705 """
1706 self._read_nodes(datasets, DATASET_ADDRESS_INDEX, DATASET_MB_NAME, ProvenanceDatasetModel)
1708 def read_quanta(self, quanta: Iterable[uuid.UUID] | None = None) -> None:
1709 """Read information about the given quanta.
1711 Parameters
1712 ----------
1713 quanta : `~collections.abc.Iterable` [`uuid.UUID`], optional
1714 Iterable of quantum IDs to load. If not provided, all quanta will
1715 be loaded. The UUIDs and indices of datasets and special init
1716 quanta will be ignored.
1717 """
1718 self._read_nodes(quanta, QUANTUM_ADDRESS_INDEX, QUANTUM_MB_NAME, ProvenanceQuantumModel)
    def _read_nodes(
        self,
        nodes: Iterable[uuid.UUID] | None,
        address_index: int,
        mb_name: str,
        model_type: type[ProvenanceDatasetModel] | type[ProvenanceQuantumModel],
    ) -> None:
        # Shared implementation for read_datasets and read_quanta: load either
        # all nodes of one kind or just the requested ones, adding each loaded
        # model to the in-memory graph.
        node: ProvenanceDatasetModel | ProvenanceQuantumModel | None
        if nodes is None:
            # Bulk path: scan the entire multi-block file sequentially instead
            # of doing per-node address lookups.
            self.address_reader.read_all()
            # NOTE(review): this assignment looks unused on this branch (the
            # loop below iterates over the file directly, not over ``nodes``)
            # -- confirm whether it is vestigial.
            nodes = self.address_reader.rows.keys()
            for node in MultiblockReader.read_all_models_in_zip(
                self.zf,
                mb_name,
                model_type,
                self.decompressor,
                int_size=self.header.int_size,
                page_size=self.page_size,
            ):
                if "pipeline_node" in self.graph._bipartite_xgraph.nodes.get(node.node_id, {}):
                    # Use the old node to reduce memory usage (since it might
                    # also have other outstanding reference holders).
                    continue
                node._add_to_graph(self.graph)
        else:
            # Targeted path: look up each requested node's address and read
            # only those blocks.
            with MultiblockReader.open_in_zip(self.zf, mb_name, int_size=self.header.int_size) as mb_reader:
                for node_id_or_index in nodes:
                    address_row = self.address_reader.find(node_id_or_index)
                    if "pipeline_node" in self.graph._bipartite_xgraph.nodes.get(address_row.key, {}):
                        # Use the old node to reduce memory usage (since it
                        # might also have other outstanding reference holders).
                        continue
                    node = mb_reader.read_model(
                        address_row.addresses[address_index], model_type, self.decompressor
                    )
                    if node is not None:
                        node._add_to_graph(self.graph)
1758 def fetch_logs(self, nodes: Iterable[uuid.UUID]) -> dict[uuid.UUID, list[ButlerLogRecords | None]]:
1759 """Fetch log datasets.
1761 Parameters
1762 ----------
1763 nodes : `~collections.abc.Iterable` [ `uuid.UUID` ]
1764 UUIDs of the log datasets themselves or of the quanta they
1765 correspond to.
1767 Returns
1768 -------
1769 logs : `dict` [ `uuid.UUID`, `list` [\
1770 `lsst.daf.butler.ButlerLogRecords` or `None`] ]
1771 Logs for the given IDs. Each value is a list of
1772 `lsst.daf.butler.ButlerLogRecords` instances representing different
1773 execution attempts, ordered chronologically from first to last.
1774 Attempts where logs were missing will have `None` in this list.
1775 """
1776 result: dict[uuid.UUID, list[ButlerLogRecords | None]] = {}
1777 with MultiblockReader.open_in_zip(self.zf, LOG_MB_NAME, int_size=self.header.int_size) as mb_reader:
1778 for node_id_or_index in nodes:
1779 address_row = self.address_reader.find(node_id_or_index)
1780 logs_by_attempt = mb_reader.read_model(
1781 address_row.addresses[LOG_ADDRESS_INDEX], ProvenanceLogRecordsModel, self.decompressor
1782 )
1783 if logs_by_attempt is not None:
1784 result[node_id_or_index] = [
1785 ButlerLogRecords.from_records(attempt_logs) if attempt_logs is not None else None
1786 for attempt_logs in logs_by_attempt.attempts
1787 ]
1788 return result
1790 def fetch_metadata(self, nodes: Iterable[uuid.UUID]) -> dict[uuid.UUID, list[TaskMetadata | None]]:
1791 """Fetch metadata datasets.
1793 Parameters
1794 ----------
1795 nodes : `~collections.abc.Iterable` [ `uuid.UUID` ]
1796 UUIDs of the metadata datasets themselves or of the quanta they
1797 correspond to.
1799 Returns
1800 -------
1801 metadata : `dict` [ `uuid.UUID`, `list` [`.TaskMetadata`] ]
1802 Metadata for the given IDs. Each value is a list of
1803 `.TaskMetadata` instances representing different execution
1804 attempts, ordered chronologically from first to last. Attempts
1805 where metadata was missing (not written even in the fallback extra
1806 provenance in the logs) will have `None` in this list.
1807 """
1808 result: dict[uuid.UUID, list[TaskMetadata | None]] = {}
1809 with MultiblockReader.open_in_zip(
1810 self.zf, METADATA_MB_NAME, int_size=self.header.int_size
1811 ) as mb_reader:
1812 for node_id_or_index in nodes:
1813 address_row = self.address_reader.find(node_id_or_index)
1814 metadata_by_attempt = mb_reader.read_model(
1815 address_row.addresses[METADATA_ADDRESS_INDEX],
1816 ProvenanceTaskMetadataModel,
1817 self.decompressor,
1818 )
1819 if metadata_by_attempt is not None:
1820 result[node_id_or_index] = metadata_by_attempt.attempts
1821 return result
1823 def fetch_packages(self) -> Packages:
1824 """Fetch package version information."""
1825 data = self._read_single_block_raw("packages")
1826 return Packages.fromBytes(data, format="json")
class ProvenanceQuantumGraphWriter:
    """A struct of low-level writer objects for the main components of a
    provenance quantum graph.

    Parameters
    ----------
    output_path : `str`
        Path to write the graph to.
    exit_stack : `contextlib.ExitStack`
        Object that can be used to manage multiple context managers.
    log_on_close : `LogOnClose`
        Factory for context managers that log when closed.
    predicted : `.PredictedQuantumGraphComponents`
        Components of the predicted graph.
    zstd_level : `int`, optional
        Compression level.
    cdict_data : `bytes` or `None`, optional
        Bytes representation of the compression dictionary used by the
        compressor.
    loop_wrapper : `~collections.abc.Callable`, optional
        A callable that takes an iterable and returns an equivalent one, to be
        used in all potentially-large loops.  This can be used to add progress
        reporting or check for cancelation signals.
    log : `LsstLogAdapter`, optional
        Logger to use for debug messages.
    """

    def __init__(
        self,
        output_path: str,
        *,
        exit_stack: ExitStack,
        log_on_close: LogOnClose,
        predicted: PredictedQuantumGraphComponents | PredictedQuantumGraph,
        zstd_level: int = 10,
        cdict_data: bytes | None = None,
        loop_wrapper: LoopWrapper = pass_through,
        log: LsstLogAdapter | None = None,
    ) -> None:
        # Reuse the predicted graph's header, but mark the new file as a
        # provenance graph.
        header = predicted.header.model_copy()
        header.graph_type = "provenance"
        if log is None:
            log = _LOG
        self.log = log
        self._base_writer = exit_stack.enter_context(
            log_on_close.wrap(
                BaseQuantumGraphWriter.open(
                    output_path,
                    header,
                    predicted.pipeline_graph,
                    address_filename="nodes",
                    zstd_level=zstd_level,
                    cdict_data=cdict_data,
                ),
                "Finishing writing provenance quantum graph.",
            )
        )
        # One address dict per *_ADDRESS_INDEX slot; each of the per-component
        # multi-block writers below shares its address dict with this list so
        # the base writer can serialize all of them together.
        self._base_writer.address_writer.addresses = [{}, {}, {}, {}]
        self._log_writer = exit_stack.enter_context(
            log_on_close.wrap(
                MultiblockWriter.open_in_zip(
                    self._base_writer.zf, LOG_MB_NAME, header.int_size, use_tempfile=True
                ),
                "Copying logs into zip archive.",
            ),
        )
        self._base_writer.address_writer.addresses[LOG_ADDRESS_INDEX] = self._log_writer.addresses
        self._metadata_writer = exit_stack.enter_context(
            log_on_close.wrap(
                MultiblockWriter.open_in_zip(
                    self._base_writer.zf, METADATA_MB_NAME, header.int_size, use_tempfile=True
                ),
                "Copying metadata into zip archive.",
            )
        )
        self._base_writer.address_writer.addresses[METADATA_ADDRESS_INDEX] = self._metadata_writer.addresses
        self._dataset_writer = exit_stack.enter_context(
            log_on_close.wrap(
                MultiblockWriter.open_in_zip(
                    self._base_writer.zf, DATASET_MB_NAME, header.int_size, use_tempfile=True
                ),
                "Copying dataset provenance into zip archive.",
            )
        )
        self._base_writer.address_writer.addresses[DATASET_ADDRESS_INDEX] = self._dataset_writer.addresses
        self._quantum_writer = exit_stack.enter_context(
            log_on_close.wrap(
                MultiblockWriter.open_in_zip(
                    self._base_writer.zf, QUANTUM_MB_NAME, header.int_size, use_tempfile=True
                ),
                "Copying quantum provenance into zip archive.",
            )
        )
        self._base_writer.address_writer.addresses[QUANTUM_ADDRESS_INDEX] = self._quantum_writer.addresses
        self._init_predicted_quanta(predicted)
        self._populate_xgraph_and_inputs(loop_wrapper)
        # IDs of init-output datasets reported to exist by INIT scans (see
        # write_scan_data); consulted by write_init_outputs when
        # assume_existence=False.
        self._existing_init_outputs: set[uuid.UUID] = set()

    def _init_predicted_quanta(
        self, predicted: PredictedQuantumGraph | PredictedQuantumGraphComponents
    ) -> None:
        # Normalize the two accepted 'predicted' forms into a list of init
        # quanta and a mapping of all quanta (init quanta included) by ID.
        self._predicted_init_quanta: list[PredictedQuantumDatasetsModel] = []
        self._predicted_quanta: dict[uuid.UUID, PredictedQuantumDatasetsModel] = {}
        if isinstance(predicted, PredictedQuantumGraph):
            self._predicted_init_quanta.extend(predicted._init_quanta.values())
            self._predicted_quanta.update(predicted._quantum_datasets)
        else:
            self._predicted_init_quanta.extend(predicted.init_quanta.root)
            self._predicted_quanta.update(predicted.quantum_datasets)
        self._predicted_quanta.update({q.quantum_id: q for q in self._predicted_init_quanta})

    def _populate_xgraph_and_inputs(self, loop_wrapper: LoopWrapper = pass_through) -> None:
        # Build the dataset/quantum edge graph and find the overall inputs:
        # datasets consumed by some quantum but produced by none.
        self._xgraph = networkx.DiGraph()
        self._overall_inputs: dict[uuid.UUID, PredictedDatasetModel] = {}
        output_dataset_ids: set[uuid.UUID] = set()
        # First pass: gather the IDs of every predicted output dataset.
        for predicted_quantum in loop_wrapper(self._predicted_quanta.values()):
            if not predicted_quantum.task_label:
                # Skip the 'packages' producer quantum.
                continue
            output_dataset_ids.update(predicted_quantum.iter_output_dataset_ids())
        # Second pass: add edges, and record any input that no quantum
        # produces as an overall input.
        for predicted_quantum in loop_wrapper(self._predicted_quanta.values()):
            if not predicted_quantum.task_label:
                # Skip the 'packages' producer quantum.
                continue
            for predicted_input in itertools.chain.from_iterable(predicted_quantum.inputs.values()):
                self._xgraph.add_edge(predicted_input.dataset_id, predicted_quantum.quantum_id)
                if predicted_input.dataset_id not in output_dataset_ids:
                    self._overall_inputs.setdefault(predicted_input.dataset_id, predicted_input)
            for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()):
                self._xgraph.add_edge(predicted_quantum.quantum_id, predicted_output.dataset_id)

    @property
    def compressor(self) -> Compressor:
        """Object that should be used to compress all JSON blocks."""
        return self._base_writer.compressor

    def write_packages(self) -> None:
        """Write package version information to the provenance graph."""
        packages = Packages.fromSystem(include_all=True)
        data = packages.toBytes("json")
        self._base_writer.write_single_block("packages", data)

    def write_overall_inputs(self, loop_wrapper: LoopWrapper = pass_through) -> None:
        """Write provenance for overall-input datasets.

        Parameters
        ----------
        loop_wrapper : `~collections.abc.Callable`, optional
            A callable that takes an iterable and returns an equivalent one, to
            be used in all potentially-large loops.  This can be used to add
            progress reporting or check for cancelation signals.
        """
        for predicted_input in loop_wrapper(self._overall_inputs.values()):
            # Skip datasets already written (e.g. via write_scan_data).
            if predicted_input.dataset_id not in self._dataset_writer.addresses:
                self._dataset_writer.write_model(
                    predicted_input.dataset_id,
                    ProvenanceDatasetModel.from_predicted(
                        predicted_input,
                        producer=None,
                        consumers=self._xgraph.successors(predicted_input.dataset_id),
                    ),
                    self.compressor,
                )
        # Drop the (potentially large) mapping to free memory; note that this
        # makes a second call to this method an AttributeError.
        del self._overall_inputs

    def write_init_outputs(self, assume_existence: bool = True) -> None:
        """Write provenance for init-output datasets and init-quanta.

        Parameters
        ----------
        assume_existence : `bool`, optional
            If `True`, just assume all init-outputs exist.
        """
        init_quanta = ProvenanceInitQuantaModel()
        for predicted_init_quantum in self._predicted_init_quanta:
            if not predicted_init_quantum.task_label:
                # Skip the 'packages' producer quantum.
                continue
            for predicted_output in itertools.chain.from_iterable(predicted_init_quantum.outputs.values()):
                provenance_output = ProvenanceDatasetModel.from_predicted(
                    predicted_output,
                    producer=predicted_init_quantum.quantum_id,
                    consumers=self._xgraph.successors(predicted_output.dataset_id),
                )
                # When not assuming existence, rely on the IDs accumulated
                # from INIT scans in write_scan_data.
                provenance_output.produced = assume_existence or (
                    provenance_output.dataset_id in self._existing_init_outputs
                )
                self._dataset_writer.write_model(
                    provenance_output.dataset_id, provenance_output, self.compressor
                )
            init_quanta.root.append(ProvenanceInitQuantumModel.from_predicted(predicted_init_quantum))
        self._base_writer.write_single_model("init_quanta", init_quanta)

    def write_quantum_provenance(
        self, quantum_id: uuid.UUID, metadata: TaskMetadata | None, logs: ButlerLogRecords | None
    ) -> None:
        """Gather and write provenance for a quantum.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID for the quantum.
        metadata : `..TaskMetadata` or `None`
            Task metadata.
        logs : `lsst.daf.butler.logging.ButlerLogRecords` or `None`
            Task logs.
        """
        predicted_quantum = self._predicted_quanta[quantum_id]
        # incomplete=False: treat missing metadata/logs as a conclusive
        # failure rather than a possibly-still-running quantum.
        provenance_models = ProvenanceQuantumScanModels.from_metadata_and_logs(
            predicted_quantum, metadata, logs, incomplete=False
        )
        scan_data = provenance_models.to_scan_data(predicted_quantum, compressor=self.compressor)
        self.write_scan_data(scan_data)

    def write_blocked_quantum_provenance(self, quantum_id: uuid.UUID) -> None:
        """Gather and write provenance for a quantum that was blocked by an
        upstream failure.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID for the quantum.
        """
        self.write_scan_data(ProvenanceQuantumScanData.make_blocked(quantum_id))

    def write_scan_data(self, scan_data: ProvenanceQuantumScanData) -> None:
        """Write the output of a quantum provenance scan to disk.

        Parameters
        ----------
        scan_data : `ProvenanceQuantumScanData`
            Result of a quantum provenance scan.
        """
        if scan_data.status is ProvenanceQuantumScanStatus.INIT:
            # Init scans only tell us which init-outputs exist; stash that for
            # write_init_outputs rather than writing anything now.
            self.log.debug("Handling init-output scan for %s.", scan_data.quantum_id)
            self._existing_init_outputs.update(scan_data.existing_outputs)
            return
        self.log.debug("Handling quantum scan for %s.", scan_data.quantum_id)
        # We shouldn't need this predicted quantum after this method runs; pop
        # it from the dict in the hopes that'll free up some memory when we're
        # done.
        predicted_quantum = self._predicted_quanta.pop(scan_data.quantum_id)
        outputs: dict[uuid.UUID, bytes] = {}
        for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()):
            provenance_output = ProvenanceDatasetModel.from_predicted(
                predicted_output,
                producer=predicted_quantum.quantum_id,
                consumers=self._xgraph.successors(predicted_output.dataset_id),
            )
            provenance_output.produced = provenance_output.dataset_id in scan_data.existing_outputs
            outputs[provenance_output.dataset_id] = self.compressor.compress(
                provenance_output.model_dump_json().encode()
            )
        if not scan_data.quantum:
            # No serialized quantum in the scan data (e.g. a blocked quantum);
            # build one from the prediction.
            scan_data.quantum = (
                ProvenanceQuantumModel.from_predicted(predicted_quantum).model_dump_json().encode()
            )
            if scan_data.is_compressed:
                # Keep the freshly generated bytes consistent with the
                # struct's is_compressed flag.
                scan_data.quantum = self.compressor.compress(scan_data.quantum)
        if not scan_data.is_compressed:
            scan_data.quantum = self.compressor.compress(scan_data.quantum)
            if scan_data.metadata:
                scan_data.metadata = self.compressor.compress(scan_data.metadata)
            if scan_data.logs:
                scan_data.logs = self.compressor.compress(scan_data.logs)
        self.log.debug("Writing quantum %s.", scan_data.quantum_id)
        self._quantum_writer.write_bytes(scan_data.quantum_id, scan_data.quantum)
        for dataset_id, dataset_data in outputs.items():
            self._dataset_writer.write_bytes(dataset_id, dataset_data)
        if scan_data.metadata:
            # Index the metadata block under both the quantum ID and the
            # metadata dataset's own ID, so either can be used for lookup.
            (metadata_output,) = predicted_quantum.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME]
            address = self._metadata_writer.write_bytes(scan_data.quantum_id, scan_data.metadata)
            self._metadata_writer.addresses[metadata_output.dataset_id] = address
        if scan_data.logs:
            # Same dual-key indexing for the log block.
            (log_output,) = predicted_quantum.outputs[acc.LOG_OUTPUT_CONNECTION_NAME]
            address = self._log_writer.write_bytes(scan_data.quantum_id, scan_data.logs)
            self._log_writer.addresses[log_output.dataset_id] = address
class ProvenanceQuantumScanStatus(enum.Enum):
    """Status enum for quantum scanning.

    Note that this records the status of the *scanning*, which is distinct
    from the status of the quantum's execution.
    """

    INCOMPLETE = enum.auto()
    """Scanning could not conclude yet because the quantum is not necessarily
    done running.
    """

    ABANDONED = enum.auto()
    """Execution appears to have failed, but a recovery cannot be ruled out;
    we have nevertheless waited long enough (per
    `ScannerTimeConfigDict.retry_timeout`) that it's time to stop trying for
    now.

    Quanta in this state require
    `ProvenanceQuantumScanModels.from_metadata_and_logs` to be run again with
    ``incomplete=False``.
    """

    SUCCESSFUL = enum.auto()
    """The scan finished conclusively and the quantum executed successfully,
    unblocking scans for downstream quanta.
    """

    FAILED = enum.auto()
    """The scan finished conclusively and the quantum failed execution,
    blocking scans for downstream quanta.
    """

    BLOCKED = enum.auto()
    """A quantum upstream of this one failed."""

    INIT = enum.auto()
    """Init quanta need special handling, because they don't have logs and
    metadata.
    """
@dataclasses.dataclass
class ProvenanceQuantumScanModels:
    """A struct that represents provenance information for a single quantum."""

    quantum_id: uuid.UUID
    """Unique ID for the quantum."""

    status: ProvenanceQuantumScanStatus = ProvenanceQuantumScanStatus.INCOMPLETE
    """Combined status for the scan and the execution of the quantum."""

    attempts: list[ProvenanceQuantumAttemptModel] = dataclasses.field(default_factory=list)
    """Provenance information about each attempt to run the quantum."""

    output_existence: dict[uuid.UUID, bool] = dataclasses.field(default_factory=dict)
    """Unique IDs of the output datasets mapped to whether they were actually
    produced.
    """

    metadata: ProvenanceTaskMetadataModel = dataclasses.field(default_factory=ProvenanceTaskMetadataModel)
    """Task metadata information for each attempt.
    """

    logs: ProvenanceLogRecordsModel = dataclasses.field(default_factory=ProvenanceLogRecordsModel)
    """Log records for each attempt.
    """

    @classmethod
    def from_metadata_and_logs(
        cls,
        predicted: PredictedQuantumDatasetsModel,
        metadata: TaskMetadata | None,
        logs: ButlerLogRecords | None,
        *,
        incomplete: bool = False,
    ) -> ProvenanceQuantumScanModels:
        """Construct provenance information from task metadata and logs.

        Parameters
        ----------
        predicted : `PredictedQuantumDatasetsModel`
            Information about the predicted quantum.
        metadata : `..TaskMetadata` or `None`
            Task metadata.
        logs : `lsst.daf.butler.logging.ButlerLogRecords` or `None`
            Task logs.
        incomplete : `bool`, optional
            If `True`, treat execution failures as possibly-incomplete quanta
            and do not fully process them; instead just set the status to
            `ProvenanceQuantumScanStatus.ABANDONED` and return.

        Returns
        -------
        scan_models : `ProvenanceQuantumScanModels`
            Struct of models that describe quantum provenance.

        Notes
        -----
        This method does not necessarily fully populate the `output_existence`
        field; it does what it can given the information in the metadata and
        logs, but the caller is responsible for filling in the existence status
        for any predicted outputs that are not present at all in that `dict`.
        """
        self = ProvenanceQuantumScanModels(predicted.quantum_id)
        # 'last_attempt' accumulates information about the most recent attempt
        # as logs and then metadata are processed; it is only appended to
        # self.attempts in _reconcile_attempts.
        last_attempt = ProvenanceQuantumAttemptModel()
        self._process_logs(predicted, logs, last_attempt, incomplete=incomplete)
        self._process_metadata(predicted, metadata, last_attempt, incomplete=incomplete)
        if self.status is ProvenanceQuantumScanStatus.ABANDONED:
            # Possibly-incomplete quantum; caller must retry later with
            # incomplete=False (see ABANDONED docs).
            return self
        self._reconcile_attempts(last_attempt)
        self._extract_output_existence(predicted)
        return self

    def _process_logs(
        self,
        predicted: PredictedQuantumDatasetsModel,
        logs: ButlerLogRecords | None,
        last_attempt: ProvenanceQuantumAttemptModel,
        *,
        incomplete: bool,
    ) -> None:
        # Record log-dataset existence and, when logs are present, mine them
        # (and any extra provenance stored with them) for attempt information.
        (predicted_log_dataset,) = predicted.outputs[acc.LOG_OUTPUT_CONNECTION_NAME]
        if logs is None:
            self.output_existence[predicted_log_dataset.dataset_id] = False
            if incomplete:
                self.status = ProvenanceQuantumScanStatus.ABANDONED
            else:
                self.status = ProvenanceQuantumScanStatus.FAILED
        else:
            # Set the attempt's run status to FAILED, since the default is
            # UNKNOWN (i.e. logs *and* metadata are missing) and we now know
            # the logs exist.  This will usually get replaced by SUCCESSFUL
            # when we look for metadata next.
            last_attempt.status = QuantumAttemptStatus.FAILED
            self.output_existence[predicted_log_dataset.dataset_id] = True
            if logs.extra:
                log_extra = _ExecutionLogRecordsExtra.model_validate(logs.extra)
                self._extract_from_log_extra(log_extra, last_attempt=last_attempt)
            self.logs.attempts.append(list(logs))

    def _extract_from_log_extra(
        self,
        log_extra: _ExecutionLogRecordsExtra,
        last_attempt: ProvenanceQuantumAttemptModel | None,
    ) -> None:
        # Recurse into earlier attempts first so attempts are appended in
        # chronological order.
        for previous_attempt_log_extra in log_extra.previous_attempts:
            self._extract_from_log_extra(
                previous_attempt_log_extra,
                last_attempt=None,
            )
        quantum_attempt: ProvenanceQuantumAttemptModel
        if last_attempt is None:
            # This is not the last attempt, so it must be a failure.
            quantum_attempt = ProvenanceQuantumAttemptModel(
                attempt=len(self.attempts), status=QuantumAttemptStatus.FAILED
            )
            # We also need to get the logs from this extra provenance, since
            # they won't be the main section of the log records.
            self.logs.attempts.append(log_extra.logs)
            # The special last attempt is only appended after we attempt to
            # read metadata later, but we have to append this one now.
            self.attempts.append(quantum_attempt)
        else:
            assert not log_extra.logs, "Logs for the last attempt should not be stored in the extra JSON."
            quantum_attempt = last_attempt
        if log_extra.exception is not None or log_extra.metadata is not None or last_attempt is None:
            # We won't be getting a separate metadata dataset, so anything we
            # might get from the metadata has to come from this extra
            # provenance in the logs.
            quantum_attempt.exception = log_extra.exception
            if log_extra.metadata is not None:
                quantum_attempt.resource_usage = QuantumResourceUsage.from_task_metadata(log_extra.metadata)
                self.metadata.attempts.append(log_extra.metadata)
            else:
                self.metadata.attempts.append(None)
        # Regardless of whether this is the last attempt or not, we can only
        # get the previous_process_quanta from the log extra.
        quantum_attempt.previous_process_quanta.extend(log_extra.previous_process_quanta)

    def _process_metadata(
        self,
        predicted: PredictedQuantumDatasetsModel,
        metadata: TaskMetadata | None,
        last_attempt: ProvenanceQuantumAttemptModel,
        *,
        incomplete: bool,
    ) -> None:
        # Metadata presence is what distinguishes a successful attempt from a
        # failed one.
        (predicted_metadata_dataset,) = predicted.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME]
        if metadata is None:
            self.output_existence[predicted_metadata_dataset.dataset_id] = False
            if incomplete:
                self.status = ProvenanceQuantumScanStatus.ABANDONED
            else:
                self.status = ProvenanceQuantumScanStatus.FAILED
        else:
            self.status = ProvenanceQuantumScanStatus.SUCCESSFUL
            self.output_existence[predicted_metadata_dataset.dataset_id] = True
            last_attempt.status = QuantumAttemptStatus.SUCCESSFUL
            try:
                # Int conversion guards against spurious conversion to
                # float that can apparently sometimes happen in
                # TaskMetadata.
                last_attempt.caveats = QuantumSuccessCaveats(int(metadata["quantum"]["caveats"]))
            except LookupError:
                # No caveats recorded; leave the default.
                pass
            try:
                last_attempt.exception = ExceptionInfo._from_metadata(
                    metadata[predicted.task_label]["failure"]
                )
            except LookupError:
                # No recorded failure information.
                pass
            last_attempt.resource_usage = QuantumResourceUsage.from_task_metadata(metadata)
            self.metadata.attempts.append(metadata)

    def _reconcile_attempts(self, last_attempt: ProvenanceQuantumAttemptModel) -> None:
        # Finalize the last attempt and pad the per-attempt log/metadata lists
        # so all three per-attempt sequences end up the same length.
        last_attempt.attempt = len(self.attempts)
        self.attempts.append(last_attempt)
        assert self.status is not ProvenanceQuantumScanStatus.INCOMPLETE
        assert self.status is not ProvenanceQuantumScanStatus.ABANDONED
        if len(self.logs.attempts) < len(self.attempts):
            # Logs were not found for this attempt; must have been a hard error
            # that kept the `finally` block from running or otherwise
            # interrupted the writing of the logs.
            self.logs.attempts.append(None)
            if self.status is ProvenanceQuantumScanStatus.SUCCESSFUL:
                # But we found the metadata!  Either that hard error happened
                # at a very unlucky time (in between those two writes), or
                # something even weirder happened.
                self.attempts[-1].status = QuantumAttemptStatus.ABORTED_SUCCESS
            else:
                self.attempts[-1].status = QuantumAttemptStatus.FAILED
        if len(self.metadata.attempts) < len(self.attempts):
            # Metadata missing usually just means a failure.  In any case, the
            # status will already be correct, either because it was set to a
            # failure when we read the logs, or left at UNKNOWN if there were
            # no logs.  Note that scanners never process BLOCKED quanta at all.
            self.metadata.attempts.append(None)
        assert len(self.logs.attempts) == len(self.attempts) or len(self.metadata.attempts) == len(
            self.attempts
        ), (
            "The only way we can add more than one quantum attempt is by "
            "extracting info stored with the logs, and that always appends "
            "a log attempt and a metadata attempt, so this must be a bug in "
            "this class."
        )

    def _extract_output_existence(self, predicted: PredictedQuantumDatasetsModel) -> None:
        # The executed task records the IDs of the outputs it actually wrote
        # in its metadata; use that list when it is available.
        try:
            outputs_put = self.metadata.attempts[-1]["quantum"].getArray("outputs")  # type: ignore[index]
        except (
            IndexError,  # metadata.attempts is empty
            TypeError,  # metadata.attempts[-1] is None
            LookupError,  # no 'quantum' entry in metadata or 'outputs' in that
        ):
            pass
        else:
            for id_str in ensure_iterable(outputs_put):
                self.output_existence[uuid.UUID(id_str)] = True
            # If the metadata told us what it wrote, anything not in that
            # list was not written.
            for predicted_output in itertools.chain.from_iterable(predicted.outputs.values()):
                self.output_existence.setdefault(predicted_output.dataset_id, False)

    def to_scan_data(
        self: ProvenanceQuantumScanModels,
        predicted_quantum: PredictedQuantumDatasetsModel,
        compressor: Compressor | None = None,
    ) -> ProvenanceQuantumScanData:
        """Convert these models to JSON data.

        Parameters
        ----------
        predicted_quantum : `PredictedQuantumDatasetsModel`
            Information about the predicted quantum.
        compressor : `Compressor`
            Object that can compress bytes.

        Returns
        -------
        scan_data : `ProvenanceQuantumScanData`
            Scan information ready for serialization.
        """
        quantum: ProvenanceInitQuantumModel | ProvenanceQuantumModel
        if self.status is ProvenanceQuantumScanStatus.INIT:
            quantum = ProvenanceInitQuantumModel.from_predicted(predicted_quantum)
        else:
            quantum = ProvenanceQuantumModel.from_predicted(predicted_quantum)
        quantum.attempts = self.attempts
        # Sanity-check that every predicted output has an existence verdict
        # before serializing (see the notes on from_metadata_and_logs).
        for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()):
            if predicted_output.dataset_id not in self.output_existence:
                raise RuntimeError(
                    "Logic bug in provenance gathering or execution invariants: "
                    f"no existence information for output {predicted_output.dataset_id} "
                    f"({predicted_output.dataset_type_name}@{predicted_output.data_coordinate})."
                )
        data = ProvenanceQuantumScanData(
            self.quantum_id,
            self.status,
            existing_outputs={
                dataset_id for dataset_id, was_produced in self.output_existence.items() if was_produced
            },
            quantum=quantum.model_dump_json().encode(),
            logs=self.logs.model_dump_json().encode() if self.logs.attempts else b"",
            metadata=self.metadata.model_dump_json().encode() if self.metadata.attempts else b"",
        )
        if compressor is not None:
            data.compress(compressor)
        return data
@dataclasses.dataclass
class ProvenanceQuantumScanData:
    """A struct that represents ready-for-serialization provenance information
    for a single quantum.
    """

    quantum_id: uuid.UUID
    """Unique ID for the quantum."""

    status: ProvenanceQuantumScanStatus
    """Combined status for the scan and the execution of the quantum."""

    existing_outputs: set[uuid.UUID] = dataclasses.field(default_factory=set)
    """Unique IDs of the output datasets that were actually written."""

    quantum: bytes = b""
    """Serialized quantum provenance model.

    This may be empty for quanta that had no attempts.
    """

    metadata: bytes = b""
    """Serialized task metadata."""

    logs: bytes = b""
    """Serialized logs."""

    is_compressed: bool = False
    """Whether the ``quantum``, ``metadata``, and ``logs`` attributes are
    compressed.
    """

    @classmethod
    def make_blocked(cls, quantum_id: uuid.UUID) -> ProvenanceQuantumScanData:
        """Construct provenance information for a quantum blocked by an
        upstream failure.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID of the quantum.

        Returns
        -------
        scan_data : `ProvenanceQuantumScanData`
            Struct with ready-to-write provenance data.
        """
        # A blocked quantum carries no payload at all, so it is marked as
        # already compressed: there is nothing left to compress.
        return cls(
            quantum_id,
            status=ProvenanceQuantumScanStatus.BLOCKED,
            is_compressed=True,
        )

    def compress(self, compressor: Compressor) -> None:
        """Compress the data in this struct if it has not been compressed
        already.

        Parameters
        ----------
        compressor : `Compressor`
            Object with a ``compress`` method that takes and returns `bytes`.
        """
        if self.is_compressed:
            # Idempotent: never compress twice.
            return
        self.quantum = compressor.compress(self.quantum)
        self.logs = compressor.compress(self.logs) if self.logs else b""
        self.metadata = compressor.compress(self.metadata) if self.metadata else b""
        self.is_compressed = True