Coverage for python / lsst / ctrl / mpexec / cli / script / report.py: 15%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:45 +0000

1# This file is part of ctrl_mpexec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27import pprint 

28from collections.abc import Sequence 

29from typing import Literal 

30 

31from astropy.table import Table 

32 

33from lsst.daf.butler import Butler 

34from lsst.pipe.base import QuantumGraph 

35from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport 

36from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary 

37 

38 

def report(
    butler_config: str,
    qgraph_uri: str,
    full_output_filename: str | None,
    logs: bool = True,
    brief: bool = False,
) -> None:
    """Summarize the produced, missing and expected quanta and
    datasets belonging to an executed quantum graph using the
    `lsst.pipe.base.execution_reports.QuantumGraphExecutionReport`.

    Parameters
    ----------
    butler_config : `str`
        The Butler used for this report. This should match the Butler used for
        the run associated with the executed quantum graph.
    qgraph_uri : `str`
        The uri of the location of said quantum graph.
    full_output_filename : `str`
        Output the full summary report to a yaml file (named herein). Each data
        id and error message is keyed to a quantum graph node id. A convenient
        output format for error-matching and cataloguing tools such as the ones
        in the Campaign Management database. If this is not included, quanta
        and dataset information will be printed to the command-line instead.
    logs : `bool`
        Get butler log datasets for extra information (error messages).
    brief : `bool`
        List only the counts (or data_ids if the number of failures is small).
        This option is good for those who just want to see totals.
    """
    qgraph = QuantumGraph.loadUri(qgraph_uri)
    with Butler.from_config(butler_config, writeable=False) as butler:
        # Avoid shadowing this function's own name with the report object.
        execution_report = QuantumGraphExecutionReport.make_reports(butler, qgraph)
        if full_output_filename:
            # Full machine-readable summary goes to a yaml file instead of
            # stdout.
            execution_report.write_summary_yaml(butler, full_output_filename, do_store_logs=logs)
            return
        # Command-line output path: assemble per-task and per-dataset tables.
        summary_dict = execution_report.to_summary_dict(butler, logs, human_readable=True)
        dataset_table_rows = []
        data_products = []
        quanta_summary = []
        error_summary = []
        for task_label, task_summary in summary_dict.items():
            for dataset_type_name, dataset_row in task_summary["outputs"].items():
                dataset_table_rows.append(dataset_row)
                data_products.append(dataset_type_name)

            failed_quanta = task_summary["failed_quanta"]
            quanta_summary.append(
                {
                    "Task": task_label,
                    # Show only a count when there are many failures;
                    # otherwise show the failures themselves (data ids).
                    "Failed": len(failed_quanta) if len(failed_quanta) > 5 else failed_quanta,
                    "Blocked": task_summary["n_quanta_blocked"],
                    "Succeeded": task_summary["n_succeeded"],
                    "Expected": task_summary["n_expected"],
                }
            )
            if "errors" in task_summary:
                error_summary.append({task_label: task_summary["errors"]})
        quanta = Table(quanta_summary)
        datasets = Table(dataset_table_rows)
        datasets.add_column(data_products, index=0, name="DatasetType")
        quanta.pprint_all()
        print("\n")
        if not brief:
            pprint.pprint(error_summary)
            print("\n")
            datasets.pprint_all()

117 

118 

