Coverage for python / lsst / ctrl / mpexec / cli / script / report.py: 15%
62 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:00 +0000
1# This file is part of ctrl_mpexec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27import pprint
28from collections.abc import Sequence
29from typing import Literal
31from astropy.table import Table
33from lsst.daf.butler import Butler
34from lsst.pipe.base import QuantumGraph
35from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
36from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary
def report(
    butler_config: str,
    qgraph_uri: str,
    full_output_filename: str | None,
    logs: bool = True,
    brief: bool = False,
) -> None:
    """Summarize the produced, missing and expected quanta and
    datasets belonging to an executed quantum graph using the
    `lsst.pipe.base.execution_reports.QuantumGraphExecutionReport`.

    Parameters
    ----------
    butler_config : `str`
        The Butler used for this report. This should match the Butler used for
        the run associated with the executed quantum graph.
    qgraph_uri : `str`
        The uri of the location of said quantum graph.
    full_output_filename : `str`
        Output the full summary report to a yaml file (named herein). Each data
        id and error message is keyed to a quantum graph node id. A convenient
        output format for error-matching and cataloguing tools such as the ones
        in the Campaign Management database. If this is not included, quanta
        and dataset information will be printed to the command-line instead.
    logs : `bool`
        Get butler log datasets for extra information (error messages).
    brief : `bool`
        List only the counts (or data_ids if number of failures < 5). This
        option is good for those who just want to see totals.
    """
    qgraph = QuantumGraph.loadUri(qgraph_uri)
    with Butler.from_config(butler_config, writeable=False) as butler:
        # Named ``execution_report`` to avoid shadowing this function's name.
        execution_report = QuantumGraphExecutionReport.make_reports(butler, qgraph)
        if not full_output_filename:
            # This is the option to print to the command-line.
            summary_dict = execution_report.to_summary_dict(butler, logs, human_readable=True)
            dataset_table_rows = []
            data_products = []
            quanta_summary = []
            error_summary = []
            for task, task_summary in summary_dict.items():
                for data_product, dataset_info in task_summary["outputs"].items():
                    dataset_table_rows.append(dataset_info)
                    data_products.append(data_product)

                failed_quanta = task_summary["failed_quanta"]
                quanta_summary.append(
                    {
                        "Task": task,
                        # With many failures, report just the count; with few
                        # (<= 5), report the full per-quantum details.
                        "Failed": len(failed_quanta) if len(failed_quanta) > 5 else failed_quanta,
                        "Blocked": task_summary["n_quanta_blocked"],
                        "Succeeded": task_summary["n_succeeded"],
                        "Expected": task_summary["n_expected"],
                    }
                )
                if "errors" in task_summary:
                    error_summary.append({task: task_summary["errors"]})
            quanta = Table(quanta_summary)
            datasets = Table(dataset_table_rows)
            datasets.add_column(data_products, index=0, name="DatasetType")
            quanta.pprint_all()
            print("\n")
            if not brief:
                pprint.pprint(error_summary)
                print("\n")
            datasets.pprint_all()
        else:
            execution_report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs)
