Coverage for python / lsst / pipe / base / execution_reports.py: 28%
139 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = (
24 "DatasetTypeExecutionReport",
25 "QuantumGraphExecutionReport",
26 "TaskExecutionReport",
27 "lookup_quantum_data_id",
28)
30import dataclasses
31import itertools
32import logging
33import uuid
34from collections.abc import Iterable, Mapping
35from typing import Any
37import networkx
38import yaml
40from lsst.daf.butler import Butler, DataCoordinate, DatasetRef, Quantum
41from lsst.resources import ResourcePathExpression
43from .graph import QuantumGraph
@dataclasses.dataclass
class DatasetTypeExecutionReport:
    """A per-dataset-type tally of produced datasets and a classification of
    the missing ones, derived from task metadata.

    One `DatasetTypeExecutionReport` is created for every
    `~lsst.daf.butler.DatasetType` tracked by a `TaskExecutionReport`.
    """

    failed: set[DatasetRef] = dataclasses.field(default_factory=set)
    """Datasets not produced because their quanta failed directly in this
    run (`set`).
    """

    not_produced: set[DatasetRef] = dataclasses.field(default_factory=set)
    """Missing datasets which were not produced by successful quanta.
    """

    blocked: set[DatasetRef] = dataclasses.field(default_factory=set)
    """Datasets not produced due to an upstream failure (`set`).
    """

    n_produced: int = 0
    """Count of datasets produced (`int`).
    """

    n_expected: int = 0
    """Count of datasets expected (`int`)
    """

    def to_summary_dict(self) -> dict[str, Any]:
        r"""Summarize this report as a plain dictionary of counts.

        Returns
        -------
        summary_dict : `dict`
            Counts of datasets with each outcome: the number ``produced``,
            plus the sizes of the ``failed``, ``not_produced``, and
            ``blocked`` sets, and the total ``expected``. See the attribute
            docstrings above for what each category means.
        """
        summary: dict[str, Any] = {}
        summary["produced"] = self.n_produced
        summary["failed"] = len(self.failed)
        summary["not_produced"] = len(self.not_produced)
        summary["blocked"] = len(self.blocked)
        summary["expected"] = self.n_expected
        return summary