def report_v2(
    butler_config: str,
    qgraph_uris: Sequence[str],
    collections: Sequence[str] | None,
    where: str,
    full_output_filename: str | None,
    logs: bool = True,
    brief: bool = False,
    curse_failed_logs: bool = False,
    read_caveats: Literal["lazy", "exhaustive"] | None = "lazy",
    use_qbb: bool = True,
    n_cores: int = 1,
    view_graph: bool = False,
) -> None:
    """Summarize the state of executed quantum graph(s), with counts of failed,
    successful and expected quanta, as well as counts of output datasets and
    their visible/shadowed states. Analyze one or more attempts at the same
    processing on the same dataquery-identified "group" and resolve recoveries
    and persistent failures. Identify mismatch errors between groups.

    Parameters
    ----------
    butler_config : `str`
        The Butler used for this report. This should match the Butler used for
        the run associated with the executed quantum graph.
    qgraph_uris : `~collections.abc.Sequence` [`str`]
        One or more uris to the serialized Quantum Graph(s).
    collections : `~collections.abc.Sequence` [`str`] | `None`
        Collection(s) associated with said graphs/processing. For use in
        `lsst.daf.butler.Registry.queryDatasets` if paring down the query would
        be useful.
    where : `str`
        A "where" string to use to constrain the collections, if passed.
    full_output_filename : `str`
        Output the full pydantic model
        `lsst.pipe.base.quantum_provenance_graph.Summary` object
        into a JSON file. This is ideal for error-matching and cataloguing
        tools such as the ones used by Campaign Management software and pilots,
        and for searching and counting specific kinds or instances of failures.
        This option will also print a "brief" (counts-only) summary to stdout.
    logs : `bool`
        Store error messages from Butler logs associated with failed quanta if
        `True`.
    brief : `bool`
        Only display short (counts-only) summary on stdout. This includes
        counts and not error messages or data_ids (similar to BPS report). This
        option will still report all `cursed` datasets and `wonky` quanta.
    curse_failed_logs : `bool`
        Mark log datasets as `cursed` if they are published in the final output
        collection. Note that a campaign-level collection must be used here for
        `collections` if `curse_failed_logs` is `True`; if
        `lsst.pipe.base.QuantumProvenanceGraph.__resolve_duplicates` is run on
        a list of group-level collections, then each will only show log
        datasets from their own failures as visible and datasets from others
        will be marked as cursed.
    read_caveats : `str`, optional
        Whether and how to read success caveats from metadata datasets:

        - "exhaustive": read all metadata datasets;
        - "lazy": read metadata datasets only for quanta that had predicted
          outputs that were not produced (will not pick up exceptions raised
          after all datasets were written);
        - `None`: do not read metadata datasets at all.
    use_qbb : `bool`, optional
        Whether to use a quantum-backed butler for metadata and log reads.
        This should reduce the number of database operations.
    n_cores : `int`, optional
        Number of cores for metadata and log reads.
    view_graph : `bool`
        Display a graph representation of
        `lsst.pipe.base.quantum_provenance_graph.Summary` on
        stdout instead of the default plain-text summary. Pipeline graph nodes
        are then annotated with their status. This is a useful way to visualize
        the flow of quanta and datasets through the graph and to identify where
        problems may be occurring.
    """
    with Butler.from_config(butler_config, writeable=False) as butler:
        provenance_graph = QuantumProvenanceGraph(
            butler,
            qgraph_uris,
            collections=collections,
            where=where,
            curse_failed_logs=curse_failed_logs,
            read_caveats=read_caveats,
            use_qbb=use_qbb,
            n_cores=n_cores,
        )
        summary = provenance_graph.to_summary(butler, do_store_logs=logs)

        if not view_graph:
            # Default: plain-text summary (and optional JSON dump).
            print_summary(summary, full_output_filename, brief)
            return

        from lsst.pipe.base.pipeline_graph.visualization import (
            QuantumProvenanceGraphStatusAnnotator,
            QuantumProvenanceGraphStatusOptions,
            show,
        )

        # Any of the quantum graphs yields the `PipelineGraph`
        # representation of the pipeline, so just use the first.
        pipeline_graph = QuantumGraph.loadUri(qgraph_uris[0]).pipeline_graph

        # Annotate the pipeline graph with status information taken from
        # the quantum provenance graph summary, then render it.
        show(
            pipeline_graph,
            dataset_types=True,
            status_annotator=QuantumProvenanceGraphStatusAnnotator(summary),
            status_options=QuantumProvenanceGraphStatusOptions(
                display_percent=True, display_counts=True, abbreviate=True, visualize=True
            ),
        )

235 

236 

def aggregate_reports(
    filenames: Sequence[str], full_output_filename: str | None, brief: bool = False
) -> None:
    """Aggregate multiple
    `lsst.pipe.base.quantum_provenance_graph.QuantumProvenanceGraph` summaries
    on separate dataquery-identified groups into one holistic report.

    This is intended for reports over the same tasks in the same pipeline,
    after ``pipetask report`` has been resolved over all graphs associated with
    each group.

    Parameters
    ----------
    filenames : `~collections.abc.Sequence` [`str`]
        The paths to the JSON files produced by ``pipetask report`` (note: this
        is only compatible with the multi-graph or ``--force-v2`` option).
        These files correspond to the
        `lsst.pipe.base.quantum_provenance_graph.Summary` objects
        which are produced for each group.
    full_output_filename : `str` | `None`
        The name of the JSON file in which to store the aggregate report, if
        passed. This is passed to `print_summary` at the end of this function.
    brief : `bool`, optional
        Only display short (counts-only) summary on stdout. This includes
        counts and not error messages or data_ids (similar to BPS report).
        This option will still report all ``cursed`` datasets and ``wonky``
        quanta. This is passed to `print_summary` at the end of this function.
    """
    summaries: list[Summary] = []
    for filename in filenames:
        with open(filename) as f:
            # `append`, not `extend([...])` — one Summary model per file.
            summaries.append(Summary.model_validate_json(f.read()))
    result = Summary.aggregate(summaries)
    print_summary(result, full_output_filename, brief)

272 

273 

def print_summary(summary: Summary, full_output_filename: str | None, brief: bool = False) -> None:
    """Take a `lsst.pipe.base.quantum_provenance_graph.Summary` object and
    write it to a file and/or the screen.

    Parameters
    ----------
    summary : `lsst.pipe.base.quantum_provenance_graph.Summary`
        This `Pydantic` model contains all the information derived from the
        `lsst.pipe.base.quantum_provenance_graphQuantumProvenanceGraph`.
    full_output_filename : `str` | `None`
        Name of the JSON file in which to store summary information, if
        passed.
    brief : `bool`
        Only display short (counts-only) summary on stdout. This includes
        counts and not error messages or data_ids (similar to BPS report).
        Ignored (considered `False`) if ``full_output_filename`` is passed.
    """
    # When the full report goes to a file, the stdout summary is always brief.
    write_to_file = bool(full_output_filename)
    summary.pprint(brief=brief or write_to_file)
    if write_to_file:
        with open(full_output_filename, "w") as stream:
            stream.write(summary.model_dump_json(indent=2))