Coverage for python / lsst / pipe / base / execution_reports.py: 28%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ( 

24 "DatasetTypeExecutionReport", 

25 "QuantumGraphExecutionReport", 

26 "TaskExecutionReport", 

27 "lookup_quantum_data_id", 

28) 

29 

30import dataclasses 

31import itertools 

32import logging 

33import uuid 

34from collections.abc import Iterable, Mapping 

35from typing import Any 

36 

37import networkx 

38import yaml 

39 

40from lsst.daf.butler import Butler, DataCoordinate, DatasetRef, Quantum 

41from lsst.resources import ResourcePathExpression 

42 

43from .graph import QuantumGraph 

44 

45 

@dataclasses.dataclass
class DatasetTypeExecutionReport:
    """A per-dataset-type tally of produced outputs and a breakdown of why
    missing outputs were not produced.

    One `DatasetTypeExecutionReport` is created for each
    `~lsst.daf.butler.DatasetType` in a `TaskExecutionReport`.
    """

    failed: set[DatasetRef] = dataclasses.field(default_factory=set)
    """Datasets not produced because their quanta failed directly in this
    run (`set`).
    """

    not_produced: set[DatasetRef] = dataclasses.field(default_factory=set)
    """Missing datasets which were not produced by successful quanta.
    """

    blocked: set[DatasetRef] = dataclasses.field(default_factory=set)
    """Datasets not produced due to an upstream failure (`set`).
    """

    n_produced: int = 0
    """Count of datasets produced (`int`).
    """

    n_expected: int = 0
    """Count of datasets expected (`int`)
    """

    def to_summary_dict(self) -> dict[str, Any]:
        r"""Summarize the DatasetTypeExecutionReport in a dictionary.

        Returns
        -------
        summary_dict : `dict`
            A count of the datasets with each outcome; the number of
            produced, ``failed``, ``not_produced``, and ``blocked``
            `~lsst.daf.butler.DatasetType`\ s.
            See above for attribute descriptions.
        """
        # Counters are reported directly; the set-valued attributes are
        # collapsed to their sizes.  Insertion order is kept identical to
        # the attribute documentation above.
        summary: dict[str, Any] = {"produced": self.n_produced}
        summary["failed"] = len(self.failed)
        summary["not_produced"] = len(self.not_produced)
        summary["blocked"] = len(self.blocked)
        summary["expected"] = self.n_expected
        return summary

94 

95 

