# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Simple unit test for quantum_provenance_graph."""

import unittest
import uuid

import lsst.utils.logging
from lsst.pipe.base.quantum_provenance_graph import (
    CursedDatasetSummary,
    DatasetTypeSummary,
    ExceptionInfo,
    ExceptionInfoSummary,
    QuantumProvenanceGraph,
    Summary,
    TaskSummary,
    UnsuccessfulQuantumSummary,
)
from lsst.pipe.base.tests import simpleQGraph
from lsst.utils.tests import temporaryDirectory

expected_mock_datasets = [
    "add_dataset1",
    "add2_dataset1",
    "task0_metadata",
    "task0_log",
    "add_dataset2",
    "add2_dataset2",
    "task1_metadata",
    "task1_log",
    "add_dataset3",
    "add2_dataset3",
    "task2_metadata",
    "task2_log",
    "add_dataset4",
    "add2_dataset4",
    "task3_metadata",
    "task3_log",
    "add_dataset5",
    "add2_dataset5",
    "task4_metadata",
    "task4_log",
]
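

# A minimal illustrative helper (not part of the original test): the mock
# dataset names encode their producers ("add_dataset1", "add2_dataset1",
# "task0_metadata", and "task0_log" all come from "task0", and so on).  The
# tests below spell out the same mapping with explicit ``match`` statements.
def expected_producer(dataset_type_name: str) -> str:
    """Return the label of the mock task expected to produce a dataset."""
    for n in range(5):
        if dataset_type_name in (
            f"add_dataset{n + 1}",
            f"add2_dataset{n + 1}",
            f"task{n}_metadata",
            f"task{n}_log",
        ):
            return f"task{n}"
    raise ValueError(f"unexpected mock dataset: {dataset_type_name!r}")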


class QuantumProvenanceGraphTestCase(unittest.TestCase):
    """Test reports from the QuantumProvenanceGraph.

    Verify that the `QuantumProvenanceGraph` is able to extract correct
    information from `simpleQGraph`.

    More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and
    lsst/ci_middleware/tests/test_rc2_outputs.py.
    """

    def test_qpg_reports(self) -> None:
        """Test that we can add a new graph to the
        `QuantumProvenanceGraph`.
        """
        with temporaryDirectory() as root:
            # Make a simple quantum graph to build an execution report on.
            butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
            qpg = QuantumProvenanceGraph(butler, [qgraph])
            summary = qpg.to_summary(butler)
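
            # Illustrative aside (not in the original test): the
            # `model_copy`/`model_validate` calls below suggest `Summary` is a
            # pydantic model, so the report should survive a JSON round trip.
            self.assertEqual(Summary.model_validate_json(summary.model_dump_json()), summary)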

            for task_summary in summary.tasks.values():
                # We know that each task has one expected quantum and that
                # none of them were run, so the following counts should
                # describe all of the mock tasks.
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 1)
                self.assertEqual(task_summary.n_expected, 1)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertDictEqual(task_summary.exceptions, {})
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)

            for dataset_type_name, dataset_type_summary in summary.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets,
                    [{"instrument": "INSTR", "detector": 0}],
                )
                # Check the dataset counts; this can't be done in one shot
                # because the datasets have different producers, but the
                # counts for each task should all be the same.
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 1)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                # Make sure the list of cursed datasets is empty.
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                # Make sure we have the right datasets, based on our mocks.
                self.assertIn(dataset_type_name, expected_mock_datasets)
                # Make sure the expected datasets were produced by the
                # expected tasks.
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

    def test_aggregate_reports(self) -> None:
        """Test aggregating reports from the `QuantumProvenanceGraph`."""
        with temporaryDirectory() as root:
            # Make a simple quantum graph to build an execution report on.
            butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
            qpg = QuantumProvenanceGraph(butler, [qgraph])
            summary = qpg.to_summary(butler)
            # Check that aggregating a single summary does not cause an error.
            one_graph_only_sum = Summary.aggregate([summary])
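
            # Illustrative sanity check (not in the original test): a
            # single-summary aggregate should cover the same tasks and
            # dataset types as its input.
            self.assertEqual(one_graph_only_sum.tasks.keys(), summary.tasks.keys())
            self.assertEqual(one_graph_only_sum.datasets.keys(), summary.datasets.keys())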

            # Do the same tests as in `test_qpg_reports`, but on the
            # 'aggregate' summary.  Essentially, verify that the information
            # in the report is preserved during the aggregation step.
            for task_summary in one_graph_only_sum.tasks.values():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 1)
                self.assertEqual(task_summary.n_expected, 1)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertDictEqual(task_summary.exceptions, {})
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in one_graph_only_sum.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}]
                )
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 1)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

            # Now we test aggregating multiple summaries.  First, aggregate
            # the summary with an exact copy of itself and make sure all the
            # counts simply double.
            summary2 = summary.model_copy(deep=True)
            two_identical_graph_sum = Summary.aggregate([summary, summary2])
            for task_summary in two_identical_graph_sum.tasks.values():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 2)
                self.assertEqual(task_summary.n_expected, 2)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertDictEqual(task_summary.exceptions, {})
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in two_identical_graph_sum.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets,
                    [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                )
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 2)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

            # Now let's change many of the counts and much of the info for a
            # task that exists in summary 1, and check that it is folded into
            # the overall summary correctly.  This summary has a lot of valid
            # variations.
            uuid_a = uuid.uuid4()
            summary3 = summary.model_copy(deep=True)
            summary3.tasks["task4"] = TaskSummary.model_validate(
                {
                    "n_successful": 10,
                    "n_blocked": 20,
                    "n_unknown": 4,
                    "n_expected": 47,
                    "failed_quanta": [
                        {
                            "data_id": {"instrument": "INSTR", "detector": 1},
                            "runs": {"run1": "failed"},
                            "messages": ["Error on detector 1", "Second error on detector 1"],
                        },
                        {
                            "data_id": {"instrument": "INSTR", "detector": 2},
                            "runs": {"run1": "failed", "run2": "failed"},
                            "messages": [],
                        },
                        {
                            "data_id": {"instrument": "INSTR", "detector": 3},
                            "runs": {"run1": "failed"},
                            "messages": ["Error on detector 3"],
                        },
                    ],
                    "recovered_quanta": [
                        {"instrument": "INSTR", "detector": 4},
                        {"instrument": "INSTR", "detector": 5},
                        {"instrument": "INSTR", "detector": 6},
                    ],
                    "wonky_quanta": [
                        {
                            "data_id": {"instrument": "INSTR", "detector": 7},
                            "runs": {"run1": "successful", "run2": "failed"},
                            "messages": ["This one is wonky because it moved from successful to failed."],
                        }
                    ],
                    "caveats": {
                        "+A": [{"instrument": "INSTR", "detector": 10}],
                    },
                    "exceptions": {
                        "lsst.pipe.base.tests.mocks.MockAlgorithmError": [
                            {
                                "quantum_id": str(uuid_a),
                                "data_id": {"instrument": "INSTR", "detector": 10},
                                "run": "run2",
                                "exception": {
                                    "type_name": "lsst.pipe.base.tests.mocks.MockAlgorithmError",
                                    "message": "message A",
                                    "metadata": {"badness": 11},
                                },
                            }
                        ],
                    },
                    "n_wonky": 1,
                    "n_failed": 3,
                }
            )
            summary3.datasets["add_dataset5"] = DatasetTypeSummary.model_validate(
                {
                    "producer": "task4",
                    "n_visible": 0,
                    "n_shadowed": 0,
                    "n_predicted_only": 0,
                    "n_expected": 47,
                    "cursed_datasets": [
                        {
                            "producer_data_id": {"instrument": "INSTR", "detector": 7},
                            "data_id": {"instrument": "INSTR", "detector": 7},
                            "runs_produced": {"run1": True, "run2": False},
                            "run_visible": None,
                            "messages": ["Some kind of cursed dataset."],
                        }
                    ],
                    "unsuccessful_datasets": [
                        {"instrument": "INSTR", "detector": 0},
                        {"instrument": "INSTR", "detector": 1},
                        {"instrument": "INSTR", "detector": 2},
                        {"instrument": "INSTR", "detector": 3},
                    ],
                }
            )
            summary3.datasets["add2_dataset5"] = DatasetTypeSummary.model_validate(
                {
                    "producer": "task4",
                    "n_visible": 0,
                    "n_shadowed": 0,
                    "n_predicted_only": 0,
                    "n_expected": 47,
                    "cursed_datasets": [
                        {
                            "producer_data_id": {"instrument": "INSTR", "detector": 7},
                            "data_id": {"instrument": "INSTR", "detector": 7},
                            "runs_produced": {"run1": True, "run2": False},
                            "run_visible": None,
                            "messages": ["Some kind of cursed dataset."],
                        }
                    ],
                    "unsuccessful_datasets": [
                        {"instrument": "INSTR", "detector": 0},
                        {"instrument": "INSTR", "detector": 1},
                        {"instrument": "INSTR", "detector": 2},
                        {"instrument": "INSTR", "detector": 3},
                    ],
                }
            )
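
            # Expected arithmetic for the aggregate below: "task4" should
            # report 47 + 1 = 48 expected and 4 + 1 = 5 unknown quanta
            # (summary3 + summary), and "add_dataset5"/"add2_dataset5" should
            # each have 1 + 4 = 5 unsuccessful datasets (detector 0 from
            # summary, detectors 0-3 from summary3).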

            # Test that aggregating these summaries works.
            two_graphs_different_numbers = Summary.aggregate([summary, summary3])
            for task_label, task_summary in two_graphs_different_numbers.tasks.items():
                if task_label == "task4":
                    self.assertEqual(task_summary.n_successful, 10)
                    self.assertEqual(task_summary.n_blocked, 20)
                    self.assertEqual(task_summary.n_unknown, 5)
                    self.assertEqual(task_summary.n_expected, 48)
                    self.assertListEqual(
                        task_summary.failed_quanta,
                        [
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 1},
                                runs={"run1": "failed"},
                                messages=["Error on detector 1", "Second error on detector 1"],
                            ),
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 2},
                                runs={"run1": "failed", "run2": "failed"},
                                messages=[],
                            ),
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 3},
                                runs={"run1": "failed"},
                                messages=["Error on detector 3"],
                            ),
                        ],
                    )
                    self.assertListEqual(
                        task_summary.recovered_quanta,
                        [
                            {"instrument": "INSTR", "detector": 4},
                            {"instrument": "INSTR", "detector": 5},
                            {"instrument": "INSTR", "detector": 6},
                        ],
                    )
                    self.assertListEqual(
                        task_summary.wonky_quanta,
                        [
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 7},
                                runs={"run1": "successful", "run2": "failed"},
                                messages=["This one is wonky because it moved from successful to failed."],
                            )
                        ],
                    )
                    self.assertDictEqual(
                        task_summary.caveats,
                        {"+A": [{"instrument": "INSTR", "detector": 10}]},
                    )
                    self.assertDictEqual(
                        task_summary.exceptions,
                        {
                            "lsst.pipe.base.tests.mocks.MockAlgorithmError": [
                                ExceptionInfoSummary(
                                    quantum_id=uuid_a,
                                    data_id={"instrument": "INSTR", "detector": 10},
                                    run="run2",
                                    exception=ExceptionInfo(
                                        type_name="lsst.pipe.base.tests.mocks.MockAlgorithmError",
                                        message="message A",
                                        metadata={"badness": 11},
                                    ),
                                )
                            ],
                        },
                    )
                    self.assertEqual(task_summary.n_wonky, 1)
                    self.assertEqual(task_summary.n_failed, 3)
                else:
                    self.assertEqual(task_summary.n_successful, 0)
                    self.assertEqual(task_summary.n_blocked, 0)
                    self.assertEqual(task_summary.n_unknown, 2)
                    self.assertEqual(task_summary.n_expected, 2)
                    self.assertListEqual(task_summary.failed_quanta, [])
                    self.assertListEqual(task_summary.recovered_quanta, [])
                    self.assertListEqual(task_summary.wonky_quanta, [])
                    self.assertEqual(task_summary.n_wonky, 0)
                    self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in two_graphs_different_numbers.datasets.items():
                if dataset_type_name in ["add_dataset5", "add2_dataset5"]:
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [
                            {"instrument": "INSTR", "detector": 0},
                            {"instrument": "INSTR", "detector": 0},
                            {"instrument": "INSTR", "detector": 1},
                            {"instrument": "INSTR", "detector": 2},
                            {"instrument": "INSTR", "detector": 3},
                        ],
                    )
                    self.assertEqual(dataset_type_summary.n_visible, 0)
                    self.assertEqual(dataset_type_summary.n_shadowed, 0)
                    self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                    self.assertEqual(dataset_type_summary.n_expected, 48)
                    self.assertEqual(dataset_type_summary.n_cursed, 1)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 5)
                    self.assertListEqual(
                        dataset_type_summary.cursed_datasets,
                        [
                            CursedDatasetSummary(
                                producer_data_id={"instrument": "INSTR", "detector": 7},
                                data_id={"instrument": "INSTR", "detector": 7},
                                runs_produced={"run1": True, "run2": False},
                                run_visible=None,
                                messages=["Some kind of cursed dataset."],
                            )
                        ],
                    )
                else:
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                    )
                    self.assertEqual(dataset_type_summary.n_visible, 0)
                    self.assertEqual(dataset_type_summary.n_shadowed, 0)
                    self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                    self.assertEqual(dataset_type_summary.n_expected, 2)
                    self.assertEqual(dataset_type_summary.n_cursed, 0)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                    self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

            # Now, let's add a task to one model and see if aggregate still
            # works.
            summary4 = summary.model_copy(deep=True)
            summary4.tasks["task5"] = TaskSummary(
                n_successful=0,
                n_blocked=0,
                n_unknown=1,
                n_expected=1,
                failed_quanta=[],
                recovered_quanta=[],
                wonky_quanta=[],
            )
            summary4.datasets["add_dataset6"] = DatasetTypeSummary(
                producer="task5",
                n_visible=0,
                n_shadowed=0,
                n_predicted_only=0,
                n_expected=1,
                cursed_datasets=[],
                unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}],
            )
            summary4.datasets["task5_log"] = DatasetTypeSummary(
                producer="task5",
                n_visible=0,
                n_shadowed=0,
                n_predicted_only=0,
                n_expected=1,
                cursed_datasets=[],
                unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}],
            )
            summary4.datasets["task5_metadata"] = DatasetTypeSummary(
                producer="task5",
                n_visible=0,
                n_shadowed=0,
                n_predicted_only=0,
                n_expected=1,
                cursed_datasets=[],
                unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}],
            )
            two_graphs_extra_task = Summary.aggregate([summary4, summary])
            # Make sure the extra task is in there.
            self.assertIn("task5", two_graphs_extra_task.tasks)
            for task_label, task_summary in two_graphs_extra_task.tasks.items():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
                self.assertIn(task_label, ["task0", "task1", "task2", "task3", "task4", "task5"])
                if task_label == "task5":
                    self.assertEqual(task_summary.n_unknown, 1)
                    self.assertEqual(task_summary.n_expected, 1)
                else:
                    self.assertEqual(task_summary.n_unknown, 2)
                    self.assertEqual(task_summary.n_expected, 2)
            for dataset_type_name, dataset_type_summary in two_graphs_extra_task.datasets.items():
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                if dataset_type_summary.producer == "task5":
                    self.assertEqual(dataset_type_summary.n_expected, 1)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [{"instrument": "INSTR", "detector": 0}],
                    )
                else:
                    self.assertEqual(dataset_type_summary.n_expected, 2)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                    )
                self.assertIn(
                    dataset_type_name,
                    expected_mock_datasets + ["add_dataset6", "task5_metadata", "task5_log"],
                )
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")
                    case name if name in ["add_dataset6", "task5_metadata", "task5_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task5")

            # Now we test that we properly catch task-dataset mismatches in
            # aggregated graphs.  This matters because if task 1 produced a
            # certain dataset in graph 1, but task 2 produced the same
            # dataset in graph 2, the graphs are likely not comparable.
            summary5 = summary.model_copy(deep=True)
            summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy(
                deep=True,
                update={
                    "producer": "task0",
                    "n_visible": 0,
                    "n_shadowed": 0,
                    "n_predicted_only": 0,
                    "n_expected": 1,
                    "cursed_datasets": [],
                    "unsuccessful_datasets": [
                        {"instrument": "INSTR", "detector": 0},
                    ],
                    "n_cursed": 0,
                    "n_unsuccessful": 1,
                },
            )
            with self.assertLogs("lsst.pipe.base", level=lsst.utils.logging.VERBOSE) as warning_logs:
                Summary.aggregate([summary, summary5])
            self.assertIn(
                "WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent"
                ": 'task2' != 'task0'.",
                warning_logs.output[0],
            )
            self.assertIn(
                "WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1]
            )
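
            # The mismatched producer is ignored, so the aggregate keeps
            # "task2" (the producer recorded by the first summary) for
            # "add_dataset3".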

            # Next up, we try to aggregate the summary with a plain
            # dictionary and then with some garbage.  Neither of these should
            # work!  Each bad input gets its own context manager so that
            # every call is actually exercised.
            with self.assertRaises(AttributeError):
                Summary.aggregate(
                    [
                        summary,
                        {
                            "tasks": {
                                "task0": {
                                    "n_successful": 0,
                                    "n_blocked": 0,
                                    "n_unknown": 1,
                                    "n_expected": 1,
                                    "failed_quanta": [],
                                    "recovered_quanta": [],
                                    "wonky_quanta": [],
                                    "n_wonky": 0,
                                    "n_failed": 0,
                                },
                                "datasets": {
                                    "add_dataset1": {
                                        "producer": "task0",
                                        "n_visible": 0,
                                        "n_shadowed": 0,
                                        "n_predicted_only": 0,
                                        "n_expected": 1,
                                        "cursed_datasets": [],
                                        "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                        "n_cursed": 0,
                                        "n_unsuccessful": 1,
                                    },
                                    "add2_dataset1": {
                                        "producer": "task0",
                                        "n_visible": 0,
                                        "n_shadowed": 0,
                                        "n_predicted_only": 0,
                                        "n_expected": 1,
                                        "cursed_datasets": [],
                                        "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                        "n_cursed": 0,
                                        "n_unsuccessful": 1,
                                    },
                                    "task0_metadata": {
                                        "producer": "task0",
                                        "n_visible": 0,
                                        "n_shadowed": 0,
                                        "n_predicted_only": 0,
                                        "n_expected": 1,
                                        "cursed_datasets": [],
                                        "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                        "n_cursed": 0,
                                        "n_unsuccessful": 1,
                                    },
                                    "task0_log": {
                                        "producer": "task0",
                                        "n_visible": 0,
                                        "n_shadowed": 0,
                                        "n_predicted_only": 0,
                                        "n_expected": 1,
                                        "cursed_datasets": [],
                                        "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                        "n_cursed": 0,
                                        "n_unsuccessful": 1,
                                    },
                                },
                            }
                        },
                    ]
                )
            with self.assertRaises(AttributeError):
                Summary.aggregate([summary, []])
            with self.assertRaises(AttributeError):
                Summary.aggregate([summary, "some_garbage"])
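

if __name__ == "__main__":
    # Minimal entry point sketch (an assumption, not shown in the coverage
    # listing): allow running this test module directly.
    unittest.main()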