# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
29"""Simple unit test for quantum_provenance_graph."""
31import unittest
32import uuid
34import lsst.utils.logging
35from lsst.pipe.base.quantum_provenance_graph import (
36 CursedDatasetSummary,
37 DatasetTypeSummary,
38 ExceptionInfo,
39 ExceptionInfoSummary,
40 QuantumProvenanceGraph,
41 Summary,
42 TaskSummary,
43 UnsuccessfulQuantumSummary,
44)
45from lsst.pipe.base.tests import simpleQGraph
46from lsst.utils.tests import temporaryDirectory

expected_mock_datasets = [
    "add_dataset1",
    "add2_dataset1",
    "task0_metadata",
    "task0_log",
    "add_dataset2",
    "add2_dataset2",
    "task1_metadata",
    "task1_log",
    "add_dataset3",
    "add2_dataset3",
    "task2_metadata",
    "task2_log",
    "add_dataset4",
    "add2_dataset4",
    "task3_metadata",
    "task3_log",
    "add_dataset5",
    "add2_dataset5",
    "task4_metadata",
    "task4_log",
]
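# The mock pipeline's naming convention, relied on by the `match` blocks
# below: each task `taskN` produces `add_dataset{N+1}` and
# `add2_dataset{N+1}`, plus its own `taskN_metadata` and `taskN_log` datasets.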


class QuantumProvenanceGraphTestCase(unittest.TestCase):
    """Test reports from the QuantumProvenanceGraph.

    Verify that the `QuantumProvenanceGraph` is able to extract correct
    information from `simpleQGraph`.

    More tests are in lsst/ci_middleware/tests/test_prod_outputs.py and
    lsst/ci_middleware/tests/test_rc2_outputs.py.
    """

    def test_qpg_reports(self) -> None:
        """Test that we can add a new graph to the
        `QuantumProvenanceGraph`.
        """
        with temporaryDirectory() as root:
            # Make a simple quantum graph to build an execution report from.
            butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
            qpg = QuantumProvenanceGraph(butler, [qgraph])
            summary = qpg.to_summary(butler)
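
            # `to_summary` collates the status of every quantum and dataset
            # in the graph into the `Summary` model checked below. Nothing
            # was actually run here, so every quantum should be counted as
            # "unknown".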
            for task_summary in summary.tasks.values():
                # We know that each task has exactly one expected quantum and
                # that none of them were run, so the following counts should
                # describe every mock task.
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 1)
                self.assertEqual(task_summary.n_expected, 1)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertDictEqual(task_summary.exceptions, {})
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)

            for dataset_type_name, dataset_type_summary in summary.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets,
                    [{"instrument": "INSTR", "detector": 0}],
                )
                # Check the dataset counts. This can't all be done in one
                # place because the dataset types have different producers,
                # but the counts themselves should be the same for every
                # dataset type.
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 1)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                # Make sure the list of cursed datasets is empty.
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                # Make sure we have the right datasets based on our mocks.
                self.assertIn(dataset_type_name, expected_mock_datasets)
                # Make sure the expected datasets were produced by the
                # expected tasks.
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

    def test_aggregate_reports(self) -> None:
        """Test aggregating reports from the `QuantumProvenanceGraph`."""
        with temporaryDirectory() as root:
            # Make a simple quantum graph to build an execution report from.
            butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
            qpg = QuantumProvenanceGraph(butler, [qgraph])
            summary = qpg.to_summary(butler)
            # Check that aggregating a single summary does not raise.
            one_graph_only_sum = Summary.aggregate([summary])

            # Do the same tests as in `test_qpg_reports`, but on the
            # aggregate summary. Essentially, verify that the information in
            # the report is preserved during the aggregation step.
            for task_summary in one_graph_only_sum.tasks.values():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 1)
                self.assertEqual(task_summary.n_expected, 1)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertDictEqual(task_summary.exceptions, {})
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in one_graph_only_sum.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}]
                )
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 1)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

            # Now test aggregating multiple summaries. First, aggregate the
            # summary with an exact copy of itself and make sure all of the
            # counts simply double.
            summary2 = summary.model_copy(deep=True)
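            # `model_copy(deep=True)` is pydantic's deep copy, so mutating
            # the copies below never touches the original `summary`.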
            two_identical_graph_sum = Summary.aggregate([summary, summary2])
            for task_summary in two_identical_graph_sum.tasks.values():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 2)
                self.assertEqual(task_summary.n_expected, 2)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertDictEqual(task_summary.exceptions, {})
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in two_identical_graph_sum.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets,
                    [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                )
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 2)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

            # Now change many of the counts and details for a task that
            # exists in summary 1, and check that the changes are merged into
            # the overall summary correctly. This modified summary exercises
            # many of the valid variations.
            uuid_a = uuid.uuid4()
            summary3 = summary.model_copy(deep=True)
            summary3.tasks["task4"] = TaskSummary.model_validate(
                {
                    "n_successful": 10,
                    "n_blocked": 20,
                    "n_unknown": 4,
                    "n_expected": 47,
                    "failed_quanta": [
                        {
                            "data_id": {"instrument": "INSTR", "detector": 1},
                            "runs": {"run1": "failed"},
                            "messages": ["Error on detector 1", "Second error on detector 1"],
                        },
                        {
                            "data_id": {"instrument": "INSTR", "detector": 2},
                            "runs": {"run1": "failed", "run2": "failed"},
                            "messages": [],
                        },
                        {
                            "data_id": {"instrument": "INSTR", "detector": 3},
                            "runs": {"run1": "failed"},
                            "messages": ["Error on detector 3"],
                        },
                    ],
                    "recovered_quanta": [
                        {"instrument": "INSTR", "detector": 4},
                        {"instrument": "INSTR", "detector": 5},
                        {"instrument": "INSTR", "detector": 6},
                    ],
                    "wonky_quanta": [
                        {
                            "data_id": {"instrument": "INSTR", "detector": 7},
                            "runs": {"run1": "successful", "run2": "failed"},
                            "messages": ["This one is wonky because it moved from successful to failed."],
                        }
                    ],
                    "caveats": {
                        "+A": [{"instrument": "INSTR", "detector": 10}],
                    },
                    "exceptions": {
                        "lsst.pipe.base.tests.mocks.MockAlgorithmError": [
                            {
                                "quantum_id": str(uuid_a),
                                "data_id": {"instrument": "INSTR", "detector": 10},
                                "run": "run2",
                                "exception": {
                                    "type_name": "lsst.pipe.base.tests.mocks.MockAlgorithmError",
                                    "message": "message A",
                                    "metadata": {"badness": 11},
                                },
                            }
                        ],
                    },
                    "n_wonky": 1,
                    "n_failed": 3,
                }
            )
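            # `TaskSummary.model_validate` accepts the raw (JSON-style) form
            # of the model; note that the `quantum_id` string is coerced back
            # into a `uuid.UUID`, which is what the aggregated summary is
            # compared against below.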
            summary3.datasets["add_dataset5"] = DatasetTypeSummary.model_validate(
                {
                    "producer": "task4",
                    "n_visible": 0,
                    "n_shadowed": 0,
                    "n_predicted_only": 0,
                    "n_expected": 47,
                    "cursed_datasets": [
                        {
                            "producer_data_id": {"instrument": "INSTR", "detector": 7},
                            "data_id": {"instrument": "INSTR", "detector": 7},
                            "runs_produced": {"run1": True, "run2": False},
                            "run_visible": None,
                            "messages": ["Some kind of cursed dataset."],
                        }
                    ],
                    "unsuccessful_datasets": [
                        {"instrument": "INSTR", "detector": 0},
                        {"instrument": "INSTR", "detector": 1},
                        {"instrument": "INSTR", "detector": 2},
                        {"instrument": "INSTR", "detector": 3},
                    ],
                }
            )
            summary3.datasets["add2_dataset5"] = DatasetTypeSummary.model_validate(
                {
                    "producer": "task4",
                    "n_visible": 0,
                    "n_shadowed": 0,
                    "n_predicted_only": 0,
                    "n_expected": 47,
                    "cursed_datasets": [
                        {
                            "producer_data_id": {"instrument": "INSTR", "detector": 7},
                            "data_id": {"instrument": "INSTR", "detector": 7},
                            "runs_produced": {"run1": True, "run2": False},
                            "run_visible": None,
                            "messages": ["Some kind of cursed dataset."],
                        }
                    ],
                    "unsuccessful_datasets": [
                        {"instrument": "INSTR", "detector": 0},
                        {"instrument": "INSTR", "detector": 1},
                        {"instrument": "INSTR", "detector": 2},
                        {"instrument": "INSTR", "detector": 3},
                    ],
                }
            )
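            # When aggregated with `summary`, the scalar counts should add
            # (1 expected + 47 expected = 48) and the per-quantum and
            # per-dataset lists should concatenate (1 + 4 unsuccessful
            # datasets = 5).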
            # Check that aggregating with this modified summary works.
            two_graphs_different_numbers = Summary.aggregate([summary, summary3])
            for task_label, task_summary in two_graphs_different_numbers.tasks.items():
                if task_label == "task4":
                    self.assertEqual(task_summary.n_successful, 10)
                    self.assertEqual(task_summary.n_blocked, 20)
                    self.assertEqual(task_summary.n_unknown, 5)
                    self.assertEqual(task_summary.n_expected, 48)
                    self.assertListEqual(
                        task_summary.failed_quanta,
                        [
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 1},
                                runs={"run1": "failed"},
                                messages=["Error on detector 1", "Second error on detector 1"],
                            ),
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 2},
                                runs={"run1": "failed", "run2": "failed"},
                                messages=[],
                            ),
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 3},
                                runs={"run1": "failed"},
                                messages=["Error on detector 3"],
                            ),
                        ],
                    )
                    self.assertListEqual(
                        task_summary.recovered_quanta,
                        [
                            {"instrument": "INSTR", "detector": 4},
                            {"instrument": "INSTR", "detector": 5},
                            {"instrument": "INSTR", "detector": 6},
                        ],
                    )
                    self.assertListEqual(
                        task_summary.wonky_quanta,
                        [
                            UnsuccessfulQuantumSummary(
                                data_id={"instrument": "INSTR", "detector": 7},
                                runs={"run1": "successful", "run2": "failed"},
                                messages=["This one is wonky because it moved from successful to failed."],
                            )
                        ],
                    )
                    self.assertDictEqual(
                        task_summary.caveats,
                        {"+A": [{"instrument": "INSTR", "detector": 10}]},
                    )
                    self.assertDictEqual(
                        task_summary.exceptions,
                        {
                            "lsst.pipe.base.tests.mocks.MockAlgorithmError": [
                                ExceptionInfoSummary(
                                    quantum_id=uuid_a,
                                    data_id={"instrument": "INSTR", "detector": 10},
                                    run="run2",
                                    exception=ExceptionInfo(
                                        type_name="lsst.pipe.base.tests.mocks.MockAlgorithmError",
                                        message="message A",
                                        metadata={"badness": 11},
                                    ),
                                )
                            ],
                        },
                    )
                    self.assertEqual(task_summary.n_wonky, 1)
                    self.assertEqual(task_summary.n_failed, 3)
                else:
                    self.assertEqual(task_summary.n_successful, 0)
                    self.assertEqual(task_summary.n_blocked, 0)
                    self.assertEqual(task_summary.n_unknown, 2)
                    self.assertEqual(task_summary.n_expected, 2)
                    self.assertListEqual(task_summary.failed_quanta, [])
                    self.assertListEqual(task_summary.recovered_quanta, [])
                    self.assertListEqual(task_summary.wonky_quanta, [])
                    self.assertEqual(task_summary.n_wonky, 0)
                    self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in two_graphs_different_numbers.datasets.items():
                if dataset_type_name in ["add_dataset5", "add2_dataset5"]:
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [
                            {"instrument": "INSTR", "detector": 0},
                            {"instrument": "INSTR", "detector": 0},
                            {"instrument": "INSTR", "detector": 1},
                            {"instrument": "INSTR", "detector": 2},
                            {"instrument": "INSTR", "detector": 3},
                        ],
                    )
                    self.assertEqual(dataset_type_summary.n_visible, 0)
                    self.assertEqual(dataset_type_summary.n_shadowed, 0)
                    self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                    self.assertEqual(dataset_type_summary.n_expected, 48)
                    self.assertEqual(dataset_type_summary.n_cursed, 1)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 5)
                    self.assertListEqual(
                        dataset_type_summary.cursed_datasets,
                        [
                            CursedDatasetSummary(
                                producer_data_id={"instrument": "INSTR", "detector": 7},
                                data_id={"instrument": "INSTR", "detector": 7},
                                runs_produced={"run1": True, "run2": False},
                                run_visible=None,
                                messages=["Some kind of cursed dataset."],
                            )
                        ],
                    )
                else:
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                    )
                    self.assertEqual(dataset_type_summary.n_visible, 0)
                    self.assertEqual(dataset_type_summary.n_shadowed, 0)
                    self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                    self.assertEqual(dataset_type_summary.n_expected, 2)
                    self.assertEqual(dataset_type_summary.n_cursed, 0)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                    self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

            # Now add a task that appears in only one model and check that
            # aggregation still works.
            summary4 = summary.model_copy(deep=True)
            summary4.tasks["task5"] = TaskSummary(
                n_successful=0,
                n_blocked=0,
                n_unknown=1,
                n_expected=1,
                failed_quanta=[],
                recovered_quanta=[],
                wonky_quanta=[],
            )
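            # Counts not passed here (e.g. `n_wonky` and `n_failed`) fall
            # back to the model defaults; the assertions below expect them to
            # be zero or empty.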
            summary4.datasets["add_dataset6"] = DatasetTypeSummary(
                producer="task5",
                n_visible=0,
                n_shadowed=0,
                n_predicted_only=0,
                n_expected=1,
                cursed_datasets=[],
                unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}],
            )
            summary4.datasets["task5_log"] = DatasetTypeSummary(
                producer="task5",
                n_visible=0,
                n_shadowed=0,
                n_predicted_only=0,
                n_expected=1,
                cursed_datasets=[],
                unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}],
            )
            summary4.datasets["task5_metadata"] = DatasetTypeSummary(
                producer="task5",
                n_visible=0,
                n_shadowed=0,
                n_predicted_only=0,
                n_expected=1,
                cursed_datasets=[],
                unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}],
            )
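            # `task5` appears only in `summary4`, so the aggregate should
            # carry its single-summary counts through unchanged, while the
            # tasks shared by both summaries double as before.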
            two_graphs_extra_task = Summary.aggregate([summary4, summary])
            # Make sure the extra task is in there.
            self.assertIn("task5", two_graphs_extra_task.tasks)
            for task_label, task_summary in two_graphs_extra_task.tasks.items():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
                self.assertIn(task_label, ["task0", "task1", "task2", "task3", "task4", "task5"])
                if task_label == "task5":
                    self.assertEqual(task_summary.n_unknown, 1)
                    self.assertEqual(task_summary.n_expected, 1)
                else:
                    self.assertEqual(task_summary.n_unknown, 2)
                    self.assertEqual(task_summary.n_expected, 2)
            for dataset_type_name, dataset_type_summary in two_graphs_extra_task.datasets.items():
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                if dataset_type_summary.producer == "task5":
                    self.assertEqual(dataset_type_summary.n_expected, 1)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [{"instrument": "INSTR", "detector": 0}],
                    )
                else:
                    self.assertEqual(dataset_type_summary.n_expected, 2)
                    self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                    self.assertListEqual(
                        dataset_type_summary.unsuccessful_datasets,
                        [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                    )
                self.assertIn(
                    dataset_type_name,
                    expected_mock_datasets + ["add_dataset6", "task5_metadata", "task5_log"],
                )
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")
                    case name if name in ["add_dataset6", "task5_metadata", "task5_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task5")

            # Now test that we properly catch task-dataset mismatches in
            # aggregated graphs. This matters because if one task produced a
            # given dataset type in graph 1 but a different task produced it
            # in graph 2, the graphs are likely not comparable.
            summary5 = summary.model_copy(deep=True)
            summary5.datasets["add_dataset3"] = summary.datasets["add_dataset3"].model_copy(
                deep=True,
                update={
                    "producer": "task0",
                    "n_visible": 0,
                    "n_shadowed": 0,
                    "n_predicted_only": 0,
                    "n_expected": 1,
                    "cursed_datasets": [],
                    "unsuccessful_datasets": [
                        {"instrument": "INSTR", "detector": 0},
                    ],
                    "n_cursed": 0,
                    "n_unsuccessful": 1,
                },
            )
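            # Only the producer actually conflicts here ("task0" instead of
            # the original "task2"); aggregation should warn about the
            # mismatch and ignore the conflicting entry rather than raise.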
            with self.assertLogs("lsst.pipe.base", level=lsst.utils.logging.VERBOSE) as warning_logs:
                Summary.aggregate([summary, summary5])
            self.assertIn(
                "WARNING:lsst.pipe.base.quantum_provenance_graph:Producer for dataset type is not consistent"
                ": 'task2' != 'task0'.",
                warning_logs.output[0],
            )
            self.assertIn(
                "WARNING:lsst.pipe.base.quantum_provenance_graph:Ignoring 'task0'.", warning_logs.output[1]
            )

            # Next up, try to aggregate the summary with a plain dictionary,
            # and then with some other garbage. Neither of these should work,
            # so each call gets its own assertRaises.
            with self.assertRaises(AttributeError):
                Summary.aggregate(
                    [
                        summary,
                        {
                            "tasks": {
                                "task0": {
                                    "n_successful": 0,
                                    "n_blocked": 0,
                                    "n_unknown": 1,
                                    "n_expected": 1,
                                    "failed_quanta": [],
                                    "recovered_quanta": [],
                                    "wonky_quanta": [],
                                    "n_wonky": 0,
                                    "n_failed": 0,
                                },
                            },
                            "datasets": {
                                "add_dataset1": {
                                    "producer": "task0",
                                    "n_visible": 0,
                                    "n_shadowed": 0,
                                    "n_predicted_only": 0,
                                    "n_expected": 1,
                                    "cursed_datasets": [],
                                    "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                    "n_cursed": 0,
                                    "n_unsuccessful": 1,
                                },
                                "add2_dataset1": {
                                    "producer": "task0",
                                    "n_visible": 0,
                                    "n_shadowed": 0,
                                    "n_predicted_only": 0,
                                    "n_expected": 1,
                                    "cursed_datasets": [],
                                    "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                    "n_cursed": 0,
                                    "n_unsuccessful": 1,
                                },
                                "task0_metadata": {
                                    "producer": "task0",
                                    "n_visible": 0,
                                    "n_shadowed": 0,
                                    "n_predicted_only": 0,
                                    "n_expected": 1,
                                    "cursed_datasets": [],
                                    "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                    "n_cursed": 0,
                                    "n_unsuccessful": 1,
                                },
                                "task0_log": {
                                    "producer": "task0",
                                    "n_visible": 0,
                                    "n_shadowed": 0,
                                    "n_predicted_only": 0,
                                    "n_expected": 1,
                                    "cursed_datasets": [],
                                    "unsuccessful_datasets": [{"instrument": "INSTR", "detector": 0}],
                                    "n_cursed": 0,
                                    "n_unsuccessful": 1,
                                },
                            },
                        },
                    ]
                )
            with self.assertRaises(AttributeError):
                Summary.aggregate([summary, []])
            with self.assertRaises(AttributeError):
                Summary.aggregate([summary, "some_garbage"])
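

# Conventional unittest entry point so the module can be run directly with
# `python test_quantum_provenance_graph.py`.
if __name__ == "__main__":
    unittest.main()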