@dataclasses.dataclass
class TaskExecutionReport:
    """A report on the status and content of a task in an executed quantum
    graph.

    Use task metadata to identify and inspect failures and report on output
    datasets.

    See Also
    --------
    QuantumGraphExecutionReport : Quantum graph report.
    DatasetTypeExecutionReport : DatasetType report.
    """

    failed: dict[uuid.UUID, DatasetRef] = dataclasses.field(default_factory=dict)
    """A mapping from quantum data ID to log dataset reference for quanta that
    failed directly in this run (`dict`).
    """

    n_succeeded: int = 0
    """A count of successful quanta.

    This may include quanta that did not produce any datasets; ie, raised
    `NoWorkFound`.
    """

    n_expected: int = 0
    """A count of expected quanta.
    """

    blocked: dict[uuid.UUID, DataCoordinate] = dataclasses.field(default_factory=dict)
    """A mapping of data IDs of quanta that were not attempted due to an
    upstream failure (`dict`).
    """

    output_datasets: dict[str, DatasetTypeExecutionReport] = dataclasses.field(default_factory=dict)
    """Missing and produced outputs of each `~lsst.daf.butler.DatasetType`
    (`dict`).
    """

    def inspect_quantum(
        self,
        quantum_id: uuid.UUID,
        quantum: Quantum,
        status_graph: networkx.DiGraph,
        refs: Mapping[str, Mapping[uuid.UUID, DatasetRef]],
        metadata_name: str,
        log_name: str,
    ) -> None:
        """Inspect a quantum of a quantum graph and ascertain the status of
        each associated data product.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique identifier for the quantum to inspect.
        quantum : `Quantum`
            The specific node of the quantum graph to be inspected.
        status_graph : `networkx.DiGraph`
            The quantum graph produced by
            `QuantumGraphExecutionReport.make_reports` which steps through the
            quantum graph of a run and logs the status of each quantum.
        refs : `~collections.abc.Mapping` [ `str`,\
            `~collections.abc.Mapping` [ `uuid.UUID`,\
            `~lsst.daf.butler.DatasetRef` ] ]
            The DatasetRefs of each of the DatasetTypes produced by the task.
            Includes initialization, intermediate and output data products.
        metadata_name : `str`
            The metadata dataset name for the node.
        log_name : `str`
            The name of the log files for the node.

        See Also
        --------
        QuantumGraphExecutionReport.make_reports : Make reports.
        """
        # Each quantum is expected to have exactly one metadata output and
        # one log output; the 1-tuple unpacking enforces that.
        (metadata_ref,) = quantum.outputs[metadata_name]
        (log_ref,) = quantum.outputs[log_name]
        blocked = False
        # Presence of the metadata dataset in the output run is used as the
        # success criterion for the quantum (presumably it is only written
        # when the quantum completes — confirm against the executor).
        if metadata_ref.id not in refs[metadata_name]:
            # status_graph is bipartite (quantum -> dataset -> quantum), so
            # two predecessor hops reach the upstream quanta.  Any upstream
            # failure means this quantum was blocked rather than failed.
            # NOTE(review): this assumes upstream quanta have already been
            # inspected so their "failed" attribute is set — relies on
            # make_reports iterating tasks in dependency order.
            if any(
                status_graph.nodes[upstream_quantum_id]["failed"]
                for upstream_dataset_id in status_graph.predecessors(quantum_id)
                for upstream_quantum_id in status_graph.predecessors(upstream_dataset_id)
            ):
                assert quantum.dataId is not None
                self.blocked[quantum_id] = quantum.dataId
                blocked = True
            else:
                self.failed[quantum_id] = log_ref
                # note: log_ref may or may not actually exist
            # Both the blocked and directly-failed cases count as "failed"
            # for the purposes of classifying this quantum's outputs below.
            failed = True
        else:
            failed = False
            self.n_succeeded += 1
        # Record the outcome so downstream quanta can detect blockage.
        status_graph.nodes[quantum_id]["failed"] = failed

        # Now, loop over the datasets to make a DatasetTypeExecutionReport.
        for output_ref in itertools.chain.from_iterable(quantum.outputs.values()):
            # Metadata and log outputs are bookkeeping, not science products.
            if output_ref == metadata_ref or output_ref == log_ref:
                continue
            # Lazily create one report per output dataset type.
            if (dataset_type_report := self.output_datasets.get(output_ref.datasetType.name)) is None:
                dataset_type_report = DatasetTypeExecutionReport()
                self.output_datasets[output_ref.datasetType.name] = dataset_type_report
            if output_ref.id not in refs[output_ref.datasetType.name]:
                # Missing output: attribute it to the quantum's outcome.
                if failed:
                    if blocked:
                        dataset_type_report.blocked.add(output_ref)
                    else:
                        dataset_type_report.failed.add(output_ref)
                else:
                    # Quantum succeeded but did not write this dataset
                    # (e.g. NoWorkFound or an optional output).
                    dataset_type_report.not_produced.add(output_ref)
            else:
                dataset_type_report.n_produced += 1
            # Every predicted output counts toward the expected total.
            dataset_type_report.n_expected += 1

    def to_summary_dict(
        self, butler: Butler, do_store_logs: bool = True, human_readable: bool = False
    ) -> dict[str, Any]:
        """Summarize the results of the TaskExecutionReport in a dictionary.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        human_readable : `bool`
            Store more human-readable information to be printed out to the
            command-line.

        Returns
        -------
        summary_dict : `dict`
            A dictionary containing:

            - outputs: A dictionary summarizing the
              DatasetTypeExecutionReport for each DatasetType associated with
              the task
            - failed_quanta: A dictionary of quanta which failed and their
              dataIDs by quantum graph node id
            - n_quanta_blocked: The number of quanta which failed due to
              upstream failures.
            - n_succeeded: The number of quanta which succeeded.

            And possibly, if human-readable is passed:

            - errors: A dictionary of data ids associated with each error
              message. If `human-readable` and `do_store_logs`, this is stored
              here. Otherwise, if `do_store_logs`, it is stored in
              `failed_quanta` keyed by the quantum graph node id.
        """
        failed_quanta = {}
        failed_data_ids = []
        errors = []
        for node_id, log_ref in self.failed.items():
            data_id = dict(log_ref.dataId.required)
            quantum_info: dict[str, Any] = {"data_id": data_id}
            if do_store_logs:
                try:
                    log = butler.get(log_ref)
                except LookupError:
                    # Log dataset is not known to the butler at all; report
                    # an empty error list rather than failing the summary.
                    quantum_info["error"] = []
                except FileNotFoundError:
                    # Log dataset is registered but its file is missing;
                    # `None` distinguishes this from "no error records".
                    quantum_info["error"] = None
                else:
                    # Keep only ERROR-and-above records from the retrieved log.
                    quantum_info["error"] = [
                        record.message for record in log if record.levelno >= logging.ERROR
                    ]
            if human_readable:
                failed_data_ids.append(data_id)
                if do_store_logs:
                    errors.append(quantum_info)

            else:
                failed_quanta[str(node_id)] = quantum_info
        result = {
            "outputs": {name: r.to_summary_dict() for name, r in self.output_datasets.items()},
            "n_quanta_blocked": len(self.blocked),
            "n_succeeded": self.n_succeeded,
            "n_expected": self.n_expected,
        }
        # The shape of "failed_quanta" depends on the human_readable flag:
        # a list of data IDs for humans, a node-id-keyed dict otherwise.
        if human_readable:
            result["failed_quanta"] = failed_data_ids
            result["errors"] = errors
        else:
            result["failed_quanta"] = failed_quanta
        return result

    def __str__(self) -> str:
        """Return a count of the failed and blocked tasks in the
        TaskExecutionReport.
        """
        return f"failed: {len(self.failed)}\nblocked: {len(self.blocked)}\n"