@dataclasses.dataclass
class TaskExecutionReport:
    """A report on the status and content of a task in an executed quantum
    graph.

    Use task metadata to identify and inspect failures and report on output
    datasets.

    See Also
    --------
    QuantumGraphExecutionReport : Quantum graph report.
    DatasetTypeExecutionReport : DatasetType report.
    """

    failed: dict[uuid.UUID, DatasetRef] = dataclasses.field(default_factory=dict)
    """A mapping from quantum data ID to log dataset reference for quanta that
    failed directly in this run (`dict`).
    """

    n_succeeded: int = 0
    """A count of successful quanta.

    This may include quanta that did not produce any datasets; i.e., raised
    `NoWorkFound`.
    """

    n_expected: int = 0
    """A count of expected quanta.
    """

    blocked: dict[uuid.UUID, DataCoordinate] = dataclasses.field(default_factory=dict)
    """A mapping of data IDs of quanta that were not attempted due to an
    upstream failure (`dict`).
    """

    output_datasets: dict[str, DatasetTypeExecutionReport] = dataclasses.field(default_factory=dict)
    """Missing and produced outputs of each `~lsst.daf.butler.DatasetType`
    (`dict`).
    """

    def inspect_quantum(
        self,
        quantum_id: uuid.UUID,
        quantum: Quantum,
        status_graph: networkx.DiGraph,
        refs: Mapping[str, Mapping[uuid.UUID, DatasetRef]],
        metadata_name: str,
        log_name: str,
    ) -> None:
        """Inspect a quantum of a quantum graph and ascertain the status of
        each associated data product.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique identifier for the quantum to inspect.
        quantum : `Quantum`
            The specific node of the quantum graph to be inspected.
        status_graph : `networkx.DiGraph`
            The quantum graph produced by
            `QuantumGraphExecutionReport.make_reports` which steps through the
            quantum graph of a run and logs the status of each quantum.
        refs : `~collections.abc.Mapping` [ `str`,\
                `~collections.abc.Mapping` [ `uuid.UUID`,\
                `~lsst.daf.butler.DatasetRef` ] ]
            The DatasetRefs of each of the DatasetTypes produced by the task.
            Includes initialization, intermediate and output data products.
        metadata_name : `str`
            The metadata dataset name for the node.
        log_name : `str`
            The name of the log files for the node.

        See Also
        --------
        QuantumGraphExecutionReport.make_reports : Make reports.
        """
        # Each quantum has exactly one metadata and one log output; the
        # presence of the metadata dataset in the output run is what marks a
        # quantum as having finished successfully.
        (metadata_ref,) = quantum.outputs[metadata_name]
        (log_ref,) = quantum.outputs[log_name]
        blocked = False
        if metadata_ref.id not in refs[metadata_name]:
            # Metadata missing: the quantum did not succeed.  Decide whether
            # it failed on its own or was blocked by an upstream failure.
            # status_graph is bipartite (quantum -> dataset -> quantum), so
            # upstream quanta are two predecessor hops away.
            if any(
                status_graph.nodes[upstream_quantum_id]["failed"]
                for upstream_dataset_id in status_graph.predecessors(quantum_id)
                for upstream_quantum_id in status_graph.predecessors(upstream_dataset_id)
            ):
                assert quantum.dataId is not None
                self.blocked[quantum_id] = quantum.dataId
                blocked = True
            else:
                self.failed[quantum_id] = log_ref
                # note: log_ref may or may not actually exist
            # Both direct failures and blocked quanta count as "failed" in
            # the status graph so that blocking propagates transitively to
            # quanta further downstream.
            failed = True
        else:
            failed = False
            self.n_succeeded += 1
        status_graph.nodes[quantum_id]["failed"] = failed

        # Now, loop over the datasets to make a DatasetTypeExecutionReport.
        for output_ref in itertools.chain.from_iterable(quantum.outputs.values()):
            # Metadata and log datasets are bookkeeping, not science
            # products; they are not tallied.
            if output_ref == metadata_ref or output_ref == log_ref:
                continue
            if (dataset_type_report := self.output_datasets.get(output_ref.datasetType.name)) is None:
                dataset_type_report = DatasetTypeExecutionReport()
                self.output_datasets[output_ref.datasetType.name] = dataset_type_report
            if output_ref.id not in refs[output_ref.datasetType.name]:
                if failed:
                    if blocked:
                        dataset_type_report.blocked.add(output_ref)
                    else:
                        dataset_type_report.failed.add(output_ref)
                else:
                    # Quantum succeeded but did not write this output (e.g.
                    # a deliberate no-work outcome).
                    dataset_type_report.not_produced.add(output_ref)
            else:
                dataset_type_report.n_produced += 1
            dataset_type_report.n_expected += 1

    def to_summary_dict(
        self, butler: Butler, do_store_logs: bool = True, human_readable: bool = False
    ) -> dict[str, Any]:
        """Summarize the results of the TaskExecutionReport in a dictionary.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        human_readable : `bool`
            Store more human-readable information to be printed out to the
            command-line.

        Returns
        -------
        summary_dict : `dict`
            A dictionary containing:

            - outputs: A dictionary summarizing the
              DatasetTypeExecutionReport for each DatasetType associated with
              the task
            - failed_quanta: A dictionary of quanta which failed and their
              dataIDs by quantum graph node id
            - n_quanta_blocked: The number of quanta which failed due to
              upstream failures.
            - n_succeeded: The number of quanta which succeeded.
            - n_expected: The number of quanta expected for this task.

            And possibly, if human-readable is passed:

            - errors: A dictionary of data ids associated with each error
              message. If `human-readable` and `do_store_logs`, this is stored
              here. Otherwise, if `do_store_logs`, it is stored in
              `failed_quanta` keyed by the quantum graph node id.
        """
        failed_quanta = {}
        failed_data_ids = []
        errors = []
        for node_id, log_ref in self.failed.items():
            data_id = dict(log_ref.dataId.required)
            quantum_info: dict[str, Any] = {"data_id": data_id}
            if do_store_logs:
                try:
                    log = butler.get(log_ref)
                except LookupError:
                    # Presumably the log dataset was never registered at
                    # all; record "no error messages" — TODO confirm intent.
                    quantum_info["error"] = []
                except FileNotFoundError:
                    # The log dataset is known but its artifact is missing;
                    # `None` distinguishes "log unavailable" from "no errors".
                    quantum_info["error"] = None
                else:
                    # Keep only messages at ERROR severity or above.
                    quantum_info["error"] = [
                        record.message for record in log if record.levelno >= logging.ERROR
                    ]
            if human_readable:
                failed_data_ids.append(data_id)
                if do_store_logs:
                    errors.append(quantum_info)

            else:
                failed_quanta[str(node_id)] = quantum_info
        result = {
            "outputs": {name: r.to_summary_dict() for name, r in self.output_datasets.items()},
            "n_quanta_blocked": len(self.blocked),
            "n_succeeded": self.n_succeeded,
            "n_expected": self.n_expected,
        }
        if human_readable:
            result["failed_quanta"] = failed_data_ids
            result["errors"] = errors
        else:
            result["failed_quanta"] = failed_quanta
        return result

    def __str__(self) -> str:
        """Return a count of the failed and blocked tasks in the
        TaskExecutionReport.
        """
        return f"failed: {len(self.failed)}\nblocked: {len(self.blocked)}\n"
