Coverage for python / lsst / pipe / base / quantum_provenance_graph.py: 26%
728 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""A set of already-run, merged quantum graphs with provenance information
29which can be used to compose a report on the status of multi-attempt
30processing.
31"""
33from __future__ import annotations
35__all__ = (
36 "DatasetKey",
37 "PrerequisiteDatasetKey",
38 "QuantumKey",
39 "QuantumProvenanceGraph",
40)
42import concurrent.futures
43import dataclasses
44import datetime
45import itertools
46import logging
47import textwrap
48import threading
49import uuid
50from collections.abc import Callable, Iterator, Mapping, Sequence, Set
51from enum import Enum
52from typing import Any, ClassVar, Literal, TypedDict, cast
54import astropy.table
55import networkx
56import pydantic
58from lsst.daf.butler import (
59 Butler,
60 ButlerConfig,
61 ButlerLogRecords,
62 DataCoordinate,
63 DataIdValue,
64 DatasetId,
65 DatasetRef,
66 DatasetType,
67 DimensionUniverse,
68 LimitedButler,
69 MissingDatasetTypeError,
70 QuantumBackedButler,
71)
72from lsst.resources import ResourcePathExpression
73from lsst.utils.logging import PeriodicLogger, getLogger
75from ._status import ExceptionInfo, QuantumSuccessCaveats
76from .automatic_connection_constants import (
77 LOG_OUTPUT_CONNECTION_NAME,
78 LOG_OUTPUT_TEMPLATE,
79 METADATA_OUTPUT_CONNECTION_NAME,
80 METADATA_OUTPUT_STORAGE_CLASS,
81 METADATA_OUTPUT_TEMPLATE,
82 PROVENANCE_DATASET_TYPE_NAME,
83)
84from .graph import QuantumGraph, QuantumNode
86_LOG = getLogger(__name__)
89@dataclasses.dataclass(slots=True, eq=True, frozen=True)
90class QuantumKey:
91 """Identifier type for quantum keys in a `QuantumProvenanceGraph`. These
92 keys correspond to a task label and data ID, but can refer to this over
93 multiple runs or datasets.
94 """
96 task_label: str
97 """Label of the task in the pipeline."""
99 data_id_values: tuple[DataIdValue, ...]
100 """Data ID values of the quantum.
102 Note that keys are fixed given `task_label`, so using only the values here
103 speeds up comparisons.
104 """
106 is_task: ClassVar[Literal[True]] = True
107 """Whether this node represents a quantum rather
108 than a dataset (always `True`).
109 """
112@dataclasses.dataclass(slots=True, eq=True, frozen=True)
113class DatasetKey:
114 """Identifier type for dataset keys in a `QuantumProvenanceGraph`."""
116 dataset_type_name: str
117 """Name of the dataset type (never a component)."""
119 data_id_values: tuple[DataIdValue, ...]
120 """Data ID values of the dataset.
122 Note that keys are fixed given `parent_dataset_type_name`, so using only
123 the values here speeds up comparisons.
124 """
126 is_task: ClassVar[Literal[False]] = False
127 """Whether this node represents a quantum rather than a dataset (always
128 `False`).
129 """
131 is_prerequisite: ClassVar[Literal[False]] = False
132 """Whether this node is a prerequisite to another node (also always
133 `False`).
134 """
137@dataclasses.dataclass(slots=True, eq=True, frozen=True)
138class PrerequisiteDatasetKey:
139 """Identifier type for prerequisite dataset keys in a
140 `QuantumProvenanceGraph`.
142 Unlike regular datasets, prerequisites are not actually required to come
143 from a find-first search of `input_collections`, so we don't want to
144 assume that the same data ID implies the same dataset. Happily we also
145 don't need to search for them by data ID in the graph, so we can use the
146 dataset ID (UUID) instead.
147 """
149 dataset_type_name: str
150 """Name of the dataset type (never a component)."""
152 dataset_id_bytes: bytes
153 """Dataset ID (UUID) as raw bytes."""
155 is_task: ClassVar[Literal[False]] = False
156 """Whether this node represents a quantum rather
157 than a dataset (always `False`).
158 """
160 is_prerequisite: ClassVar[Literal[True]] = True
161 """Whether this node is a prerequisite to another node (always `True`).
162 """
class QuantumRunStatus(Enum):
    """Status of a single (quantum, RUN collection) combination.

    Possible Statuses
    -----------------
    METADATA_MISSING = -3: Metadata is missing for this quantum in this run.
        It is impossible to tell whether execution of this quantum was
        attempted due to missing metadata.
    LOGS_MISSING = -2: Logs are missing for this quantum in this run. It was
        attempted, but it is impossible to tell if it succeeded or failed due
        to missing logs.
    FAILED = -1: Attempts to execute the quantum failed in this run.
    BLOCKED = 0: This run does not include an executed version of this
        quantum because an upstream task failed.
    SUCCESSFUL = 1: This quantum was executed successfully in this run.
    """

    METADATA_MISSING = -3
    LOGS_MISSING = -2
    FAILED = -1
    BLOCKED = 0
    SUCCESSFUL = 1
class QuantumRun(pydantic.BaseModel):
    """Information about a quantum in a given run collection."""

    model_config = pydantic.ConfigDict(arbitrary_types_allowed=True)  # for DatasetRef attrs.

    id: uuid.UUID
    """The quantum graph node ID associated with the dataId in a specific run.
    """

    status: QuantumRunStatus = QuantumRunStatus.METADATA_MISSING
    """The status of the quantum in that run.
    """

    caveats: QuantumSuccessCaveats | None = None
    """Flags that describe possibly-qualified successes.

    This is `None` when `status` is not `SUCCESSFUL` or `LOGS_MISSING`. It
    may also be `None` if metadata was not loaded or had no success flags.
    """

    exception: ExceptionInfo | None = None
    """Information about an exception that was raised during the quantum's
    execution.

    Exception information for failed quanta is not currently stored, so this
    field is actually only populated for quanta that raise
    `AnnotatedPartialOutputsError`, and only when the execution system is
    configured not to consider these partial successes a failure (i.e. when
    `status` is `~QuantumRunStatus.SUCCESSFUL` and `caveats` has
    `~QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR` set). The error whose
    information is reported here is the exception chained from the
    `AnnotatedPartialOutputsError`.

    In the future, exception information from failures may be available as
    well.
    """

    metadata_ref: DatasetRef
    """Predicted DatasetRef for the metadata dataset."""

    log_ref: DatasetRef
    """Predicted DatasetRef for the log dataset."""

    @staticmethod
    def find_final(info: QuantumInfo) -> tuple[str, QuantumRun]:
        """Return the final RUN collection name and `QuantumRun` structure
        from a `QuantumInfo` dictionary.

        The "final run" is the last RUN collection in the sequence of quantum
        graphs that:

        - actually had a quantum for that task label and data ID;
        - execution seems to have at least been attempted (at least one of
          metadata or logs were produced).

        Parameters
        ----------
        info : `QuantumInfo`
            Quantum information that includes all runs.

        Returns
        -------
        run : `str`
            RUN collection name.
        quantum_run : `QuantumRun`
            Information about a quantum in a RUN collection.

        Raises
        ------
        ValueError
            Raised if this quantum never had a status that suggested execution
            in any run.
        """
        # Statuses that mean execution was never actually attempted in a run.
        never_attempted = (QuantumRunStatus.METADATA_MISSING, QuantumRunStatus.BLOCKED)
        # Walk the runs from newest to oldest; the first attempted one wins.
        for run, quantum_run in reversed(info["runs"].items()):
            if quantum_run.status not in never_attempted:
                return run, quantum_run
        raise ValueError("Quantum was never executed.")
class QuantumInfoStatus(Enum):
    """The status of a quantum (a particular task run on a particular dataID)
    across all runs.

    Possible Statuses
    -----------------
    WONKY = -3: The overall state of this quantum reflects inconsistencies or
        is difficult to discern. There are a few specific ways to enter a
        wonky state; it is impossible to exit and requires human intervention
        to proceed with processing.
        Currently, a quantum enters a wonky state for one of three reasons:

        - Its overall `QuantumInfoStatus` moves from a successful state (as a
          result of a successful run) to any other state. In other words,
          something that initially succeeded fails on subsequent attempts.
        - A `QuantumRun` is missing logs.
        - There are multiple runs associated with a dataset, and this comes up
          in a findFirst search. This means that a dataset which will be used
          as an input data product for further processing has heterogeneous
          inputs, which may have had different inputs or a different
          data-query.
    FAILED = -2: These quanta were attempted and failed. Failed quanta have
        logs and no metadata.
    UNKNOWN = -1: These are quanta which do not have any metadata associated
        with processing, but for which it is impossible to tell the status due
        to an additional absence of logs. Quanta which had not been processed
        at all would reflect this state, as would quanta which were
        conceptualized in the construction of the quantum graph but later
        identified to be unnecessary or erroneous (deemed NoWorkFound by the
        Science Pipelines).
    BLOCKED = 0: The quantum is not able to execute because its inputs are
        missing due to an upstream failure. Blocked quanta are distinguished
        from failed quanta by being successors of failed quanta in the graph.
        All the successors of blocked quanta are also marked as blocked.
    SUCCESSFUL = 1: Attempts at executing this quantum were successful.
    """

    WONKY = -3
    FAILED = -2
    UNKNOWN = -1
    BLOCKED = 0
    SUCCESSFUL = 1
class QuantumInfo(TypedDict):
    """Information about a quantum (i.e., the combination of a task label and
    data ID) across all attempted runs.

    Used to annotate the networkx node dictionary.
    """

    data_id: DataCoordinate
    """The data ID of the quantum.
    """

    runs: dict[str, QuantumRun]
    """All run collections associated with the quantum, keyed by RUN name.
    """

    status: QuantumInfoStatus
    """The overall status of the quantum. Note that it is impossible to exit a
    wonky state.
    """

    recovered: bool
    """The quantum was originally not successful but was ultimately successful.
    """

    messages: list[str]
    """Diagnostic messages to help disambiguate wonky states.
    """

    log: DatasetKey
    """The `DatasetKey` which can be used to access the log associated with the
    quantum across runs.
    """

    metadata: DatasetKey
    """The `DatasetKey` which can be used to access the metadata for the
    quantum across runs.
    """
class DatasetRun(pydantic.BaseModel):
    """Information about a dataset in a given run collection."""

    id: uuid.UUID
    """The dataset ID associated with the dataset in a specific run.
    """

    produced: bool = False
    """Whether the specific run wrote the dataset.
    """

    visible: bool = False
    """Whether this dataset is visible in the final output collection; in other
    words, whether this dataset is queryable in a find-first search. This
    determines whether it will be used as an input to further processing.
    """

    @pydantic.model_validator(mode="after")
    def _validate(self) -> DatasetRun:
        """Validate the model for `DatasetRun` by checking that no visible
        `DatasetRun` is also not produced (this should be impossible).

        Returns
        -------
        self : `DatasetRun`
            The `DatasetRun` object, validated.

        Raises
        ------
        ValueError
            Raised if the dataset is marked visible but not produced; pydantic
            converts this into a `pydantic.ValidationError` for callers.
        """
        # Raise instead of ``assert`` so the check survives ``python -O``;
        # pydantic wraps ValueError (like AssertionError) in ValidationError,
        # so the externally-visible exception type is unchanged.
        if self.visible and not self.produced:
            raise ValueError("A dataset cannot be visible without being produced.")
        return self
class DatasetInfoStatus(Enum):
    """Status of the DatasetType-dataID pair over all runs. This depends
    not only on the presence of the dataset itself, but also on metadata, logs
    and the state of its producer quantum.

    Possible Statuses
    -----------------
    CURSED: The dataset was the result of an unsuccessful quantum and was
        visible in the output collection anyway. These are flagged as
        cursed so that they may be caught before they become inputs to
        further processing.
    UNSUCCESSFUL: The dataset was not produced. These are the results of
        failed or blocked quanta.
    PREDICTED_ONLY: The dataset was predicted, and was not visible in any
        run, but was the successor of a successful quantum. These datasets are
        the result of pipelines NoWorkFound cases, in which a dataset is
        predicted in the graph but found to not be necessary in processing.
    SHADOWED: The dataset exists but is not queryable in a find_first
        search. This could mean that the version of this dataset which is
        passed as an input to further processing is not in the collections
        given. A shadowed dataset will not be used as an input to further
        processing.
    VISIBLE: The dataset is queryable in a find_first search. This means
        that it can be used as an input by subsequent tasks and processing.
    """

    CURSED = -2
    UNSUCCESSFUL = -1
    PREDICTED_ONLY = 0
    SHADOWED = 1
    VISIBLE = 2
class DatasetInfo(TypedDict):
    """Information about a given dataset across all runs.

    Used to annotate the networkx node dictionary.
    """

    data_id: DataCoordinate
    """The data ID of the dataset.
    """

    runs: dict[str, DatasetRun]
    """All runs associated with the dataset, keyed by RUN collection name.
    """

    status: DatasetInfoStatus
    """Overall status of the dataset.
    """

    messages: list[str]
    """Diagnostic messages to help disambiguate cursed states.
    """
class UnsuccessfulQuantumSummary(pydantic.BaseModel):
    """A summary of all relevant information on an unsuccessful quantum.

    This summarizes all information on a task's output for a particular data ID
    over all runs.
    """

    data_id: dict[str, DataIdValue]
    """The data_id of the unsuccessful quantum.
    """
    runs: dict[str, str]
    """A dictionary (key: output run collection name) with the value of the
    enum name of the `QuantumRunStatus` of each run associated with an attempt
    to process the unsuccessful quantum.
    """
    messages: list[str]
    """Any messages associated with the unsuccessful quantum (any clues as to
    why the quantum may be in a FAILED or WONKY state).
    """

    @classmethod
    def _from_info(cls, info: QuantumInfo) -> UnsuccessfulQuantumSummary:
        """Build an `UnsuccessfulQuantumSummary` from a `QuantumInfo`
        dictionary.

        Parameters
        ----------
        info : `QuantumInfo`
            The `QuantumInfo` object for the unsuccessful quantum.

        Returns
        -------
        summary : `UnsuccessfulQuantumSummary`
            A Pydantic model containing the dataID, run collection names (and
            each of their `QuantumRunStatus` enum names) as well as messages
            which may point to any clues about the nature of the problem. For
            failed quanta, these are usually error messages from the butler
            logs. For wonky quanta, these can be messages generated during the
            assembly of the `QuantumProvenanceGraph` that describe why it was
            marked as wonky.
        """
        # Record only the status *name* per run; the full QuantumRun is not
        # needed for reporting.
        run_statuses = {run: quantum_run.status.name for run, quantum_run in info["runs"].items()}
        return cls(
            data_id=dict(info["data_id"].required),
            runs=run_statuses,
            messages=info["messages"],
        )
class ExceptionInfoSummary(pydantic.BaseModel):
    """A summary of an exception raised by a quantum.

    Instances are gathered into `TaskSummary.exceptions`, keyed by the
    fully-qualified exception type name.
    """

    quantum_id: uuid.UUID
    """Unique identifier for this quantum in this run."""

    data_id: dict[str, DataIdValue]
    """The data ID of the quantum."""

    run: str
    """Name of the RUN collection in which this exception was (last) raised."""

    exception: ExceptionInfo
    """Information about an exception chained to
    `AnnotatedPartialOutputsError`, if that was raised by this quantum in this
    run.
    """
class TaskSummary(pydantic.BaseModel):
    """A summary of the status of all quanta associated with a single task,
    across all runs.
    """

    n_successful: int = 0
    """A count of successful quanta.
    """
    n_blocked: int = 0
    """A count of blocked quanta.
    """
    n_unknown: int = 0
    """A count of quanta for which there are no metadata or logs.
    """

    n_expected: int = 0
    """The number of quanta expected by the graph.
    """

    @pydantic.computed_field  # type: ignore[prop-decorator]
    @property
    def n_wonky(self) -> int:
        """Return a count of `wonky` quanta."""
        return len(self.wonky_quanta)

    @pydantic.computed_field  # type: ignore[prop-decorator]
    @property
    def n_failed(self) -> int:
        """Return a count of `failed` quanta."""
        return len(self.failed_quanta)

    caveats: dict[str, list[dict[str, DataIdValue]]] = pydantic.Field(default_factory=dict)
    """Quanta that were successful with caveats.

    Keys are 2-character codes returned by `QuantumSuccessCaveats.concise`;
    values are lists of data IDs of quanta with those caveats. Quanta that were
    unqualified successes are not included.

    Quanta for which success flags were not read from metadata will not be
    included.
    """

    exceptions: dict[str, list[ExceptionInfoSummary]] = pydantic.Field(default_factory=dict)
    """Exceptions raised by partially-successful quanta.

    Keys are fully-qualified exception type names and values are lists of
    extra exception information, with each entry corresponding to a different
    data ID. Only the final RUN for each data ID is represented here.

    Every entry in this data structure corresponds to one in `caveats` with
    the "P" code (for `QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR`).

    In the future, this may be expanded to include exceptions for failed quanta
    as well (at present that information is not retained during execution).
    """

    failed_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list)
    """A list of all `UnsuccessfulQuantumSummary` objects associated with the
    FAILED quanta. This is a report containing their data IDs, the status
    of each run associated with each `failed` quantum, and the error messages
    associated with the failures when applicable.
    """
    recovered_quanta: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list)
    """A list of dataIDs (key->value) which moved from an unsuccessful to
    successful state.
    """
    wonky_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list)
    """A list of all `UnsuccessfulQuantumSummary` objects associated with the
    WONKY quanta. This is a report containing their data_ids, the status of
    each run associated with each `wonky` quantum, and messages (dictated in
    this module) associated with the particular issue identified.
    """

    def _add_quantum_info(
        self,
        info: QuantumInfo,
        log_getter: Callable[[DatasetRef], ButlerLogRecords] | None,
        executor: concurrent.futures.Executor,
    ) -> concurrent.futures.Future[None] | None:
        """Add a `QuantumInfo` to a `TaskSummary`.

        Unpack the `QuantumInfo` object, sorting quanta of each status into
        the correct place in the `TaskSummary`. If looking for error messages
        in the `lsst.daf.butler.Butler` logs is desired, take special care to
        catch issues with missing logs.

        Parameters
        ----------
        info : `QuantumInfo`
            The `QuantumInfo` object to add to the `TaskSummary`.
        log_getter : `~collections.abc.Callable` or `None`
            A callable that can be passed a `~lsst.daf.butler.DatasetRef` for
            a log dataset to retrieve those logs, or `None` to not load any
            logs.
        executor : `concurrent.futures.Executor`
            A possibly-parallel executor that should be used to schedule
            log dataset reads.

        Returns
        -------
        future : `concurrent.futures.Future` or `None`
            A future that represents a parallelized log read and summary
            update.  `None` unless this quantum FAILED and ``log_getter``
            was provided.
        """
        # Find the last run in which execution was actually attempted; if
        # none, both are None and caveat/exception reporting is skipped.
        try:
            final_run, final_quantum_run = QuantumRun.find_final(info)
        except ValueError:
            final_run = None
            final_quantum_run = None
        match info["status"]:
            case QuantumInfoStatus.SUCCESSFUL:
                self.n_successful += 1
                if info["recovered"]:
                    self.recovered_quanta.append(dict(info["data_id"].required))
                if final_quantum_run is not None and final_quantum_run.caveats:
                    # Bucket qualified successes by their concise caveat code.
                    code = final_quantum_run.caveats.concise()
                    self.caveats.setdefault(code, []).append(dict(info["data_id"].required))
                    if final_quantum_run.caveats & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR:
                        if final_quantum_run.exception is not None:
                            self.exceptions.setdefault(final_quantum_run.exception.type_name, []).append(
                                ExceptionInfoSummary(
                                    quantum_id=final_quantum_run.id,
                                    data_id=dict(info["data_id"].required),
                                    run=final_run,
                                    exception=final_quantum_run.exception,
                                )
                            )
                return None
            case QuantumInfoStatus.WONKY:
                self.wonky_quanta.append(UnsuccessfulQuantumSummary._from_info(info))
                return None
            case QuantumInfoStatus.BLOCKED:
                self.n_blocked += 1
                return None
            case QuantumInfoStatus.FAILED:
                failed_quantum_summary = UnsuccessfulQuantumSummary._from_info(info)
                future: concurrent.futures.Future[None] | None = None
                if log_getter:
                    # The closure captures ``info`` and the summary object;
                    # the summary is mutated on the executor's thread, and the
                    # returned future lets the caller wait for completion.
                    def callback() -> None:
                        for quantum_run in info["runs"].values():
                            try:
                                log = log_getter(quantum_run.log_ref)
                            except LookupError:
                                failed_quantum_summary.messages.append(
                                    f"Logs not ingested for {quantum_run.log_ref!r}"
                                )
                            except FileNotFoundError:
                                failed_quantum_summary.messages.append(
                                    f"Logs missing or corrupt for {quantum_run.log_ref!r}"
                                )
                            else:
                                # Only ERROR-and-above records are reported.
                                failed_quantum_summary.messages.extend(
                                    [record.message for record in log if record.levelno >= logging.ERROR]
                                )

                    future = executor.submit(callback)
                self.failed_quanta.append(failed_quantum_summary)
                return future
            case QuantumInfoStatus.UNKNOWN:
                self.n_unknown += 1
                return None
            case unrecognized_state:
                raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}")

    def _add_data_id_group(self, other_summary: TaskSummary) -> None:
        """Add information from a `TaskSummary` over one dataquery-identified
        group to another, as part of aggregating `Summary` reports.

        Parameters
        ----------
        other_summary : `TaskSummary`
            `TaskSummary` to aggregate.
        """
        self.n_successful += other_summary.n_successful
        self.n_blocked += other_summary.n_blocked
        self.n_unknown += other_summary.n_unknown
        self.n_expected += other_summary.n_expected
        # Merge keyed collections over the union of keys so codes/types seen
        # only on one side are preserved.
        for code in self.caveats.keys() | other_summary.caveats.keys():
            self.caveats.setdefault(code, []).extend(other_summary.caveats.get(code, []))
        for type_name in self.exceptions.keys() | other_summary.exceptions.keys():
            self.exceptions.setdefault(type_name, []).extend(other_summary.exceptions.get(type_name, []))
        self.wonky_quanta.extend(other_summary.wonky_quanta)
        self.recovered_quanta.extend(other_summary.recovered_quanta)
        self.failed_quanta.extend(other_summary.failed_quanta)
class CursedDatasetSummary(pydantic.BaseModel):
    """A summary of all the relevant information on a cursed dataset."""

    producer_data_id: dict[str, DataIdValue]
    """The data_id of the task which produced this dataset. This is mostly
    useful for people wishing to track down the task which produced this
    cursed dataset quickly.
    """
    data_id: dict[str, DataIdValue]
    """The data_id of the cursed dataset.
    """
    runs_produced: dict[str, bool]
    """A dictionary of all the runs associated with the cursed dataset;
    the `bool` is true if the dataset was produced in the associated run.
    """
    run_visible: str | None
    """The run collection that holds the dataset that is visible in the final
    output collection.
    """
    messages: list[str]
    """Any diagnostic messages (dictated in this module) which might help in
    understanding why or how the dataset became cursed.
    """

    @classmethod
    def _from_info(cls, info: DatasetInfo, producer_info: QuantumInfo) -> CursedDatasetSummary:
        """Build a `CursedDatasetSummary` from a `DatasetInfo` dictionary.

        Parameters
        ----------
        info : `DatasetInfo`
            All relevant information on the dataset.
        producer_info : `QuantumInfo`
            All relevant information on the producer task. This is used to
            report the data_id of the producer task.

        Returns
        -------
        summary : `CursedDatasetSummary`
            A Pydantic model containing the dataID of the task which produced
            this cursed dataset, the dataID associated with the cursed dataset,
            run collection names (and their `DatasetRun` information) as well
            as any messages which may point to any clues about the nature of
            the problem. These are messages generated during the assembly of
            the `QuantumProvenanceGraph` that describe why it was marked as
            cursed.
        """
        # At most one run can hold the visible copy of a dataset.
        visible_runs = [run for run, dataset_run in info["runs"].items() if dataset_run.visible]
        return cls(
            producer_data_id=dict(producer_info["data_id"].required),
            data_id=dict(info["data_id"].required),
            runs_produced={run: dataset_run.produced for run, dataset_run in info["runs"].items()},
            run_visible=visible_runs[0] if visible_runs else None,
            messages=info["messages"],
        )
class DatasetTypeSummary(pydantic.BaseModel):
    """A summary of the status of all datasets of a particular type across all
    runs.
    """

    producer: str = ""
    """The name of the task which produced this dataset.
    """

    n_visible: int = 0
    """A count of the datasets of this type which were visible in the
    finalized collection(s).
    """
    n_shadowed: int = 0
    """A count of the datasets of this type which were produced but not
    visible. This includes any datasets which do not come up in a butler
    query over their associated collection.
    """
    n_predicted_only: int = 0
    """A count of the datasets of this type which were predicted but
    ultimately not produced. Note that this does not indicate a failure,
    which are accounted for differently. This is commonly referred to as
    a `NoWorkFound` case.
    """
    n_expected: int = 0
    """The number of datasets of this type expected by the graph.
    """

    @pydantic.computed_field  # type: ignore[prop-decorator]
    @property
    def n_cursed(self) -> int:
        """Return a count of cursed datasets."""
        return len(self.cursed_datasets)

    @pydantic.computed_field  # type: ignore[prop-decorator]
    @property
    def n_unsuccessful(self) -> int:
        """Return a count of unsuccessful datasets."""
        return len(self.unsuccessful_datasets)

    cursed_datasets: list[CursedDatasetSummary] = pydantic.Field(default_factory=list)
    """A list of all `CursedDatasetSummary` objects associated with the
    cursed datasets. This is a report containing their data_ids and the
    data_ids of their producer task, the status of each run associated with
    each `cursed` dataset, and messages (dictated in this module) associated
    with the particular issue identified.
    """
    unsuccessful_datasets: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list)
    """A list of all unsuccessful datasets by their name and data_id.
    """

    def _add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> None:
        """Add a `DatasetInfo` to a `DatasetTypeSummary`.

        Unpack the `DatasetInfo` object, sorting datasets of each status into
        the correct place in the `DatasetTypeSummary`. If the status of a
        dataset is not valid, raise an `AssertionError`.

        Parameters
        ----------
        info : `DatasetInfo`
            The `DatasetInfo` object to add to the `DatasetTypeSummary`.
        producer_info : `QuantumInfo`
            The `QuantumInfo` object associated with the producer of the
            dataset. This is used to report the producer task in the
            summaries for cursed datasets, which may help identify
            specific issues.
        """
        match info["status"]:
            case DatasetInfoStatus.VISIBLE:
                self.n_visible += 1
            case DatasetInfoStatus.SHADOWED:
                self.n_shadowed += 1
            case DatasetInfoStatus.UNSUCCESSFUL:
                # NOTE(review): this uses ``.mapping`` while the quantum
                # summaries use ``.required`` for data IDs — confirm the
                # difference is intentional.
                self.unsuccessful_datasets.append(dict(info["data_id"].mapping))
            case DatasetInfoStatus.CURSED:
                self.cursed_datasets.append(CursedDatasetSummary._from_info(info, producer_info))
            case DatasetInfoStatus.PREDICTED_ONLY:
                self.n_predicted_only += 1
            case unrecognized_state:
                raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}")

    def _add_data_id_group(self, other_summary: DatasetTypeSummary) -> None:
        """Add information from a `DatasetTypeSummary` over one
        dataquery-identified group to another, as part of aggregating `Summary`
        reports.

        Parameters
        ----------
        other_summary : `DatasetTypeSummary`
            `DatasetTypeSummary` to aggregate.
        """
        if self.producer and other_summary.producer:
            # Guard against empty string: only compare producers when both
            # sides have one recorded; on mismatch keep ours and warn.
            if self.producer != other_summary.producer:
                _LOG.warning(
                    "Producer for dataset type is not consistent: %r != %r.",
                    self.producer,
                    other_summary.producer,
                )
                _LOG.warning("Ignoring %r.", other_summary.producer)
        else:
            if other_summary.producer and not self.producer:
                self.producer = other_summary.producer

        self.n_visible += other_summary.n_visible
        self.n_shadowed += other_summary.n_shadowed
        self.n_predicted_only += other_summary.n_predicted_only
        self.n_expected += other_summary.n_expected

        self.cursed_datasets.extend(other_summary.cursed_datasets)
        self.unsuccessful_datasets.extend(other_summary.unsuccessful_datasets)
class Summary(pydantic.BaseModel):
    """A summary of the contents of the QuantumProvenanceGraph, including
    all information on the quanta for each task and the datasets of each
    `~lsst.daf.butler.DatasetType`.
    """

    tasks: dict[str, TaskSummary] = pydantic.Field(default_factory=dict)
    """Summaries for the tasks and their quanta.
    """

    datasets: dict[str, DatasetTypeSummary] = pydantic.Field(default_factory=dict)
    """Summaries for the datasets.
    """

    @classmethod
    def aggregate(cls, summaries: Sequence[Summary]) -> Summary:
        """Combine summaries from disjoint data id groups into an overall
        summary of common tasks and datasets. Intended for use when the same
        pipeline has been run over all groups.

        Parameters
        ----------
        summaries : `~collections.abc.Sequence` [`Summary`]
            Sequence of all `Summary` objects to aggregate.

        Returns
        -------
        summary : `Summary`
            Aggregated summary with one entry for every task label and every
            dataset type name that appears in any input summary.
        """
        result = cls()
        for summary in summaries:
            # Merge per-task counts into (possibly new) entries keyed by
            # task label.
            for label, task_summary in summary.tasks.items():
                result_task_summary = result.tasks.setdefault(label, TaskSummary())
                result_task_summary._add_data_id_group(task_summary)
            # Merge per-dataset-type counts the same way.
            for dataset_type, dataset_type_summary in summary.datasets.items():
                result_dataset_summary = result.datasets.setdefault(dataset_type, DatasetTypeSummary())
                result_dataset_summary._add_data_id_group(dataset_type_summary)
        return result

    def pprint(self, brief: bool = False, datasets: bool = True) -> None:
        """Print this summary to stdout, as a series of tables.

        Parameters
        ----------
        brief : `bool`, optional
            If `True`, only display short (counts-only) tables. By default,
            per-data ID information for exceptions and failures are printed as
            well.
        datasets : `bool`, optional
            Whether to include tables of datasets as well as quanta. This
            includes a summary table of dataset counts for various status and
            (if ``brief`` is `False`) a table with per-data ID information for
            each unsuccessful or cursed dataset.
        """
        self.make_quantum_table().pprint_all()
        print("")
        print("Caveat codes:")
        for k, v in QuantumSuccessCaveats.legend().items():
            print(f"{k}: {v}")
        print("")
        # An exception table with no rows is falsy; skip printing it entirely.
        if exception_table := self.make_exception_table():
            exception_table.pprint_all()
            print("")
        if datasets:
            self.make_dataset_table().pprint_all()
            print("")
        if not brief:
            for task_label, bad_quantum_table in self.make_bad_quantum_tables().items():
                print(f"{task_label} errors:")
                bad_quantum_table.pprint_all()
                print("")
            if datasets:
                for dataset_type_name, bad_dataset_table in self.make_bad_dataset_tables().items():
                    print(f"{dataset_type_name} errors:")
                    bad_dataset_table.pprint_all()
                    print("")

    def make_quantum_table(self) -> astropy.table.Table:
        """Construct an `astropy.table.Table` with a tabular summary of the
        quanta.

        Returns
        -------
        table : `astropy.table.Table`
            A table view of the quantum information.  This only includes
            counts of status categories and caveats, not any per-data-ID
            detail.

        Notes
        -----
        Success caveats in the table are represented by their
        `~QuantumSuccessCaveats.concise` form, so when pretty-printing this
        table for users, the `~QuantumSuccessCaveats.legend` should generally
        be printed as well.
        """
        rows = []
        for label, task_summary in self.tasks.items():
            # Collapse the per-caveat-code breakdown into a single cell;
            # only a lone code can be shown with its count.
            if len(task_summary.caveats) > 1:
                caveats = "(multiple)"
            elif len(task_summary.caveats) == 1:
                ((code, data_ids),) = task_summary.caveats.items()
                caveats = f"{code}({len(data_ids)})"
            else:
                caveats = ""
            rows.append(
                {
                    "Task": label,
                    "Unknown": task_summary.n_unknown,
                    "Successful": task_summary.n_successful,
                    "Caveats": caveats,
                    "Blocked": task_summary.n_blocked,
                    "Failed": task_summary.n_failed,
                    "Wonky": task_summary.n_wonky,
                    "TOTAL": sum(
                        [
                            task_summary.n_successful,
                            task_summary.n_unknown,
                            task_summary.n_blocked,
                            task_summary.n_failed,
                            task_summary.n_wonky,
                        ]
                    ),
                    "EXPECTED": task_summary.n_expected,
                }
            )
        return astropy.table.Table(rows)

    def make_dataset_table(self) -> astropy.table.Table:
        """Construct an `astropy.table.Table` with a tabular summary of the
        datasets.

        Returns
        -------
        table : `astropy.table.Table`
            A table view of the dataset information.  This only includes
            counts of status categories, not any per-data-ID detail.
        """
        rows = []
        for dataset_type_name, dataset_type_summary in self.datasets.items():
            rows.append(
                {
                    "Dataset": dataset_type_name,
                    "Visible": dataset_type_summary.n_visible,
                    "Shadowed": dataset_type_summary.n_shadowed,
                    "Predicted Only": dataset_type_summary.n_predicted_only,
                    "Unsuccessful": dataset_type_summary.n_unsuccessful,
                    "Cursed": dataset_type_summary.n_cursed,
                    "TOTAL": sum(
                        [
                            dataset_type_summary.n_visible,
                            dataset_type_summary.n_shadowed,
                            dataset_type_summary.n_predicted_only,
                            dataset_type_summary.n_unsuccessful,
                            dataset_type_summary.n_cursed,
                        ]
                    ),
                    "EXPECTED": dataset_type_summary.n_expected,
                }
            )
        return astropy.table.Table(rows)

    def make_exception_table(self) -> astropy.table.Table:
        """Construct an `astropy.table.Table` with counts for each exception
        type raised by each task.

        At present this only includes information from partial-outputs-error
        successes, since exception information for failures is not tracked.
        This may change in the future.

        Returns
        -------
        table : `astropy.table.Table`
            A table with columns for task label, exception type, and counts.
        """
        rows = []
        for task_label, task_summary in self.tasks.items():
            for type_name, exception_summaries in task_summary.exceptions.items():
                rows.append({"Task": task_label, "Exception": type_name, "Count": len(exception_summaries)})
        return astropy.table.Table(rows)

    def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]:
        """Construct an `astropy.table.Table` with per-data-ID information
        about failed, wonky, and partial-outputs-error quanta.

        Parameters
        ----------
        max_message_width : `int`, optional
            Maximum width for the Message column.  Longer messages are
            truncated.

        Returns
        -------
        tables : `dict` [ `str`, `astropy.table.Table` ]
            A table for each task with status, data IDs, and log messages for
            each unsuccessful quantum.  Keys are task labels.  Only tasks with
            unsuccessful quanta or partial outputs errors are included.
        """
        result = {}
        for task_label, task_summary in self.tasks.items():
            rows = []
            for status, unsuccessful_quantum_summary in itertools.chain(
                zip(itertools.repeat("FAILED"), task_summary.failed_quanta),
                zip(itertools.repeat("WONKY"), task_summary.wonky_quanta),
            ):
                row = {"Status(Caveats)": status, "Exception": "", **unsuccessful_quantum_summary.data_id}
                # Only the most recent message is shown, truncated to fit.
                row["Message"] = (
                    textwrap.shorten(unsuccessful_quantum_summary.messages[-1], max_message_width)
                    if unsuccessful_quantum_summary.messages
                    else ""
                )
                rows.append(row)
            for exception_summary in itertools.chain.from_iterable(task_summary.exceptions.values()):
                # Trim off the package name from the exception type for
                # brevity.
                short_name: str = exception_summary.exception.type_name.rsplit(".", maxsplit=1)[-1]
                row = {
                    "Status(Caveats)": "SUCCESSFUL(P)",  # we only get exception info for partial outputs
                    "Exception": short_name,
                    **exception_summary.data_id,
                    "Message": textwrap.shorten(exception_summary.exception.message, max_message_width),
                }
                rows.append(row)
            if rows:
                table = astropy.table.Table(rows)
                table.columns["Exception"].format = "<"
                table.columns["Message"].format = "<"
                result[task_label] = table
        return result

    def make_bad_dataset_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]:
        """Construct an `astropy.table.Table` with per-data-ID information
        about unsuccessful and cursed datasets.

        Parameters
        ----------
        max_message_width : `int`, optional
            Maximum width for the Message column.  Longer messages are
            truncated.

        Returns
        -------
        tables : `dict` [ `str`, `astropy.table.Table` ]
            A table for each dataset type with status, data IDs, and
            diagnostic messages for each unsuccessful or cursed dataset.
            Keys are dataset type names.  Only dataset types with
            unsuccessful or cursed datasets are included.
        """
        result = {}
        for dataset_type_name, dataset_type_summary in self.datasets.items():
            rows = []
            for data_id in dataset_type_summary.unsuccessful_datasets:
                # Unsuccessful datasets carry no message of their own, but
                # they must still be appended so they appear in the table.
                rows.append({"Status": "UNSUCCESSFUL", **data_id, "Message": ""})
            for cursed_dataset_summary in dataset_type_summary.cursed_datasets:
                row = {"Status": "CURSED", **cursed_dataset_summary.data_id}
                row["Message"] = (
                    textwrap.shorten(cursed_dataset_summary.messages[-1], max_message_width)
                    if cursed_dataset_summary.messages
                    else ""
                )
                rows.append(row)
            if rows:
                table = astropy.table.Table(rows)
                table.columns["Message"].format = "<"
                result[dataset_type_name] = table
        return result
1129class QuantumProvenanceGraph:
1130 """A set of already-run, merged quantum graphs with provenance
1131 information.
1133 Parameters
1134 ----------
1135 butler : `lsst.daf.butler.Butler`
1136 The Butler used for this report. This should match the Butler used
1137 for the run associated with the executed quantum graph.
1138 qgraphs : `~collections.abc.Sequence` [`QuantumGraph` |\
1139 `~lsst.utils.resources.ResourcePathExpression`]
1140 A list of either quantum graph objects or their uri's, to be used
1141 to assemble the `QuantumProvenanceGraph`.
1142 collections : `~collections.abc.Sequence` [`str`] | `None`
1143 Collections to use in `lsst.daf.butler.query_datasets` when testing
1144 which datasets are available at a high level.
1145 where : `str`
1146 A "where" string to use to constrain the datasets; should be provided
1147 if ``collections`` includes many datasets that are not in any graphs,
1148 to select just those that might be (e.g. when sharding over dimensions
1149 and using a final collection that spans multiple shards).
1150 curse_failed_logs : `bool`
1151 Mark log datasets as CURSED if they are visible in the final output
1152 collection. Note that a campaign-level collection must be used here for
1153 `collections` if `curse_failed_logs` is `True`.
1154 read_caveats : `str` or `None`, optional
1155 Whether to read metadata files to get flags that describe qualified
1156 successes. If `None`, no metadata files will be read and all
1157 ``caveats`` fields will be `None`. If "exhaustive", all metadata files
1158 will be read. If "lazy", only metadata files where at least one
1159 predicted output is missing will be read.
1160 use_qbb : `bool`, optional
1161 If `True`, use a quantum-backed butler when reading metadata files.
1162 Note that some butler database queries are still run even if this is
1163 `True`; this does not avoid database access entirely.
1164 n_cores : `int`, optional
1165 Number of threads to use for parallelization.
1166 """
1168 def __init__(
1169 self,
1170 butler: Butler | None = None,
1171 qgraphs: Sequence[QuantumGraph | ResourcePathExpression] = (),
1172 *,
1173 collections: Sequence[str] | None = None,
1174 where: str = "",
1175 curse_failed_logs: bool = False,
1176 read_caveats: Literal["lazy", "exhaustive"] | None = "lazy",
1177 use_qbb: bool = True,
1178 n_cores: int = 1,
1179 ) -> None:
1180 # The graph we annotate as we step through all the graphs associated
1181 # with the processing to create the `QuantumProvenanceGraph`.
1182 self._xgraph = networkx.DiGraph()
1183 # The nodes representing quanta in `_xgraph` grouped by task label.
1184 self._quanta: dict[str, set[QuantumKey]] = {}
1185 # The nodes representing datasets in `_xgraph` grouped by dataset type
1186 # name.
1187 self._datasets: dict[str, set[DatasetKey]] = {}
1188 # Bool representing whether the graph has been finalized. This is set
1189 # to True when resolve_duplicates completes.
1190 self._finalized: bool = False
1191 # In order to both parallelize metadata/log reads and potentially use
1192 # QBB to do it, we in general need one butler for each output_run and
1193 # thread combination. This dict is keyed by the former, and the
1194 # wrapper type used for the value handles the latter.
1195 self._butler_wrappers: dict[str, _ThreadLocalButlerWrapper] = {}
1196 if butler is not None:
1197 self.assemble_quantum_provenance_graph(
1198 butler,
1199 qgraphs,
1200 collections=collections,
1201 where=where,
1202 curse_failed_logs=curse_failed_logs,
1203 read_caveats=read_caveats,
1204 use_qbb=use_qbb,
1205 n_cores=n_cores,
1206 )
1207 elif qgraphs:
1208 raise TypeError("'butler' must be provided if `qgraphs` is.")
    @property
    def quanta(self) -> Mapping[str, Set[QuantumKey]]:
        """A mapping from task label to a set of keys for its quanta.

        Notes
        -----
        This is a direct view of internal state; callers should not mutate
        the returned mapping or its sets.
        """
        return self._quanta
    @property
    def datasets(self) -> Mapping[str, Set[DatasetKey]]:
        """A mapping from dataset type name to a set of keys for datasets.

        Notes
        -----
        This is a direct view of internal state; callers should not mutate
        the returned mapping or its sets.
        """
        return self._datasets
    def get_quantum_info(self, key: QuantumKey) -> QuantumInfo:
        """Get a `QuantumInfo` object from the `QuantumProvenanceGraph` using
        a `QuantumKey`.

        Parameters
        ----------
        key : `QuantumKey`
            The key used to refer to the node on the graph.

        Returns
        -------
        quantum_info : `QuantumInfo`
            The `TypedDict` with information on the task label-dataID pair
            across all runs.

        Notes
        -----
        The returned mapping is the live node-attribute dictionary of the
        internal graph, so mutating it updates the graph in place.
        """
        return self._xgraph.nodes[key]
    def get_dataset_info(self, key: DatasetKey) -> DatasetInfo:
        """Get a `DatasetInfo` object from the `QuantumProvenanceGraph` using
        a `DatasetKey`.

        Parameters
        ----------
        key : `DatasetKey`
            The key used to refer to the node on the graph.

        Returns
        -------
        dataset_info : `DatasetInfo`
            The `TypedDict` with information about the
            `~lsst.daf.butler.DatasetType`-dataID pair across all runs.

        Notes
        -----
        The returned mapping is the live node-attribute dictionary of the
        internal graph, so mutating it updates the graph in place.
        """
        return self._xgraph.nodes[key]
    def to_summary(
        self, butler: Butler | None = None, do_store_logs: bool = True, n_cores: int = 1
    ) -> Summary:
        """Summarize the `QuantumProvenanceGraph`.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`, optional
            Ignored; accepted for backwards compatibility.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        n_cores : `int`, optional
            Number of cores to use.

        Returns
        -------
        result : `Summary`
            A struct containing counts of quanta and datasets in each of
            the overall states defined in `QuantumInfo` and `DatasetInfo`,
            as well as diagnostic information and error messages for failed
            quanta and strange edge cases, and a list of recovered quanta.

        Raises
        ------
        RuntimeError
            Raised if the graph has not been finalized with
            ``resolve_duplicates``.
        """
        status_log = PeriodicLogger(_LOG)
        if not self._finalized:
            raise RuntimeError(
                """resolve_duplicates must be called to finalize the
                QuantumProvenanceGraph before making a summary."""
            )
        result = Summary()
        # Futures from log reads are collected so their exceptions can be
        # re-raised in this thread.
        futures: list[concurrent.futures.Future[None]] = []
        _LOG.verbose("Summarizing %s tasks.", len(self._quanta.keys()))
        with concurrent.futures.ThreadPoolExecutor(n_cores) as executor:
            for m, (task_label, quanta) in enumerate(self._quanta.items()):
                task_summary = TaskSummary()
                task_summary.n_expected = len(quanta)
                for n, quantum_key in enumerate(quanta):
                    quantum_info = self.get_quantum_info(quantum_key)
                    # Only pass a log getter when log storage was requested;
                    # _add_quantum_info may then schedule a read on the
                    # executor and hand back a future.
                    future = task_summary._add_quantum_info(
                        quantum_info,
                        log_getter=self._butler_get if do_store_logs else None,
                        executor=executor,
                    )
                    if future is not None:
                        futures.append(future)
                    status_log.log(
                        "Summarized %s of %s quanta of task %s of %s.",
                        n + 1,
                        len(quanta),
                        m + 1,
                        len(self._quanta.keys()),
                    )
                result.tasks[task_label] = task_summary
            # Wait for the log reads and propagate the first failure, if any.
            for n, future in enumerate(concurrent.futures.as_completed(futures)):
                if (err := future.exception()) is not None:
                    raise err
                status_log.log("Loaded messages from %s of %s log datasets.", n + 1, len(futures))
        _LOG.verbose("Summarizing %s dataset types.", len(self._datasets.keys()))
        for m, (dataset_type_name, datasets) in enumerate(self._datasets.items()):
            dataset_type_summary = DatasetTypeSummary(producer="")
            dataset_type_summary.n_expected = len(datasets)
            for n, dataset_key in enumerate(datasets):
                dataset_info = self.get_dataset_info(dataset_key)
                producer_key = self.get_producer_of(dataset_key)
                producer_info = self.get_quantum_info(producer_key)
                # Not ideal, but hard to get out of the graph at the moment.
                # Change after DM-40441
                dataset_type_summary.producer = producer_key.task_label
                dataset_type_summary._add_dataset_info(dataset_info, producer_info)
                status_log.log(
                    "Summarized %s of %s datasets of type %s of %s.",
                    n + 1,
                    len(datasets),
                    m + 1,
                    len(self._datasets.keys()),
                )
            result.datasets[dataset_type_name] = dataset_type_summary
        return result
1332 def iter_outputs_of(self, quantum_key: QuantumKey) -> Iterator[DatasetKey]:
1333 """Iterate through the outputs of a quantum, yielding the keys of
1334 all of the datasets produced by the quantum.
1336 Parameters
1337 ----------
1338 quantum_key : `QuantumKey`
1339 The key for the quantum whose outputs are needed.
1340 """
1341 yield from self._xgraph.successors(quantum_key)
1343 def get_producer_of(self, dataset_key: DatasetKey) -> QuantumKey:
1344 """Unpack the predecessor (producer quantum) of a given dataset key
1345 from a graph.
1347 Parameters
1348 ----------
1349 dataset_key : `DatasetKey`
1350 The key for the dataset whose producer quantum is needed.
1352 Returns
1353 -------
1354 result : `QuantumKey`
1355 The key for the quantum which produced the dataset.
1356 """
1357 (result,) = self._xgraph.predecessors(dataset_key)
1358 return result
1360 def iter_downstream(
1361 self, key: QuantumKey | DatasetKey
1362 ) -> Iterator[tuple[QuantumKey, QuantumInfo] | tuple[DatasetKey, DatasetInfo]]:
1363 """Iterate over the quanta and datasets that are downstream of a
1364 quantum or dataset.
1366 Parameters
1367 ----------
1368 key : `QuantumKey` or `DatasetKey`
1369 Starting node.
1371 Returns
1372 -------
1373 iter : `~collections.abc.Iterator` [ `tuple` ]
1374 An iterator over pairs of (`QuantumKey`, `QuantumInfo`) or
1375 (`DatasetKey`, `DatasetInfo`).
1376 """
1377 for key in networkx.dag.descendants(self._xgraph, key):
1378 yield (key, self._xgraph.nodes[key]) # type: ignore
1380 def assemble_quantum_provenance_graph(
1381 self,
1382 butler: Butler,
1383 qgraphs: Sequence[QuantumGraph | ResourcePathExpression],
1384 collections: Sequence[str] | None = None,
1385 where: str = "",
1386 curse_failed_logs: bool = False,
1387 read_caveats: Literal["lazy", "exhaustive"] | None = "lazy",
1388 use_qbb: bool = True,
1389 n_cores: int = 1,
1390 ) -> None:
1391 """Assemble the quantum provenance graph from a list of all graphs
1392 corresponding to processing attempts.
1394 Parameters
1395 ----------
1396 butler : `lsst.daf.butler.Butler`
1397 The Butler used for this report. This should match the Butler used
1398 for the run associated with the executed quantum graph.
1399 qgraphs : `~collections.abc.Sequence` [`QuantumGraph` |\
1400 `~lsst.utils.resources.ResourcePathExpression`]
1401 A list of either quantum graph objects or their uri's, to be used
1402 to assemble the `QuantumProvenanceGraph`.
1403 collections : `~collections.abc.Sequence` [`str`] | `None`
1404 Collections to use in `lsst.daf.butler.query_datasets` when testing
1405 which datasets are available at a high level.
1406 where : `str`
1407 A "where" string to use to constrain the datasets; should be
1408 provided if ``collections`` includes many datasets that are not in
1409 any graphs, to select just those that might be (e.g. when sharding
1410 over dimensions and using a final collection that spans multiple
1411 shards).
1412 curse_failed_logs : `bool`
1413 Mark log datasets as CURSED if they are visible in the final
1414 output collection. Note that a campaign-level collection must be
1415 used here for `collections` if `curse_failed_logs` is `True`.
1416 read_caveats : `str` or `None`, optional
1417 Whether to read metadata files to get flags that describe qualified
1418 successes. If `None`, no metadata files will be read and all
1419 ``caveats`` fields will be `None`. If "exhaustive", all
1420 metadata files will be read. If "lazy", only metadata files where
1421 at least one predicted output is missing will be read.
1422 use_qbb : `bool`, optional
1423 If `True`, use a quantum-backed butler when reading metadata files.
1424 Note that some butler database queries are still run even if this
1425 is `True`; this does not avoid database access entirely.
1426 n_cores : `int`, optional
1427 Number of threads to use for parallelization.
1428 """
1429 if read_caveats not in ("lazy", "exhaustive", None):
1430 raise TypeError(
1431 f"Invalid option {read_caveats!r} for read_caveats; should be 'lazy', 'exhaustive', or None."
1432 )
1433 output_runs = []
1434 last_time: datetime.datetime | None = None
1435 for graph in qgraphs:
1436 if not isinstance(graph, QuantumGraph):
1437 _LOG.verbose("Loading quantum graph %r.", graph)
1438 qgraph = QuantumGraph.loadUri(graph)
1439 else:
1440 qgraph = graph
1441 assert qgraph.metadata is not None, "Saved QGs always have metadata."
1442 self._add_new_graph(butler, qgraph, read_caveats=read_caveats, use_qbb=use_qbb, n_cores=n_cores)
1443 output_runs.append(qgraph.metadata["output_run"])
1444 if last_time is not None and last_time > qgraph.metadata["time"]:
1445 raise RuntimeError("Quantum graphs must be passed in chronological order.")
1446 last_time = qgraph.metadata["time"]
1447 if not collections:
1448 # We reverse the order of the associated output runs because the
1449 # query in _resolve_duplicates must be done most-recent first.
1450 collections = list(reversed(output_runs))
1451 assert not curse_failed_logs, (
1452 "curse_failed_logs option must be used with one campaign-level collection."
1453 )
1454 self._resolve_duplicates(butler, collections, where, curse_failed_logs)
    def _add_new_graph(
        self,
        butler: Butler,
        qgraph: QuantumGraph,
        read_caveats: Literal["lazy", "exhaustive"] | None,
        use_qbb: bool = True,
        n_cores: int = 1,
    ) -> None:
        """Add a new quantum graph to the `QuantumProvenanceGraph`.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report. This should match the Butler
            used for the run associated with the executed quantum graph.
        qgraph : `QuantumGraph`
            The quantum graph object to add.
        read_caveats : `str` or `None`
            Whether to read metadata files to get flags that describe qualified
            successes. If `None`, no metadata files will be read and all
            ``caveats`` fields will be `None`. If "exhaustive", all
            metadata files will be read. If "lazy", only metadata files where
            at least one predicted output is missing will be read.
        use_qbb : `bool`, optional
            If `True`, use a quantum-backed butler when reading metadata files.
            Note that some butler database queries are still run even if this
            is `True`; this does not avoid database access entirely.
        n_cores : `int`, optional
            Number of threads to use for parallelization.
        """
        status_log = PeriodicLogger(_LOG)
        output_run = qgraph.metadata["output_run"]
        # Add QuantumRun and DatasetRun (and nodes/edges, as needed) to the
        # QPG for all quanta in the QG.
        _LOG.verbose("Adding output run to provenance graph.")
        new_quanta: list[QuantumKey] = []
        for n, node in enumerate(qgraph):
            new_quanta.append(self._add_new_quantum(node, output_run))
            status_log.log("Added nodes for %s of %s quanta.", n + 1, len(qgraph))
        # Query for datasets in the output run to see which ones were actually
        # produced.
        _LOG.verbose("Querying for existence for %s dataset types.", len(self._datasets.keys()))
        for m, dataset_type_name in enumerate(self._datasets):
            try:
                refs = butler.query_datasets(
                    dataset_type_name, collections=output_run, explain=False, limit=None
                )
            except MissingDatasetTypeError:
                # Dataset type was never registered, so nothing of this type
                # was produced in this run.
                continue
            for n, ref in enumerate(refs):
                dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
                dataset_info = self.get_dataset_info(dataset_key)
                dataset_run = dataset_info["runs"][output_run]  # dataset run (singular)
                dataset_run.produced = True
                status_log.log(
                    "Updated status for %s of %s datasets of %s of %s types.",
                    n + 1,
                    len(refs),
                    m + 1,
                    len(self._datasets.keys()),
                )
        if use_qbb:
            # QBB cannot be used once provenance has been ingested into the
            # run, so check for the provenance dataset first.
            provenance_graph_ref: DatasetRef | None = None
            try:
                provenance_graph_ref = butler.find_dataset(
                    PROVENANCE_DATASET_TYPE_NAME, collections=output_run
                )
            except MissingDatasetTypeError:
                pass
            if provenance_graph_ref is not None:
                _LOG.warning(
                    "Cannot use QBB for metadata/log reads after provenance has been ingested; "
                    "falling back to full butler."
                )
                self._butler_wrappers[output_run] = _ThreadLocalButlerWrapper.wrap_full(butler)
            else:
                _LOG.verbose("Using quantum-backed butler for metadata loads.")
                self._butler_wrappers[output_run] = _ThreadLocalButlerWrapper.wrap_qbb(butler, qgraph)
        else:
            _LOG.verbose("Using full butler for metadata loads.")
            self._butler_wrappers[output_run] = _ThreadLocalButlerWrapper.wrap_full(butler)

        _LOG.verbose("Setting quantum status from dataset existence.")
        # Update quantum status information based on which datasets were
        # produced.
        blocked: set[DatasetKey] = set()  # the outputs of failed or blocked quanta in this run.
        with concurrent.futures.ThreadPoolExecutor(n_cores) as executor:
            # Futures from caveat/exception reads, so errors can be re-raised
            # in this thread.
            futures: list[concurrent.futures.Future[None]] = []
            for n, quantum_key in enumerate(new_quanta):
                # Caveats only apply to successful quanta, and are skipped
                # entirely when read_caveats is None.
                if (
                    self._update_run_status(quantum_key, output_run, blocked) == QuantumRunStatus.SUCCESSFUL
                    and read_caveats is not None
                ):
                    self._update_caveats(quantum_key, output_run, read_caveats, executor, futures)
                self._update_info_status(quantum_key, output_run)
                status_log.log("Updated status for %s of %s quanta.", n + 1, len(new_quanta))
            for n, future in enumerate(concurrent.futures.as_completed(futures)):
                if (err := future.exception()) is not None:
                    raise err
                status_log.log("Added exception/caveat information for %s of %s quanta.", n + 1, len(futures))
1557 def _add_new_quantum(self, node: QuantumNode, output_run: str) -> QuantumKey:
1558 """Add a quantum from a new quantum graph to the provenance graph.
1560 Parameters
1561 ----------
1562 node : `QuantumNode`
1563 Node in the quantum graph.
1564 output_run : `str`
1565 Output run collection.
1567 Returns
1568 -------
1569 quantum_key : `QuantumKey`
1570 Key for the new or existing node in the provenance graph.
1572 Notes
1573 -----
1574 This method adds new quantum and dataset nodes to the provenance graph
1575 if they don't already exist, while adding new `QuantumRun` and
1576 `DatasetRun` objects to both new and existing nodes. All status
1577 information on those nodes is set to initial, default values that
1578 generally reflect quanta that have not been attempted to be run.
1579 """
1580 # make a key to refer to the quantum and add it to the quantum
1581 # provenance graph.
1582 quantum_key = QuantumKey(
1583 node.taskDef.label, cast(DataCoordinate, node.quantum.dataId).required_values
1584 )
1585 self._xgraph.add_node(quantum_key)
1586 # use the key to get a `QuantumInfo` object for the quantum
1587 # and set defaults for its values.
1588 quantum_info = self.get_quantum_info(quantum_key)
1589 quantum_info.setdefault("messages", [])
1590 quantum_info.setdefault("runs", {})
1591 quantum_info.setdefault("data_id", cast(DataCoordinate, node.quantum.dataId))
1592 quantum_info.setdefault("status", QuantumInfoStatus.UNKNOWN)
1593 quantum_info.setdefault("recovered", False)
1594 self._quanta.setdefault(quantum_key.task_label, set()).add(quantum_key)
1595 metadata_ref = node.quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=node.taskDef.label)][0]
1596 log_ref = node.quantum.outputs[LOG_OUTPUT_TEMPLATE.format(label=node.taskDef.label)][0]
1597 # associate run collections with specific quanta. this is important
1598 # if the same quanta are processed in multiple runs as in recovery
1599 # workflows.
1600 quantum_runs = quantum_info.setdefault("runs", {})
1601 # the `QuantumRun` here is the specific quantum-run collection
1602 # combination.
1603 quantum_runs[output_run] = QuantumRun(id=node.nodeId, metadata_ref=metadata_ref, log_ref=log_ref)
1604 # For each of the outputs of the quanta (datasets) make a key to
1605 # refer to the dataset.
1606 for ref in itertools.chain.from_iterable(node.quantum.outputs.values()):
1607 dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
1608 # add datasets to the nodes of the graph, with edges on the
1609 # quanta.
1610 self._xgraph.add_edge(quantum_key, dataset_key)
1611 # use the dataset key to make a `DatasetInfo` object for
1612 # the dataset and set defaults for its values.
1613 dataset_info = self.get_dataset_info(dataset_key)
1614 dataset_info.setdefault("data_id", ref.dataId)
1615 dataset_info.setdefault("status", DatasetInfoStatus.PREDICTED_ONLY)
1616 dataset_info.setdefault("messages", [])
1617 self._datasets.setdefault(dataset_key.dataset_type_name, set()).add(dataset_key)
1618 dataset_runs = dataset_info.setdefault("runs", {})
1619 # make a `DatasetRun` for the specific dataset-run
1620 # collection combination.
1621 dataset_runs[output_run] = DatasetRun(id=ref.id)
1622 # save metadata and logs for easier status interpretation later
1623 if dataset_key.dataset_type_name.endswith(METADATA_OUTPUT_CONNECTION_NAME):
1624 quantum_info["metadata"] = dataset_key
1625 quantum_runs[output_run].metadata_ref = ref
1626 if dataset_key.dataset_type_name.endswith(LOG_OUTPUT_CONNECTION_NAME):
1627 quantum_info["log"] = dataset_key
1628 quantum_runs[output_run].log_ref = ref
1629 for ref in itertools.chain.from_iterable(node.quantum.inputs.values()):
1630 dataset_key = DatasetKey(ref.datasetType.nameAndComponent()[0], ref.dataId.required_values)
1631 if dataset_key in self._xgraph:
1632 # add another edge if the input datasetType and quantum are
1633 # in the graph
1634 self._xgraph.add_edge(dataset_key, quantum_key)
1635 return quantum_key
1637 def _update_run_status(
1638 self, quantum_key: QuantumKey, output_run: str, blocked: set[DatasetKey]
1639 ) -> QuantumRunStatus:
1640 """Update the status of this quantum in its own output run, using
1641 information in the graph about which of its output datasets exist.
1643 Parameters
1644 ----------
1645 quantum_key : `QuantumKey`
1646 Key for the node in the provenance graph.
1647 output_run : `str`
1648 Output run collection.
1649 blocked : `set` [ `DatasetKey` ]
1650 A set of output datasets (for all quanta, not just this one) that
1651 were blocked by failures. Will be modified in place.
1653 Returns
1654 -------
1655 run_status : `QuantumRunStatus`
1656 Run-specific status for this quantum.
1657 """
1658 quantum_info = self.get_quantum_info(quantum_key)
1659 quantum_run = quantum_info["runs"][output_run]
1660 metadata_key = quantum_info["metadata"]
1661 log_key = quantum_info["log"]
1662 metadata_dataset_run = self.get_dataset_info(metadata_key)["runs"][output_run]
1663 log_dataset_run = self.get_dataset_info(log_key)["runs"][output_run]
1664 # if we do have metadata, we know that the task finished.
1665 if metadata_dataset_run.produced:
1666 # if we also have logs, this is a success.
1667 if log_dataset_run.produced:
1668 quantum_run.status = QuantumRunStatus.SUCCESSFUL
1669 else:
1670 # if we have metadata and no logs, this is a very rare
1671 # case. either the task ran successfully and the datastore
1672 # died immediately afterwards, or some supporting
1673 # infrastructure for transferring the logs to the datastore
1674 # failed.
1675 quantum_run.status = QuantumRunStatus.LOGS_MISSING
1677 # missing metadata means that the task did not finish.
1678 else:
1679 # if we have logs and no metadata, the task not finishing is
1680 # a failure in the task itself. This includes all payload
1681 # errors and some other problems.
1682 if log_dataset_run.produced:
1683 quantum_run.status = QuantumRunStatus.FAILED
1684 # if a quantum fails, all its successor datasets are
1685 # blocked.
1686 blocked.update(self._xgraph.successors(quantum_key))
1687 # if we are missing metadata and logs, either the task was not
1688 # started, or a hard external environmental error prevented
1689 # it from writing logs or metadata.
1690 else:
1691 # if none of this quantum's inputs were blocked, the
1692 # metadata must just be missing.
1693 if blocked.isdisjoint(self._xgraph.predecessors(quantum_key)):
1694 # None of this quantum's inputs were blocked.
1695 quantum_run.status = QuantumRunStatus.METADATA_MISSING
1696 # otherwise we can assume from no metadata and no logs
1697 # that the task was blocked by an upstream failure.
1698 else:
1699 quantum_run.status = QuantumRunStatus.BLOCKED
1700 blocked.update(self._xgraph.successors(quantum_key))
1701 return quantum_run.status
    def _update_info_status(self, quantum_key: QuantumKey, output_run: str) -> QuantumInfoStatus:
        """Update the overall status of this quantum across all runs by
        folding in the run-specific status of its latest run.

        Parameters
        ----------
        quantum_key : `QuantumKey`
            Key for the node in the provenance graph.
        output_run : `str`
            Output run collection whose run-level status has just been
            computed.

        Returns
        -------
        info_status : `QuantumInfoStatus`
            Overall status for this quantum across all runs, after this
            update.
        """
        # Use state transitions on the pair (previous overall status, latest
        # run status) to determine the new overall status.
        quantum_info = self.get_quantum_info(quantum_key)
        quantum_run = quantum_info["runs"][output_run]
        last_status = quantum_info["status"]
        new_status: QuantumInfoStatus
        match last_status, quantum_run.status:
            # A quantum can never escape a WONKY state.
            case (QuantumInfoStatus.WONKY, _):
                new_status = QuantumInfoStatus.WONKY
            # Any transition to a success (excluding from WONKY) is
            # a success; any transition from a failed state is also a
            # recovery.
            case (_, QuantumRunStatus.SUCCESSFUL):
                new_status = QuantumInfoStatus.SUCCESSFUL
                if last_status != QuantumInfoStatus.SUCCESSFUL and last_status != QuantumInfoStatus.UNKNOWN:
                    quantum_info["recovered"] = True
            # Missing logs are one of the categories of wonky quanta. They
            # interfere with our ability to discern quantum status and are
            # signs of weird things afoot in processing. Add a message
            # noting why this quantum is being marked as wonky to be stored
            # in its `UnsuccessfulQuantumInfo`.
            case (_, QuantumRunStatus.LOGS_MISSING):
                new_status = QuantumInfoStatus.WONKY
                quantum_info["messages"].append(f"Logs missing for run {output_run!r}.")
            # Leaving a successful state is another category of wonky
            # quanta. If a previous success fails on a subsequent run,
            # a human should inspect why. Add a message noting why this
            # quantum is being marked as wonky to be stored in its
            # `UnsuccessfulQuantumInfo`.
            case (QuantumInfoStatus.SUCCESSFUL, _):
                new_status = QuantumInfoStatus.WONKY
                # NOTE(review): this interpolates the last `QuantumRun`
                # *value* (not a run collection name), and `[-1]` is the run
                # just added rather than the earlier successful one — confirm
                # the intended message content.
                quantum_info["messages"].append(
                    f"Status went from successful in run {list(quantum_info['runs'].values())[-1]!r} "
                    f"to {quantum_run.status!r} in {output_run!r}."
                )
            # If a quantum status is unknown and it moves to blocked, we
            # know for sure that it is a blocked quantum.
            case (QuantumInfoStatus.UNKNOWN, QuantumRunStatus.BLOCKED):
                new_status = QuantumInfoStatus.BLOCKED
            # A transition into blocked does not change the overall quantum
            # status for a failure.
            case (_, QuantumRunStatus.BLOCKED):
                new_status = last_status
            # If a quantum transitions from any state into missing
            # metadata, we don't have enough information to diagnose its
            # state.
            case (_, QuantumRunStatus.METADATA_MISSING):
                new_status = QuantumInfoStatus.UNKNOWN
            # Any transition into failure is a failed state.
            case (_, QuantumRunStatus.FAILED):
                new_status = QuantumInfoStatus.FAILED
        # Record the new overall status on the quantum.
        quantum_info["status"] = new_status
        return new_status
1774 def _update_caveats(
1775 self,
1776 quantum_key: QuantumKey,
1777 output_run: str,
1778 read_caveats: Literal["lazy", "exhaustive"],
1779 executor: concurrent.futures.Executor,
1780 futures: list[concurrent.futures.Future[None]],
1781 ) -> None:
1782 """Read quantum success caveats and exception information from task
1783 metadata.
1785 Parameters
1786 ----------
1787 quantum_key : `QuantumKey`
1788 Key for the node in the provenance graph.
1789 output_run : `str`
1790 Output run collection.
1791 read_caveats : `str`
1792 Whether to read metadata files to get flags that describe qualified
1793 successes. If "exhaustive", all metadata files will be read. If
1794 "lazy", only metadata files where at least one predicted output is
1795 missing will be read.
1796 executor : `concurrent.futures.Executor`
1797 The futures executor to use.
1798 futures : `list` [ `concurrent.futures.Future` ]
1799 Current list of futures. Will be modified.
1800 """
1801 if read_caveats == "lazy" and all(
1802 self.get_dataset_info(dataset_key)["runs"][output_run].produced
1803 for dataset_key in self._xgraph.successors(quantum_key)
1804 ):
1805 return
1806 quantum_info = self.get_quantum_info(quantum_key)
1807 quantum_run = quantum_info["runs"][output_run]
1809 def read_metadata() -> None:
1810 md = self._butler_get(quantum_run.metadata_ref, storageClass=METADATA_OUTPUT_STORAGE_CLASS)
1811 try:
1812 # Int conversion guards against spurious conversion to
1813 # float that can apparently sometimes happen in
1814 # TaskMetadata.
1815 quantum_run.caveats = QuantumSuccessCaveats(int(md["quantum"]["caveats"]))
1816 except LookupError:
1817 pass
1818 try:
1819 quantum_run.exception = ExceptionInfo._from_metadata(md[quantum_key.task_label]["failure"])
1820 except LookupError:
1821 pass
1823 futures.append(executor.submit(read_metadata))
1825 def _resolve_duplicates(
1826 self,
1827 butler: Butler,
1828 collections: Sequence[str] | None = None,
1829 where: str = "",
1830 curse_failed_logs: bool = False,
1831 ) -> None:
1832 """After quantum graphs associated with each run have been added
1833 to the `QuantumProvenanceGraph, resolve any discrepancies between
1834 them and use all attempts to finalize overall status.
1836 Particularly, use the state of each `DatasetRun` in combination with
1837 overall quantum status to ascertain the status of each dataset.
1838 Additionally, if there are multiple visible runs associated with a
1839 dataset, mark the producer quantum as WONKY.
1841 This method should be called after
1842 `QuantumProvenanceGraph._add_new_graph` has been called on every graph
1843 associated with the data processing.
1845 Parameters
1846 ----------
1847 butler : `lsst.daf.butler.Butler`
1848 The Butler used for this report. This should match the Butler used
1849 for the run associated with the executed quantum graph.
1850 collections : `~collections.abc.Sequence` [`str`] | `None`
1851 Collections to use in `lsst.daf.butler.query_datasets` when testing
1852 which datasets are available at a high level.
1853 where : `str`
1854 A "where" string to use to constrain the datasets; should be
1855 provided if ``collections`` includes many datasets that are not in
1856 any graphs, to select just those that might be (e.g. when sharding
1857 over dimensions and using a final collection that spans multiple
1858 shards).
1859 curse_failed_logs : `bool`
1860 Mark log datasets as CURSED if they are visible in the final
1861 output collection. Note that a campaign-level collection must be
1862 used here for `collections` if `curse_failed_logs` is `True`; if
1863 `_resolve_duplicates` is run on a list of group-level collections
1864 then each will only show log datasets from their own failures as
1865 visible and datasets from others will be marked as cursed.
1866 """
1867 # First thing: raise an error if resolve_duplicates has been run
1868 # before on this qpg.
1869 if self._finalized:
1870 raise RuntimeError(
1871 """resolve_duplicates may only be called on a
1872 QuantumProvenanceGraph once. Call only after all graphs have
1873 been added, or make a new graph with all constituent
1874 attempts."""
1875 )
1876 status_log = PeriodicLogger(_LOG)
1877 _LOG.verbose("Querying for dataset visibility.")
1878 for m, dataset_type_name in enumerate(self._datasets):
1879 # find datasets in a larger collection.
1880 try:
1881 refs = butler.query_datasets(
1882 dataset_type_name, collections=collections, where=where, limit=None, explain=False
1883 )
1884 except MissingDatasetTypeError:
1885 continue
1886 for n, ref in enumerate(refs):
1887 dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
1888 try:
1889 dataset_info = self.get_dataset_info(dataset_key)
1890 # Ignore if we don't actually have the dataset in any of the
1891 # graphs given.
1892 except KeyError:
1893 continue
1894 # queryable datasets are `visible`.
1895 dataset_info["runs"][ref.run].visible = True
1896 status_log.log(
1897 "Updated visibility for %s of %s datasets of type %s of %s.",
1898 n + 1,
1899 len(refs),
1900 m + 1,
1901 len(self._datasets.keys()),
1902 )
1903 _LOG.verbose("Updating task status from dataset visibility.")
1904 for m, task_quanta in enumerate(self._quanta.values()):
1905 for n, quantum_key in enumerate(task_quanta):
1906 # runs associated with visible datasets.
1907 visible_runs: set[str] = set()
1908 quantum_info = self.get_quantum_info(quantum_key)
1909 # Loop over each dataset in the outputs of a single quantum.
1910 for dataset_key in self.iter_outputs_of(quantum_key):
1911 dataset_info = self.get_dataset_info(dataset_key)
1912 dataset_type_name = dataset_key.dataset_type_name
1913 visible_runs.update(
1914 run for run, dataset_run in dataset_info["runs"].items() if dataset_run.visible
1915 )
1916 if any(dataset_run.visible for dataset_run in dataset_info["runs"].values()):
1917 query_state = "visible"
1918 # set the publish state to `shadowed` if the dataset was
1919 # produced but not visible (i.e., not queryable from the
1920 # final collection(s)).
1921 elif any(dataset_run.produced for dataset_run in dataset_info["runs"].values()):
1922 query_state = "shadowed"
1923 # a dataset which was not produced and not visible is
1924 # missing.
1925 else:
1926 query_state = "missing"
1927 # use the quantum status and publish state to ascertain the
1928 # status of the dataset.
1929 match (quantum_info["status"], query_state):
1930 # visible datasets from successful quanta are as
1931 # intended.
1932 case (QuantumInfoStatus.SUCCESSFUL, "visible"):
1933 dataset_info["status"] = DatasetInfoStatus.VISIBLE
1934 # missing datasets from successful quanta indicate a
1935 # `NoWorkFound` case.
1936 case (QuantumInfoStatus.SUCCESSFUL, "missing"):
1937 dataset_info["status"] = DatasetInfoStatus.PREDICTED_ONLY
1938 case (QuantumInfoStatus.SUCCESSFUL, "shadowed"):
1939 dataset_info["status"] = DatasetInfoStatus.SHADOWED
1940 # If anything other than a successful quantum produces
1941 # a visible dataset, that dataset is cursed. Set the
1942 # status for the dataset to cursed and note the reason
1943 # for labeling the dataset as cursed.
1944 case (_, "visible"):
1945 # Avoiding publishing failed logs is difficult
1946 # without using tagged collections, so flag them as
1947 # merely unsuccessful unless the user requests it.
1948 if (
1949 dataset_type_name.endswith(LOG_OUTPUT_CONNECTION_NAME)
1950 and not curse_failed_logs
1951 ):
1952 dataset_info["status"] = DatasetInfoStatus.UNSUCCESSFUL
1953 else:
1954 dataset_info["status"] = DatasetInfoStatus.CURSED
1955 dataset_info["messages"].append(
1956 f"Unsuccessful dataset {dataset_type_name} visible in "
1957 "final output collection."
1958 )
1959 # any other produced dataset (produced but not
1960 # visible and not successful) is a regular
1961 # failure.
1962 case _:
1963 dataset_info["status"] = DatasetInfoStatus.UNSUCCESSFUL
1964 if len(visible_runs) > 1:
1965 quantum_info["status"] = QuantumInfoStatus.WONKY
1966 quantum_info["messages"].append(
1967 f"Outputs from different runs of the same quanta were visible: {visible_runs}."
1968 )
1969 for dataset_key in self.iter_outputs_of(quantum_key):
1970 dataset_info = self.get_dataset_info(dataset_key)
1971 quantum_info["messages"].append(
1972 f"{dataset_key.dataset_type_name}"
1973 + f"from {str(dataset_info['runs'])};"
1974 + f"{str(dataset_info['status'])}"
1975 )
1976 status_log.log(
1977 "Updated task status from visibility for %s of %s quanta of task %s of %s.",
1978 n + 1,
1979 len(task_quanta),
1980 m + 1,
1981 len(self._quanta.keys()),
1982 )
1983 # If we make it all the way through resolve_duplicates, set
1984 # self._finalized = True so that it cannot be run again.
1985 self._finalized = True
1987 def _butler_get(self, ref: DatasetRef, **kwargs: Any) -> Any:
1988 return self._butler_wrappers[ref.run].butler.get(ref, **kwargs)
1991class _ThreadLocalButlerWrapper:
1992 """A wrapper for a thread-local limited butler.
1994 Parameter
1995 ---------
1996 factory : `~collections.abc.Callable`
1997 A callable that takes no arguments and returns a limited butler.
1998 """
2000 def __init__(self, factory: Callable[[], LimitedButler]):
2001 self._factory = factory
2002 self._thread_local = threading.local()
2004 @classmethod
2005 def wrap_qbb(cls, full_butler: Butler, qg: QuantumGraph) -> _ThreadLocalButlerWrapper:
2006 """Wrap a `~lsst.daf.butler.QuantumBackedButler` suitable for reading
2007 log and metadata files.
2009 Parameters
2010 ----------
2011 full_butler : `~lsst.daf.butler.Butler`
2012 Full butler to draw datastore and dimension configuration from.
2013 qg : `QuantumGraph`
2014 Quantum graph.
2016 Returns
2017 -------
2018 wrapper : `_ThreadLocalButlerWrapper`
2019 A wrapper that provides access to a thread-local QBB, constructing]
2020 it on first use.
2021 """
2022 dataset_ids = []
2023 for task_label in qg.pipeline_graph.tasks.keys():
2024 for quantum in qg.get_task_quanta(task_label).values():
2025 dataset_ids.append(quantum.outputs[LOG_OUTPUT_TEMPLATE.format(label=task_label)][0].id)
2026 dataset_ids.append(quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=task_label)][0].id)
2027 try:
2028 butler_config = full_butler._config # type: ignore[attr-defined]
2029 except AttributeError:
2030 raise RuntimeError("use_qbb=True requires a direct butler.") from None
2031 factory = _QuantumBackedButlerFactory(
2032 butler_config,
2033 dataset_ids,
2034 full_butler.dimensions,
2035 dataset_types={dt.name: dt for dt in qg.registryDatasetTypes()},
2036 )
2037 return cls(factory)
2039 @classmethod
2040 def wrap_full(cls, full_butler: Butler) -> _ThreadLocalButlerWrapper:
2041 """Wrap a full `~lsst.daf.butler.Butler`.
2043 Parameters
2044 ----------
2045 full_butler : `~lsst.daf.butler.Butler`
2046 Full butler to clone when making thread-local copies.
2048 Returns
2049 -------
2050 wrapper : `_ThreadLocalButlerWrapper`
2051 A wrapper that provides access to a thread-local butler,
2052 constructing it on first use.
2053 """
2054 return cls(full_butler.clone)
2056 @property
2057 def butler(self) -> LimitedButler:
2058 """The wrapped butler, constructed on first use within each thread."""
2059 if (butler := getattr(self._thread_local, "butler", None)) is None:
2060 self._thread_local.butler = self._factory()
2061 butler = self._thread_local.butler
2062 return butler
@dataclasses.dataclass
class _QuantumBackedButlerFactory:
    """A factory for `~lsst.daf.butler.QuantumBackedButler`, for use by
    `_ThreadLocalButlerWrapper`.

    Instances are callable and construct a new quantum-backed butler on each
    call (one per thread, in practice).
    """

    # Butler configuration to construct the QBB from.
    config: ButlerConfig
    # IDs of the datasets the QBB will be allowed to read as inputs.
    dataset_ids: list[DatasetId]
    # Dimension universe for the data repository.
    universe: DimensionUniverse
    # Mapping from dataset type name to dataset type definition.
    dataset_types: dict[str, DatasetType]

    def __call__(self) -> QuantumBackedButler:
        """Construct and return a new quantum-backed butler."""
        return QuantumBackedButler.from_predicted(
            self.config,
            predicted_inputs=self.dataset_ids,
            predicted_outputs=[],
            dimensions=self.universe,
            # We don't need the datastore records in the QG because we're
            # only going to read metadata and logs, and those are never
            # overall inputs.
            datastore_records={},
            dataset_types=self.dataset_types,
        )
2090def _cli() -> None:
2091 import argparse
2093 from .pipeline_graph.visualization import (
2094 QuantumProvenanceGraphStatusAnnotator,
2095 QuantumProvenanceGraphStatusOptions,
2096 show,
2097 )
2099 parser = argparse.ArgumentParser(
2100 "QuantumProvenanceGraph command-line utilities.",
2101 description=(
2102 "This is a small, low-effort debugging utility. "
2103 "It may disappear at any time in favor of a public 'pipetask' interface."
2104 ),
2105 )
2106 subparsers = parser.add_subparsers(dest="cmd")
2108 pprint_parser = subparsers.add_parser("pprint", help="Print a saved summary as a series of tables.")
2109 pprint_parser.add_argument("file", type=argparse.FileType("r"), help="Saved summary JSON file.")
2110 pprint_parser.add_argument(
2111 "--brief",
2112 action=argparse.BooleanOptionalAction,
2113 default=False,
2114 help="Whether to print per-data ID information.",
2115 )
2116 pprint_parser.add_argument(
2117 "--datasets",
2118 action=argparse.BooleanOptionalAction,
2119 default=True,
2120 )
2122 xgraph_parser = subparsers.add_parser("xgraph", help="Print a visual representation of a saved xgraph.")
2123 xgraph_parser.add_argument("file", type=argparse.FileType("r"), help="Saved summary JSON file.")
2124 xgraph_parser.add_argument("qgraph", type=str, help="Saved quantum graph file.")
2126 args = parser.parse_args()
2128 match args.cmd:
2129 case "pprint":
2130 summary = Summary.model_validate_json(args.file.read())
2131 args.file.close()
2132 summary.pprint(brief=args.brief, datasets=args.datasets)
2133 case "xgraph":
2134 summary = Summary.model_validate_json(args.file.read())
2135 args.file.close()
2136 status_annotator = QuantumProvenanceGraphStatusAnnotator(summary)
2137 status_options = QuantumProvenanceGraphStatusOptions(
2138 display_percent=True, display_counts=True, abbreviate=True, visualize=True
2139 )
2140 qgraph = QuantumGraph.loadUri(args.qgraph)
2141 pipeline_graph = qgraph.pipeline_graph
2142 show(
2143 pipeline_graph,
2144 dataset_types=True,
2145 status_annotator=status_annotator,
2146 status_options=status_options,
2147 )
2148 case _:
2149 raise AssertionError(f"Unhandled subcommand {args.dest}.")
# Allow running this module directly as a script to access the debugging CLI.
if __name__ == "__main__":
    _cli()