290 

291 

@dataclasses.dataclass
class QuantumGraphExecutionReport:
    """A report on the execution of a quantum graph.

    Report the detailed status of each failure; whether tasks were not run,
    data is missing from upstream failures, or specific errors occurred during
    task execution (and report the errors). Contains a count of expected,
    produced DatasetTypes for each task. This report can be output as a
    dictionary or a yaml file.

    See Also
    --------
    TaskExecutionReport : A task report.
    DatasetTypeExecutionReport : A dataset type report.
    """

    tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict)
    """A dictionary of TaskExecutionReports by task label (`dict`)."""

    def to_summary_dict(
        self, butler: Butler, do_store_logs: bool = True, human_readable: bool = False
    ) -> dict[str, Any]:
        """Summarize the results of the `QuantumGraphExecutionReport` in a
        dictionary.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        human_readable : `bool`
            Store more human-readable information to be printed out to the
            command-line.

        Returns
        -------
        summary_dict : `dict`
            A dictionary containing a summary of a `TaskExecutionReport` for
            each task in the quantum graph.
        """
        # Delegate to each per-task report, keyed by task label.
        return {
            task: report.to_summary_dict(butler, do_store_logs=do_store_logs, human_readable=human_readable)
            for task, report in self.tasks.items()
        }

    def write_summary_yaml(self, butler: Butler, filename: str, do_store_logs: bool = True) -> None:
        """Take the dictionary from
        `QuantumGraphExecutionReport.to_summary_dict` and store its contents in
        a yaml file.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report.
        filename : `str`
            The name to be used for the summary yaml file.
        do_store_logs : `bool`
            Store the logs in the summary dictionary.
        """
        # safe_dump requires plain Python types; to_summary_dict produces
        # only dicts/lists/strings/ints, so no custom YAML tags are needed.
        with open(filename, "w") as stream:
            yaml.safe_dump(self.to_summary_dict(butler, do_store_logs=do_store_logs), stream)

    @classmethod
    def make_reports(
        cls,
        butler: Butler,
        graph: QuantumGraph | ResourcePathExpression,
    ) -> QuantumGraphExecutionReport:
        """Make a `QuantumGraphExecutionReport`.

        Step through the quantum graph associated with a run, creating a
        `networkx.DiGraph` called status_graph to annotate the status of each
        quantum node. For each task in the quantum graph, use
        `TaskExecutionReport.inspect_quantum` to make a `TaskExecutionReport`
        based on the status of each node. Return a `TaskExecutionReport` for
        each task in the quantum graph.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            The Butler used for this report. This should match the Butler used
            for the run associated with the executed quantum graph.
        graph : `QuantumGraph` | `ResourcePathExpression`
            Either the associated quantum graph object or the uri of the
            location of said quantum graph.

        Returns
        -------
        report: `QuantumGraphExecutionReport`
            The `TaskExecutionReport` for each task in the quantum graph.
        """
        # Maps dataset type name -> {dataset id -> ref} for every dataset
        # actually found in the output run.
        refs = {}  # type: dict[str, Any]
        status_graph = networkx.DiGraph()
        if not isinstance(graph, QuantumGraph):
            qg = QuantumGraph.loadUri(graph)
        else:
            qg = graph
        assert qg.metadata is not None, "Saved QGs always have metadata."
        collection = qg.metadata["output_run"]
        report = cls()
        # Query the butler once per produced dataset type; pure-input types
        # (no producer in the pipeline) are skipped.
        for dataset_type_node in qg.pipeline_graph.dataset_types.values():
            if qg.pipeline_graph.producer_of(dataset_type_node.name) is None:
                continue
            refs[dataset_type_node.name] = {
                ref.id: ref
                for ref in butler.registry.queryDatasets(
                    dataset_type_node.name, collections=collection, findFirst=False
                )
            }
        # First pass: build a bipartite status graph with edges
        # quantum -> output dataset and input dataset -> quantum, so that
        # inspect_quantum can walk two predecessor hops to upstream quanta.
        for task_node in qg.pipeline_graph.tasks.values():
            for quantum_id, quantum in qg.get_task_quanta(task_node.label).items():
                status_graph.add_node(quantum_id)
                for ref in itertools.chain.from_iterable(quantum.outputs.values()):
                    status_graph.add_edge(quantum_id, ref.id)
                for ref in itertools.chain.from_iterable(quantum.inputs.values()):
                    status_graph.add_edge(ref.id, quantum_id)
        # Second pass: classify every quantum.  NOTE(review): this assumes
        # pipeline_graph.tasks iterates in dependency (topological) order so
        # that upstream quanta are annotated before downstream ones are
        # inspected — confirm against the PipelineGraph contract.
        for task_node in qg.pipeline_graph.tasks.values():
            task_report = TaskExecutionReport()
            if task_node.log_output is None:
                raise RuntimeError("QG must have log outputs to use execution reports.")
            for quantum_id, quantum in qg.get_task_quanta(task_node.label).items():
                task_report.inspect_quantum(
                    quantum_id,
                    quantum,
                    status_graph,
                    refs,
                    metadata_name=task_node.metadata_output.dataset_type_name,
                    log_name=task_node.log_output.dataset_type_name,
                )
            # NOTE(review): len() of the mapping itself would suffice here;
            # the .items() view is redundant (same length).
            task_report.n_expected = len(qg.get_task_quanta(task_node.label).items())
            report.tasks[task_node.label] = task_report
        return report

    def __str__(self) -> str:
        # One "label:failed/blocked" line per task, via TaskExecutionReport.__str__.
        return "\n".join(f"{tasklabel}:{report}" for tasklabel, report in self.tasks.items())

428 

429 

def lookup_quantum_data_id(
    graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]
) -> list[DataCoordinate | None]:
    """Look up a dataId from a quantum graph and a list of quantum graph
    nodeIDs.

    Parameters
    ----------
    graph_uri : `ResourcePathExpression`
        URI of the quantum graph of the run.
    nodes : `~collections.abc.Iterable` [ `uuid.UUID` ]
        Quantum graph nodeID.

    Returns
    -------
    data_ids : `list` [ `lsst.daf.butler.DataCoordinate` ]
        A list of human-readable dataIDs which map to the nodeIDs on the
        quantum graph at graph_uri.
    """
    # Load only the requested nodes rather than the whole graph.
    quantum_graph = QuantumGraph.loadUri(graph_uri, nodes=nodes)
    data_ids: list[DataCoordinate | None] = []
    for node_id in nodes:
        quantum_node = quantum_graph.getQuantumNodeByNodeId(node_id)
        data_ids.append(quantum_node.quantum.dataId)
    return data_ids