def report_v2(
    butler_config: str,
    qgraph_uris: Sequence[str],
    collections: Sequence[str] | None,
    where: str,
    full_output_filename: str | None,
    logs: bool = True,
    brief: bool = False,
    curse_failed_logs: bool = False,
    read_caveats: Literal["lazy", "exhaustive"] | None = "lazy",
    use_qbb: bool = True,
    n_cores: int = 1,
    view_graph: bool = False,
) -> None:
    """Summarize the state of executed quantum graph(s), with counts of failed,
    successful and expected quanta, as well as counts of output datasets and
    their visible/shadowed states. Analyze one or more attempts at the same
    processing on the same dataquery-identified "group" and resolve recoveries
    and persistent failures. Identify mismatch errors between groups.

    Parameters
    ----------
    butler_config : `str`
        The Butler used for this report. This should match the Butler used for
        the run associated with the executed quantum graph.
    qgraph_uris : `~collections.abc.Sequence` [`str`]
        One or more uris to the serialized Quantum Graph(s).
    collections : `~collections.abc.Sequence` [`str`] | `None`
        Collection(s) associated with said graphs/processing. For use in
        `lsst.daf.butler.Registry.queryDatasets` if paring down the query would
        be useful.
    where : `str`
        A "where" string to use to constrain the collections, if passed.
    full_output_filename : `str`
        Output the full pydantic model
        `lsst.pipe.base.quantum_provenance_graph.Summary` object
        into a JSON file. This is ideal for error-matching and cataloguing
        tools such as the ones used by Campaign Management software and pilots,
        and for searching and counting specific kinds or instances of failures.
        This option will also print a "brief" (counts-only) summary to stdout.
    logs : `bool`
        Store error messages from Butler logs associated with failed quanta if
        `True`.
    brief : `bool`
        Only display short (counts-only) summary on stdout. This includes
        counts and not error messages or data_ids (similar to BPS report). This
        option will still report all `cursed` datasets and `wonky` quanta.
    curse_failed_logs : `bool`
        Mark log datasets as `cursed` if they are published in the final output
        collection. Note that a campaign-level collection must be used here for
        `collections` if `curse_failed_logs` is `True`; if
        `lsst.pipe.base.QuantumProvenanceGraph.__resolve_duplicates` is run on
        a list of group-level collections, then each will only show log
        datasets from their own failures as visible and datasets from others
        will be marked as cursed.
    read_caveats : `str`, optional
        Whether and how to read success caveats from metadata datasets:

        - "exhaustive": read all metadata datasets;
        - "lazy": read metadata datasets only for quanta that had predicted
          outputs that were not produced (will not pick up exceptions raised
          after all datasets were written);
        - `None`: do not read metadata datasets at all.
    use_qbb : `bool`, optional
        Whether to use a quantum-backed butler for metadata and log reads.
        This should reduce the number of database operations.
    n_cores : `int`, optional
        Number of cores for metadata and log reads.
    view_graph : `bool`
        Display a graph representation of
        `lsst.pipe.base.quantum_provenance_graph.Summary` on
        stdout instead of the default plain-text summary. Pipeline graph nodes
        are then annotated with their status. This is a useful way to visualize
        the flow of quanta and datasets through the graph and to identify where
        problems may be occurring.
    """
    with Butler.from_config(butler_config, writeable=False) as butler:
        provenance_graph = QuantumProvenanceGraph(
            butler,
            qgraph_uris,
            collections=collections,
            where=where,
            curse_failed_logs=curse_failed_logs,
            read_caveats=read_caveats,
            use_qbb=use_qbb,
            n_cores=n_cores,
        )
        summary = provenance_graph.to_summary(butler, do_store_logs=logs)

        if not view_graph:
            # Default path: plain-text summary (and optional JSON file).
            print_summary(summary, full_output_filename, brief)
            return

        # Graph path: annotate the pipeline graph with status information
        # derived from the provenance summary and render it to stdout.
        from lsst.pipe.base.pipeline_graph.visualization import (
            QuantumProvenanceGraphStatusAnnotator,
            QuantumProvenanceGraphStatusOptions,
            show,
        )

        # Any one of the quantum graphs gives us the `PipelineGraph`
        # representation of the pipeline.
        pipeline_graph = QuantumGraph.loadUri(qgraph_uris[0]).pipeline_graph

        annotator = QuantumProvenanceGraphStatusAnnotator(summary)
        options = QuantumProvenanceGraphStatusOptions(
            display_percent=True, display_counts=True, abbreviate=True, visualize=True
        )

        show(
            pipeline_graph,
            dataset_types=True,
            status_annotator=annotator,
            status_options=options,
        )
def aggregate_reports(
    filenames: Sequence[str], full_output_filename: str | None, brief: bool = False
) -> None:
    """Aggregate multiple
    `lsst.pipe.base.quantum_provenance_graph.QuantumProvenanceGraph` summaries
    on separate dataquery-identified groups into one holistic report.

    This is intended for reports over the same tasks in the same pipeline,
    after ``pipetask report`` has been resolved over all graphs associated with
    each group.

    Parameters
    ----------
    filenames : `~collections.abc.Sequence` [`str`]
        The paths to the JSON files produced by ``pipetask report`` (note: this
        is only compatible with the multi-graph or ``--force-v2`` option).
        These files correspond to the
        `lsst.pipe.base.quantum_provenance_graph.Summary` objects
        which are produced for each group.
    full_output_filename : `str` | `None`
        The name of the JSON file in which to store the aggregate report, if
        passed. This is passed to `print_summary` at the end of this function.
    brief : `bool`, optional
        Only display short (counts-only) summary on stdout. This includes
        counts and not error messages or data_ids (similar to BPS report).
        This option will still report all ``cursed`` datasets and ``wonky``
        quanta. This is passed to `print_summary` at the end of this function.
    """
    summaries: list[Summary] = []
    for filename in filenames:
        with open(filename) as f:
            # One validated Summary model per per-group JSON file.
            summaries.append(Summary.model_validate_json(f.read()))
    result = Summary.aggregate(summaries)
    print_summary(result, full_output_filename, brief)
def print_summary(summary: Summary, full_output_filename: str | None, brief: bool = False) -> None:
    """Take a `lsst.pipe.base.quantum_provenance_graph.Summary` object and
    write it to a file and/or the screen.

    Parameters
    ----------
    summary : `lsst.pipe.base.quantum_provenance_graph.Summary`
        This `Pydantic` model contains all the information derived from the
        `lsst.pipe.base.quantum_provenance_graph.QuantumProvenanceGraph`.
    full_output_filename : `str` | `None`
        Name of the JSON file in which to store summary information, if
        passed.
    brief : `bool`
        Only display short (counts-only) summary on stdout. This includes
        counts and not error messages or data_ids (similar to BPS report).
        Ignored (considered `False`) if ``full_output_filename`` is passed.
    """
    # When the full report goes to a file, stdout only gets the brief form.
    summary.pprint(brief=(brief or bool(full_output_filename)))
    if full_output_filename:
        # JSON is defined to be UTF-8 encoded; be explicit rather than
        # relying on the platform default encoding.
        with open(full_output_filename, "w", encoding="utf-8") as stream:
            stream.write(summary.model_dump_json(indent=2))