@dataclasses.dataclass
class QuantumGraphExecutionReport:
    """A report on the execution of a quantum graph.

    Report the detailed status of each failure; whether tasks were not run,
    data is missing from upstream failures, or specific errors occurred during
    task execution (and report the errors). Contains a count of expected,
    produced DatasetTypes for each task. This report can be output as a
    dictionary or a yaml file.

    See Also
    --------
    TaskExecutionReport : A task report.
    DatasetTypeExecutionReport : A dataset type report.
    """

    tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict)
    """A dictionary of TaskExecutionReports by task label (`dict`)."""

    def to_summary_dict(
        self, butler: Butler, do_store_logs: bool = True, human_readable: bool = False
    ) -> dict[str, Any]:
        """Summarize the results of the `QuantumGraphExecutionReport` in a
        dictionary.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        human_readable : `bool`
            Store more human-readable information to be printed out to the
            command-line.

        Returns
        -------
        summary_dict : `dict`
            A dictionary containing a summary of a `TaskExecutionReport` for
            each task in the quantum graph.
        """
        return {
            task: report.to_summary_dict(butler, do_store_logs=do_store_logs, human_readable=human_readable)
            for task, report in self.tasks.items()
        }

    def write_summary_yaml(self, butler: Butler, filename: str, do_store_logs: bool = True) -> None:
        """Take the dictionary from
        `QuantumGraphExecutionReport.to_summary_dict` and store its contents in
        a yaml file.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report.
        filename : `str`
            The name to be used for the summary yaml file.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        """
        with open(filename, "w") as stream:
            yaml.safe_dump(self.to_summary_dict(butler, do_store_logs=do_store_logs), stream)

    @classmethod
    def make_reports(
        cls,
        butler: Butler,
        graph: QuantumGraph | ResourcePathExpression,
    ) -> QuantumGraphExecutionReport:
        """Make a `QuantumGraphExecutionReport`.

        Step through the quantum graph associated with a run, creating a
        `networkx.DiGraph` called status_graph to annotate the status of each
        quantum node. For each task in the quantum graph, use
        `TaskExecutionReport.inspect_quantum` to make a `TaskExecutionReport`
        based on the status of each node. Return a `TaskExecutionReport` for
        each task in the quantum graph.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report. This should match the Butler used
            for the run associated with the executed quantum graph.
        graph : `QuantumGraph` | `ResourcePathExpression`
            Either the associated quantum graph object or the uri of the
            location of said quantum graph.

        Returns
        -------
        report : `QuantumGraphExecutionReport`
            The `TaskExecutionReport` for each task in the quantum graph.
        """
        refs: dict[str, Any] = {}
        status_graph = networkx.DiGraph()
        if not isinstance(graph, QuantumGraph):
            qg = QuantumGraph.loadUri(graph)
        else:
            qg = graph
        assert qg.metadata is not None, "Saved QGs always have metadata."
        collection = qg.metadata["output_run"]
        report = cls()
        # Collect the datasets actually present in the output run, keyed by
        # dataset type name, for every dataset type that some task produces
        # (pure-input dataset types are skipped).
        for dataset_type_node in qg.pipeline_graph.dataset_types.values():
            if qg.pipeline_graph.producer_of(dataset_type_node.name) is None:
                continue
            refs[dataset_type_node.name] = {
                ref.id: ref
                for ref in butler.registry.queryDatasets(
                    dataset_type_node.name, collections=collection, findFirst=False
                )
            }
        # Build a bipartite status graph (quantum -> output dataset,
        # input dataset -> quantum) so that inspect_quantum can find a
        # quantum's upstream quanta two predecessor hops away.
        for task_node in qg.pipeline_graph.tasks.values():
            for quantum_id, quantum in qg.get_task_quanta(task_node.label).items():
                status_graph.add_node(quantum_id)
                for ref in itertools.chain.from_iterable(quantum.outputs.values()):
                    status_graph.add_edge(quantum_id, ref.id)
                for ref in itertools.chain.from_iterable(quantum.inputs.values()):
                    status_graph.add_edge(ref.id, quantum_id)
        # Inspect tasks in pipeline order so upstream statuses are recorded
        # in status_graph before downstream quanta consult them.
        for task_node in qg.pipeline_graph.tasks.values():
            task_report = TaskExecutionReport()
            if task_node.log_output is None:
                raise RuntimeError("QG must have log outputs to use execution reports.")
            # Fetch this task's quanta once; the mapping is needed both for
            # inspection and for the expected-quanta count.
            quanta = qg.get_task_quanta(task_node.label)
            for quantum_id, quantum in quanta.items():
                task_report.inspect_quantum(
                    quantum_id,
                    quantum,
                    status_graph,
                    refs,
                    metadata_name=task_node.metadata_output.dataset_type_name,
                    log_name=task_node.log_output.dataset_type_name,
                )
            task_report.n_expected = len(quanta)
            report.tasks[task_node.label] = task_report
        return report

    def __str__(self) -> str:
        """Return a newline-separated per-task summary line."""
        return "\n".join(f"{tasklabel}:{report}" for tasklabel, report in self.tasks.items())
def lookup_quantum_data_id(
    graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]
) -> list[DataCoordinate | None]:
    """Resolve quantum-graph node IDs to their data IDs.

    Parameters
    ----------
    graph_uri : `ResourcePathExpression`
        URI of the quantum graph of the run.
    nodes : `~collections.abc.Iterable` [ `uuid.UUID` ]
        Quantum graph nodeID.

    Returns
    -------
    data_ids : `list` [ `lsst.daf.butler.DataCoordinate` ]
        A list of human-readable dataIDs which map to the nodeIDs on the
        quantum graph at graph_uri.
    """
    # Loading only the requested nodes avoids reading the whole graph.
    quantum_graph = QuantumGraph.loadUri(graph_uri, nodes=nodes)
    data_ids: list[DataCoordinate | None] = []
    for node in nodes:
        data_ids.append(quantum_graph.getQuantumNodeByNodeId(node).quantum.dataId)
    return data_ids