Coverage for tests / test_simple_pipeline_executor.py: 9%
297 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from __future__ import annotations
30import os
31import shutil
32import tempfile
33import unittest
35import lsst.pipe.base.quantum_provenance_graph as qpg
36import lsst.utils.tests
37from lsst.daf.butler import Butler, DatasetType
38from lsst.pipe.base import PipelineGraph, QuantumSuccessCaveats, RepeatableQuantumError
39from lsst.pipe.base.simple_pipeline_executor import SimplePipelineExecutor
40from lsst.pipe.base.tests.mocks import (
41 DirectButlerRepo,
42 DynamicConnectionConfig,
43 DynamicTestPipelineTask,
44 DynamicTestPipelineTaskConfig,
45 MockStorageClass,
46 get_mock_name,
47)
# Absolute path to the directory holding this test file; used to locate the
# "config" and "pipelines" fixture directories shipped with the tests.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
class SimplePipelineExecutorTests(lsst.utils.tests.TestCase):
    """Test the SimplePipelineExecutor API.

    Because SimplePipelineExecutor is the easiest way to run simple pipelines
    in tests, this has also become a home for tests of execution edge cases
    that don't have a clear home in other test files.
    """

    def setUp(self):
        # Fresh temporary butler repository per test; removed in tearDown.
        self.path = tempfile.mkdtemp()
        # standalone parameter forces the returned config to also include
        # the information from the search paths.
        config = Butler.makeRepo(self.path, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")])
        self.butler = SimplePipelineExecutor.prep_butler(config, [], "fake")
        self.enterContext(self.butler)
        # Register and write a single non-mock overall-input dataset that the
        # mock tasks below can declare as an input.
        self.butler.registry.registerDatasetType(
            DatasetType(
                "input",
                dimensions=self.butler.dimensions.empty,
                storageClass="StructuredDataDict",
            )
        )
        self.butler.put({"zero": 0}, "input")
        MockStorageClass.get_or_register_mock("StructuredDataDict")
        MockStorageClass.get_or_register_mock("TaskMetadataLike")

    def tearDown(self):
        shutil.rmtree(self.path, ignore_errors=True)

    def test_from_task_class(self):
        """Test executing a single quantum with an executor created by the
        `from_task_class` factory method, and the
        `SimplePipelineExecutor.as_generator` method.
        """
        config = DynamicTestPipelineTaskConfig()
        config.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        executor = SimplePipelineExecutor.from_task_class(
            DynamicTestPipelineTask,
            config=config,
            butler=self.butler,
            label="a",
        )
        (quantum,) = executor.as_generator(register_dataset_types=True)
        self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))

    def test_metadata_input(self):
        """Test two tasks where the output uses metadata from input."""
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        # Task b consumes task a's automatically-produced metadata dataset.
        config_b.inputs["in_metadata"] = DynamicConnectionConfig(
            dataset_type_name="a_metadata", storage_class="TaskMetadata"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        output = self.butler.get("output")
        self.assertEqual(output.quantum.inputs["in_metadata"][0].original_type, "lsst.pipe.base.TaskMetadata")

    def test_optional_intermediate(self):
        """Test a pipeline task with an optional regular input that is produced
        by another task.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.NoWorkFound"
        config_a.fail_condition = "1=1"  # butler query expression that is true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        # minimum=0 makes the intermediate optional for task b.
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        # Both quanta ran successfully (NoWorkFound is a success).
        self.assertTrue(self.butler.exists("a_metadata"))
        self.assertTrue(self.butler.exists("b_metadata"))
        # The intermediate dataset was not written, but the final output was.
        self.assertFalse(self.butler.exists("intermediate"))
        self.assertTrue(self.butler.exists("output"))

    def test_optional_input(self):
        """Test a pipeline task with an optional regular input that is an
        overall input to the pipeline.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i1"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        # NOTE: this must be declared in `inputs` (with minimum=0, so it is
        # optional), not `outputs`; the original code registered it as an
        # output, which the task would have produced itself, so the test did
        # not actually exercise an optional overall input that never exists.
        config_a.inputs["i2"] = DynamicConnectionConfig(
            dataset_type_name="input_2",  # will never exist
            storage_class="StructuredDataDict",
            minimum=0,
        )
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 1)
        # The quanta ran successfully.
        self.assertTrue(self.butler.exists("a_metadata"))
        # The final output was written.
        self.assertTrue(self.butler.exists("output"))

    def test_from_pipeline_file(self) -> None:
        """Test executing a two quanta from different configurations of the
        same task, with an executor created by the `from_pipeline_filename`
        factory method, and the `SimplePipelineExecutor.run` method.
        """
        filename = os.path.join(TESTDIR, "pipelines", "pipeline_simple.yaml")
        executor = SimplePipelineExecutor.from_pipeline_filename(filename, butler=self.butler)
        self._test_pipeline_file(executor)

    def test_use_local_butler(self) -> None:
        """Test generating a local butler repository from a pipeline, then
        running that pipeline using the local butler.
        """
        # Test a trivial pipeline that has only dataset types with empty
        # dimensions.
        filename = os.path.join(TESTDIR, "pipelines", "pipeline_simple.yaml")
        executor = SimplePipelineExecutor.from_pipeline_filename(
            filename, butler=self.butler, output="u/someone/pipeline"
        )
        with tempfile.TemporaryDirectory() as tempdir:
            root = os.path.join(tempdir, "butler_root")
            local_butler = executor.use_local_butler(root)
            self.enterContext(local_butler)
            self._test_pipeline_file(executor)

        # Test a more complicated pipeline involving dataset types with
        # non-empty dimensions.  This will require dimension records to be
        # copied into the destination repository.
        with (
            tempfile.TemporaryDirectory() as tempdir,
            DirectButlerRepo.make_temporary("base.yaml", "spatial.yaml") as (helper, root),
        ):
            helper.add_task(dimensions=["visit", "detector"])
            qg = helper.make_quantum_graph(output="out_chain")
            executor = SimplePipelineExecutor(qg, helper.butler)
            output_butler_root = os.path.join(tempdir, "root")
            local_butler = executor.use_local_butler(output_butler_root)
            self.enterContext(local_butler)
            executor.run(register_dataset_types=True)
            output_butler = Butler.from_config(output_butler_root)
            self.enterContext(output_butler)
            ref = output_butler.find_dataset(
                "dataset_auto1",
                collections="out_chain",
                dimension_records=True,
                instrument="Cam1",
                detector=1,
                visit=1,
            )
            self.assertIsNotNone(ref)
            assert ref is not None  # For linters.
            self.assertIsNotNone(output_butler.get(ref))
            # Check that dimension records are present in the output Butler.
            self.assertEqual(ref.dataId.records["visit"].science_program, "test_survey")
            self.assertTrue(ref.dataId.records["visit_detector_region"].region.contains(0.004, 0.020))

    def _test_pipeline_file(self, executor: SimplePipelineExecutor) -> None:
        # Shared assertions for the pipeline_simple.yaml pipeline: two quanta,
        # both the intermediate and final outputs written as mocks.
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self.assertEqual(
            executor.butler.get("intermediate").storage_class, get_mock_name("StructuredDataDict")
        )
        self.assertEqual(executor.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))

    def test_partial_outputs_success(self):
        """Test executing two quanta where the first raises
        `lsst.pipe.base.AnnotatedPartialOutputsError` and its output is an
        optional input to the second, while configuring the executor to
        consider this a success.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.AnnotatedPartialOutputsError"
        config_a.fail_condition = "1=1"  # butler query expression that is true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        # Consider the partial a success and proceed.
        executor = SimplePipelineExecutor.from_pipeline_graph(
            pipeline_graph, butler=self.butler, raise_on_partial_outputs=False
        )
        (_, _) = executor.as_generator(register_dataset_types=True)
        self.assertFalse(self.butler.exists("intermediate"))
        self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))
        prov = qpg.QuantumProvenanceGraph(self.butler, [executor.quantum_graph], read_caveats="exhaustive")
        (quantum_key_a,) = prov.quanta["a"]
        quantum_info_a = prov.get_quantum_info(quantum_key_a)
        _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
        self.assertEqual(
            quantum_run_a.caveats,
            QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
            | QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR,
        )
        self.assertEqual(
            quantum_run_a.exception.type_name,
            "lsst.pipe.base.tests.mocks.MockAlgorithmError",
        )
        self.assertEqual(
            quantum_run_a.exception.metadata,
            {"badness": 12},
        )
        (quantum_key_b,) = prov.quanta["b"]
        quantum_info_b = prov.get_quantum_info(quantum_key_b)
        _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b)
        self.assertEqual(quantum_run_b.caveats, QuantumSuccessCaveats.NO_CAVEATS)
        prov_summary = prov.to_summary(self.butler)
        # One partial-outputs case, with an empty data ID:
        self.assertEqual(prov_summary.tasks["a"].caveats, {"*P": [{}]})
        self.assertEqual(
            prov_summary.tasks["a"].exceptions.keys(), {"lsst.pipe.base.tests.mocks.MockAlgorithmError"}
        )
        self.assertEqual(
            prov_summary.tasks["a"]
            .exceptions["lsst.pipe.base.tests.mocks.MockAlgorithmError"][0]
            .exception.metadata,
            {"badness": 12},
        )
        # No caveats for the second task, since it didn't need the first task's
        # output anyway.
        self.assertEqual(prov_summary.tasks["b"].caveats, {})
        self.assertEqual(prov_summary.tasks["b"].exceptions, {})
        # Check table forms for summaries of the same information.
        quantum_table = prov_summary.make_quantum_table()
        self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
        self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
        self.assertEqual(list(quantum_table["Successful"]), [1, 1])
        self.assertEqual(list(quantum_table["Caveats"]), ["*P(1)", ""])
        self.assertEqual(list(quantum_table["Blocked"]), [0, 0])
        self.assertEqual(list(quantum_table["Failed"]), [0, 0])
        self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
        self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
        self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
        dataset_table = prov_summary.make_dataset_table()
        self.assertEqual(
            list(dataset_table["Dataset"]),
            ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
        )
        self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
        exception_table = prov_summary.make_exception_table()
        self.assertEqual(list(exception_table["Task"]), ["a"])
        self.assertEqual(
            list(exception_table["Exception"]), ["lsst.pipe.base.tests.mocks.MockAlgorithmError"]
        )
        self.assertEqual(list(exception_table["Count"]), [1])
        bad_quantum_tables = prov_summary.make_bad_quantum_tables()
        self.assertEqual(bad_quantum_tables.keys(), {"a"})
        self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["SUCCESSFUL(P)"])
        self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), ["MockAlgorithmError"])
        self.assertFalse(prov_summary.make_bad_dataset_tables())

    def test_no_work_found(self):
        """Test executing two quanta where the first raises
        `NoWorkFound` in `runQuantum`, leading the next to raise `NoWorkFound`
        in `adjustQuantum`.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.NoWorkFound"
        config_a.fail_condition = "1=1"  # butler query expression that is true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        # raise_on_partial_outputs is irrelevant here (no partial-outputs
        # error is raised); NoWorkFound is always treated as a success.
        executor = SimplePipelineExecutor.from_pipeline_graph(
            pipeline_graph, butler=self.butler, raise_on_partial_outputs=False
        )
        (_, _) = executor.as_generator(register_dataset_types=True)
        prov = qpg.QuantumProvenanceGraph()
        prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
        (quantum_key_a,) = prov.quanta["a"]
        quantum_info_a = prov.get_quantum_info(quantum_key_a)
        _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
        self.assertEqual(
            quantum_run_a.caveats,
            QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
            | QuantumSuccessCaveats.NO_WORK,
        )
        (quantum_key_b,) = prov.quanta["b"]
        quantum_info_b = prov.get_quantum_info(quantum_key_b)
        _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b)
        self.assertEqual(
            quantum_run_b.caveats,
            QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
            | QuantumSuccessCaveats.NO_WORK,
        )
        prov_summary = prov.to_summary(self.butler)
        # One NoWorkFound, raised by runQuantum, with an empty data ID:
        self.assertEqual(prov_summary.tasks["a"].caveats, {"*N": [{}]})
        self.assertEqual(prov_summary.tasks["a"].exceptions, {})
        # One NoWorkFound, raised by adjustQuantum, with an empty data ID.
        self.assertEqual(prov_summary.tasks["b"].caveats, {"*A": [{}]})
        self.assertEqual(prov_summary.tasks["b"].exceptions, {})
        # Check table forms for summaries of the same information.
        quantum_table = prov_summary.make_quantum_table()
        self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
        self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
        self.assertEqual(list(quantum_table["Successful"]), [1, 1])
        self.assertEqual(list(quantum_table["Caveats"]), ["*N(1)", "*A(1)"])
        self.assertEqual(list(quantum_table["Blocked"]), [0, 0])
        self.assertEqual(list(quantum_table["Failed"]), [0, 0])
        self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
        self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
        self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
        dataset_table = prov_summary.make_dataset_table()
        self.assertEqual(
            list(dataset_table["Dataset"]),
            ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
        )
        self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 0, 1, 1])
        self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 1, 0, 0])
        self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
        self.assertFalse(prov_summary.make_exception_table())
        self.assertFalse(prov_summary.make_bad_quantum_tables())
        self.assertFalse(prov_summary.make_bad_dataset_tables())

    def test_partial_outputs_failure(self):
        """Test executing two quanta where the first raises
        `lsst.pipe.base.AnnotatedPartialOutputsError` and its output is an
        optional input to the second, while configuring the executor to
        consider this a failure.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.AnnotatedPartialOutputsError"
        config_a.fail_condition = "1=1"  # butler query expression that is true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(
            pipeline_graph,
            butler=self.butler,
            raise_on_partial_outputs=True,
        )
        # The executor should raise the chained exception
        # (RepeatableQuantumError, since that's what the mocking system in
        # pipe_base uses here), not AnnotatedPartialOutputsError.
        with self.assertRaises(RepeatableQuantumError):
            executor.run(register_dataset_types=True)
        self.assertFalse(self.butler.exists("intermediate"))
        self.assertFalse(self.butler.exists("output"))
        prov = qpg.QuantumProvenanceGraph()
        prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
        (quantum_key_a,) = prov.quanta["a"]
        quantum_info_a = prov.get_quantum_info(quantum_key_a)
        _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
        self.assertEqual(quantum_run_a.status, qpg.QuantumRunStatus.FAILED)
        self.assertIsNone(quantum_run_a.caveats)
        self.assertIsNone(quantum_run_a.exception)
        (quantum_key_b,) = prov.quanta["b"]
        quantum_info_b = prov.get_quantum_info(quantum_key_b)
        self.assertEqual(quantum_info_b["status"], qpg.QuantumInfoStatus.BLOCKED)
        prov_summary = prov.to_summary(self.butler)
        # One partial-outputs failure case for the first task.
        self.assertEqual(prov_summary.tasks["a"].n_failed, 1)
        # No direct failures, but one blocked for the second
        self.assertEqual(prov_summary.tasks["b"].n_failed, 0)
        self.assertEqual(prov_summary.tasks["b"].n_blocked, 1)
        # Check table forms for summaries of the same information.
        quantum_table = prov_summary.make_quantum_table()
        self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
        self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
        self.assertEqual(list(quantum_table["Successful"]), [0, 0])
        self.assertEqual(list(quantum_table["Caveats"]), ["", ""])
        self.assertEqual(list(quantum_table["Blocked"]), [0, 1])
        self.assertEqual(list(quantum_table["Failed"]), [1, 0])
        self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
        self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
        self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
        dataset_table = prov_summary.make_dataset_table()
        self.assertEqual(
            list(dataset_table["Dataset"]),
            ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
        )
        # Note that a_log is UNSUCCESSFUL, not VISIBLE, despite being present
        # in butler because those categories are mutually exclusive and we
        # don't want to consider any outputs of failed quanta to be successful.
        self.assertEqual(list(dataset_table["Visible"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Predicted Only"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Unsuccessful"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
        self.assertFalse(prov_summary.make_exception_table())
        bad_quantum_tables = prov_summary.make_bad_quantum_tables()
        self.assertEqual(bad_quantum_tables.keys(), {"a"})
        self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["FAILED"])
        self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), [""])
        self.assertFalse(prov_summary.make_bad_dataset_tables())

    def test_existence_check_skips(self):
        """Test that pre-execution existence checks are not performed for
        overall-input datasets, as those checks could otherwise mask
        repository configuration problems or downtime as NoWorkFound cases.
        """
        # First we configure and execute task A, which is just a way to get a
        # MockDataset in the repo for us to play with; the important test can't
        # use the non-mock 'input' dataset because the mock runQuantum only
        # actually reads MockDatasets.
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        executor_a = SimplePipelineExecutor.from_task_class(
            DynamicTestPipelineTask,
            config=config_a,
            butler=self.butler,
            label="a",
        )
        executor_a.run(register_dataset_types=True)
        # Now we can do the real test.
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        butler = self.butler.clone(run="new_run")
        executor_b = SimplePipelineExecutor.from_task_class(
            DynamicTestPipelineTask,
            config=config_b,
            butler=butler,
            label="b",
            attach_datastore_records=True,
        )
        # Delete the input dataset after the QG has already been built.
        intermediate_refs = butler.query_datasets("intermediate")
        self.assertEqual(len(intermediate_refs), 1)
        butler.pruneDatasets(intermediate_refs, purge=True, unstore=True)
        with self.assertRaises(FileNotFoundError):
            # We should get an exception rather than NoWorkFound, because for
            # this single-task pipeline, the missing dataset is an
            # overall-input (name notwithstanding).
            executor_b.run(register_dataset_types=True)
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Generic tests for file leaks.

    All checking behavior is inherited from
    `lsst.utils.tests.MemoryTestCase`; no additional tests are defined here.
    """
def setup_module(module):
    """Set up the module for pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up.
    """
    # Initialize the lsst.utils test framework before any tests in this
    # module are run under pytest.
    lsst.utils.tests.init()
# Allow running this test file directly (outside pytest) via unittest.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()