Coverage for tests / test_separable_pipeline_executor.py: 11%
728 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
29import os
30import tempfile
31import unittest
33import lsst.daf.butler
34import lsst.daf.butler.tests as butlerTests
35import lsst.pex.config
36import lsst.utils.tests
37from lsst.daf.butler.registry import RegistryDefaults
38from lsst.pipe.base import (
39 Instrument,
40 Pipeline,
41 PipelineGraph,
42 QuantumAttemptStatus,
43 QuantumGraph,
44 QuantumSuccessCaveats,
45 TaskMetadata,
46)
47from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
48from lsst.pipe.base.automatic_connection_constants import (
49 PACKAGES_INIT_OUTPUT_NAME,
50 PROVENANCE_DATASET_TYPE_NAME,
51 PROVENANCE_STORAGE_CLASS,
52)
53from lsst.pipe.base.mp_graph_executor import MPGraphExecutorError
54from lsst.pipe.base.quantum_graph import ProvenanceQuantumGraph
55from lsst.pipe.base.quantum_graph_builder import OutputExistsError
56from lsst.pipe.base.separable_pipeline_executor import SeparablePipelineExecutor
57from lsst.pipe.base.tests.mocks import (
58 DirectButlerRepo,
59 DynamicTestPipelineTaskConfig,
60)
61from lsst.resources import ResourcePath
62from lsst.utils.packages import Packages
# Absolute path of the directory holding this test file; used to locate the
# test pipeline YAML and butler config search path.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
67class SeparablePipelineExecutorTests(lsst.utils.tests.TestCase):
68 """Test the SeparablePipelineExecutor API with a trivial task."""
    # Pipeline exercised by all tests; defines tasks "a" and "b" (with "a"'s
    # output feeding "b" — TODO confirm against the YAML).
    pipeline_file = os.path.join(TESTDIR, "pipelines", "pipeline_separable.yaml")
72 def setUp(self):
73 repodir = tempfile.TemporaryDirectory()
74 # TemporaryDirectory warns on leaks; addCleanup also keeps it from
75 # getting garbage-collected.
76 self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir)
78 # standalone parameter forces the returned config to also include
79 # the information from the search paths.
80 config = lsst.daf.butler.Butler.makeRepo(
81 repodir.name, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")]
82 )
83 butler = lsst.daf.butler.Butler.from_config(config, writeable=True)
84 self.enterContext(butler)
85 output = "fake"
86 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}"
87 butler.registry.registerCollection(output_run, lsst.daf.butler.CollectionType.RUN)
88 butler.registry.registerCollection(output, lsst.daf.butler.CollectionType.CHAINED)
89 butler.registry.setCollectionChain(output, [output_run])
90 butler.registry.defaults = RegistryDefaults(collections=[output], run=output_run)
91 self.butler = butler
93 butlerTests.addDatasetType(self.butler, "input", set(), "StructuredDataDict")
94 butlerTests.addDatasetType(self.butler, "intermediate", set(), "StructuredDataDict")
95 butlerTests.addDatasetType(self.butler, "a_log", set(), "ButlerLogRecords")
96 butlerTests.addDatasetType(self.butler, "a_metadata", set(), "TaskMetadata")
97 butlerTests.addDatasetType(self.butler, "a_config", set(), "Config")
98 provenance_dataset_type = butlerTests.addDatasetType(
99 self.butler, PROVENANCE_DATASET_TYPE_NAME, set(), PROVENANCE_STORAGE_CLASS
100 )
101 self.provenance_ref = lsst.daf.butler.DatasetRef(
102 provenance_dataset_type,
103 lsst.daf.butler.DataCoordinate.make_empty(self.butler.dimensions),
104 run=butler.run,
105 )
107 def build_empty_quantum_graph(self) -> None:
108 pipeline_graph = PipelineGraph(universe=self.butler.dimensions)
109 pipeline_graph.resolve(self.butler.registry)
110 builder = AllDimensionsQuantumGraphBuilder(pipeline_graph, self.butler)
111 return builder.finish(attach_datastore_records=False).assemble()
    def check_provenance_fullgraph(self):
        """Check the stored provenance graph after a full two-task run.

        Verifies quantum statuses, the bipartite quantum/dataset edges, the
        per-dataset ``produced`` flags, and that the ``logs`` and ``metadata``
        components round-trip against the individually stored datasets.
        """
        provenance_qg = self.butler.get(self.provenance_ref)
        empty_data_id = lsst.daf.butler.DataCoordinate.make_empty(self.butler.dimensions)
        self.assertCountEqual(provenance_qg.quanta_by_task.keys(), {"a", "b"})
        self.assertCountEqual(
            provenance_qg.datasets_by_type.keys(),
            {
                "input",
                "intermediate",
                "output",
                "a_config",
                "b_config",
                "a_metadata",
                "b_metadata",
                "a_log",
                "b_log",
            },
        )
        # All quanta/datasets in this pipeline have the empty data ID, so it
        # indexes every per-task / per-type mapping below.
        a_id = provenance_qg.quanta_by_task["a"][empty_data_id]
        b_id = provenance_qg.quanta_by_task["b"][empty_data_id]
        input_id = provenance_qg.datasets_by_type["input"][empty_data_id]
        intermediate_id = provenance_qg.datasets_by_type["intermediate"][empty_data_id]
        output_id = provenance_qg.datasets_by_type["output"][empty_data_id]
        a_metadata_id = provenance_qg.datasets_by_type["a_metadata"][empty_data_id]
        a_log_id = provenance_qg.datasets_by_type["a_log"][empty_data_id]
        b_metadata_id = provenance_qg.datasets_by_type["b_metadata"][empty_data_id]
        b_log_id = provenance_qg.datasets_by_type["b_log"][empty_data_id]
        # Both quanta should have run successfully.
        self.assertEqual(
            provenance_qg.bipartite_xgraph.nodes[a_id]["status"], QuantumAttemptStatus.SUCCESSFUL
        )
        self.assertEqual(
            provenance_qg.bipartite_xgraph.nodes[b_id]["status"], QuantumAttemptStatus.SUCCESSFUL
        )
        # Edge structure: input -> a -> {intermediate, a_metadata, a_log},
        # intermediate -> b -> {output, b_metadata, b_log}.
        self.assertEqual(list(provenance_qg.bipartite_xgraph.predecessors(a_id)), [input_id])
        self.assertEqual(
            list(provenance_qg.bipartite_xgraph.successors(a_id)), [intermediate_id, a_metadata_id, a_log_id]
        )
        self.assertEqual(list(provenance_qg.bipartite_xgraph.predecessors(b_id)), [intermediate_id])
        self.assertEqual(
            list(provenance_qg.bipartite_xgraph.successors(b_id)), [output_id, b_metadata_id, b_log_id]
        )
        # Every dataset node should be marked as actually produced.
        for datasets_by_data_id in provenance_qg.datasets_by_type.values():
            for dataset_id in datasets_by_data_id.values():
                self.assertTrue(provenance_qg.bipartite_xgraph.nodes[dataset_id]["produced"])
        # The "logs" component accepts both quantum and dataset IDs as
        # parameters; either way it must match the separately stored logs.
        logs_pqg = self.butler.get(
            self.provenance_ref.makeComponentRef("logs"),
            parameters={"quanta": (a_id,), "datasets": (b_log_id,)},
        )
        self.assertEqual(list(logs_pqg[a_id][-1]), list(self.butler.get("a_log", empty_data_id)))
        self.assertEqual(list(logs_pqg[b_log_id][-1]), list(self.butler.get("b_log", empty_data_id)))
        # Same round-trip check for the "metadata" component.
        metadata_pqg = self.butler.get(
            self.provenance_ref.makeComponentRef("metadata"),
            parameters={"quanta": (b_id,), "datasets": (a_metadata_id,)},
        )
        self.assertEqual(metadata_pqg[a_metadata_id][-1], self.butler.get("a_metadata", empty_data_id))
        self.assertEqual(metadata_pqg[b_id][-1], self.butler.get("b_metadata", empty_data_id))
        self.assertIsInstance(self.butler.get("run_provenance.packages"), Packages)
171 def check_provenance_emptygraph(self):
172 provenance_qg = self.butler.get(self.provenance_ref)
173 self.assertFalse(provenance_qg.bipartite_xgraph)
174 self.assertFalse(any(provenance_qg.quanta_by_task.values()))
175 self.assertFalse(any(provenance_qg.datasets_by_type.values()))
176 self.assertIsInstance(self.butler.get("run_provenance.packages"), Packages)
178 def test_pre_execute_qgraph_old(self):
179 # Too hard to make a quantum graph from scratch.
180 executor = SeparablePipelineExecutor(self.butler)
181 pipeline = Pipeline.fromFile(self.pipeline_file)
182 self.butler.put({"zero": 0}, "input")
183 graph = executor.make_quantum_graph(pipeline)
185 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
186 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
187 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
188 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
189 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
191 executor.pre_execute_qgraph(
192 graph,
193 register_dataset_types=False,
194 save_init_outputs=False,
195 save_versions=False,
196 )
197 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
198 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
200 def test_pre_execute_qgraph(self):
201 # Too hard to make a quantum graph from scratch.
202 executor = SeparablePipelineExecutor(self.butler)
203 pipeline = Pipeline.fromFile(self.pipeline_file)
204 self.butler.put({"zero": 0}, "input")
205 graph = executor.build_quantum_graph(pipeline)
207 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
208 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
209 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
210 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
211 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
213 executor.pre_execute_qgraph(
214 graph,
215 register_dataset_types=False,
216 save_init_outputs=False,
217 save_versions=False,
218 )
219 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
220 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
222 def test_pre_execute_qgraph_unconnected_old(self):
223 # Unconnected graph; see
224 # test_make_quantum_graph_nowhere_skippartial_clobber.
225 executor = SeparablePipelineExecutor(
226 self.butler,
227 skip_existing_in=[self.butler.run],
228 clobber_output=True,
229 )
230 pipeline = Pipeline.fromFile(self.pipeline_file)
231 self.butler.put({"zero": 0}, "input")
232 self.butler.put({"zero": 0}, "intermediate")
233 graph = executor.make_quantum_graph(pipeline)
235 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
236 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
237 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
238 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
239 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
241 executor.pre_execute_qgraph(
242 graph,
243 register_dataset_types=False,
244 save_init_outputs=False,
245 save_versions=False,
246 )
247 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
248 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
250 def test_pre_execute_qgraph_unconnected(self):
251 # Unconnected graph; see
252 # test_make_quantum_graph_nowhere_skippartial_clobber.
253 executor = SeparablePipelineExecutor(
254 self.butler,
255 skip_existing_in=[self.butler.run],
256 clobber_output=True,
257 )
258 pipeline = Pipeline.fromFile(self.pipeline_file)
259 self.butler.put({"zero": 0}, "input")
260 self.butler.put({"zero": 0}, "intermediate")
261 graph = executor.build_quantum_graph(pipeline)
263 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
264 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
265 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
266 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
267 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
269 executor.pre_execute_qgraph(
270 graph,
271 register_dataset_types=False,
272 save_init_outputs=False,
273 save_versions=False,
274 )
275 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
276 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
278 def test_pre_execute_qgraph_empty_old(self):
279 executor = SeparablePipelineExecutor(self.butler)
280 graph = QuantumGraph({}, universe=self.butler.dimensions)
282 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
283 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
284 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
285 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
286 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
288 executor.pre_execute_qgraph(
289 graph,
290 register_dataset_types=False,
291 save_init_outputs=False,
292 save_versions=False,
293 )
294 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
295 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
297 def test_pre_execute_qgraph_empty(self):
298 executor = SeparablePipelineExecutor(self.butler)
299 graph = self.build_empty_quantum_graph()
301 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
302 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
303 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
304 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
305 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
307 executor.pre_execute_qgraph(
308 graph,
309 register_dataset_types=False,
310 save_init_outputs=False,
311 save_versions=False,
312 )
313 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
314 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
316 def test_pre_execute_qgraph_register_old(self):
317 executor = SeparablePipelineExecutor(self.butler)
318 pipeline = Pipeline.fromFile(self.pipeline_file)
319 self.butler.put({"zero": 0}, "input")
320 graph = executor.make_quantum_graph(pipeline)
322 executor.pre_execute_qgraph(
323 graph,
324 register_dataset_types=True,
325 save_init_outputs=False,
326 save_versions=False,
327 )
328 self.assertEqual({d.name for d in self.butler.registry.queryDatasetTypes("output")}, {"output"})
329 self.assertEqual(
330 {d.name for d in self.butler.registry.queryDatasetTypes("b_*")},
331 {"b_config", "b_log", "b_metadata"},
332 )
333 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
334 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
336 def test_pre_execute_qgraph_register(self):
337 executor = SeparablePipelineExecutor(self.butler)
338 pipeline = Pipeline.fromFile(self.pipeline_file)
339 self.butler.put({"zero": 0}, "input")
340 graph = executor.build_quantum_graph(pipeline)
342 executor.pre_execute_qgraph(
343 graph,
344 register_dataset_types=True,
345 save_init_outputs=False,
346 save_versions=False,
347 )
348 self.assertEqual({d.name for d in self.butler.registry.queryDatasetTypes("output")}, {"output"})
349 self.assertEqual(
350 {d.name for d in self.butler.registry.queryDatasetTypes("b_*")},
351 {"b_config", "b_log", "b_metadata"},
352 )
353 self.assertFalse(self.butler.exists("a_config", {}, collections=[self.butler.run]))
354 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
356 def test_pre_execute_qgraph_init_outputs_old(self):
357 # Too hard to make a quantum graph from scratch.
358 executor = SeparablePipelineExecutor(self.butler)
359 pipeline = Pipeline.fromFile(self.pipeline_file)
360 self.butler.put({"zero": 0}, "input")
361 graph = executor.make_quantum_graph(pipeline)
363 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
364 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
365 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
366 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
367 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
369 executor.pre_execute_qgraph(
370 graph,
371 register_dataset_types=False,
372 save_init_outputs=True,
373 save_versions=False,
374 )
375 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run]))
376 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
378 def test_pre_execute_qgraph_init_outputs(self):
379 # Too hard to make a quantum graph from scratch.
380 executor = SeparablePipelineExecutor(self.butler)
381 pipeline = Pipeline.fromFile(self.pipeline_file)
382 self.butler.put({"zero": 0}, "input")
383 graph = executor.build_quantum_graph(pipeline)
385 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
386 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
387 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
388 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
389 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
391 executor.pre_execute_qgraph(
392 graph,
393 register_dataset_types=False,
394 save_init_outputs=True,
395 save_versions=False,
396 )
397 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run]))
398 self.assertFalse(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
400 def test_pre_execute_qgraph_versions_old(self):
401 executor = SeparablePipelineExecutor(self.butler)
402 pipeline = Pipeline.fromFile(self.pipeline_file)
403 self.butler.put({"zero": 0}, "input")
404 graph = executor.make_quantum_graph(pipeline)
406 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
407 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
408 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
409 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
410 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
412 executor.pre_execute_qgraph(
413 graph,
414 register_dataset_types=False,
415 save_init_outputs=True,
416 save_versions=True,
417 )
418 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run]))
419 self.assertTrue(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
421 def test_pre_execute_qgraph_versions(self):
422 executor = SeparablePipelineExecutor(self.butler)
423 pipeline = Pipeline.fromFile(self.pipeline_file)
424 self.butler.put({"zero": 0}, "input")
425 graph = executor.build_quantum_graph(pipeline)
427 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
428 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
429 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
430 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
431 butlerTests.addDatasetType(self.butler, PACKAGES_INIT_OUTPUT_NAME, set(), "Packages")
433 executor.pre_execute_qgraph(
434 graph,
435 register_dataset_types=False,
436 save_init_outputs=True,
437 save_versions=True,
438 )
439 self.assertTrue(self.butler.exists("a_config", {}, collections=[self.butler.run]))
440 self.assertTrue(self.butler.exists(PACKAGES_INIT_OUTPUT_NAME, {}))
442 def test_init_badinput(self):
443 with lsst.daf.butler.Butler.from_config(butler=self.butler, collections=[], run="foo") as butler:
444 with self.assertRaises(ValueError):
445 SeparablePipelineExecutor(butler)
447 def test_init_badoutput(self):
448 with lsst.daf.butler.Butler.from_config(butler=self.butler, collections=["foo"]) as butler:
449 with self.assertRaises(ValueError):
450 SeparablePipelineExecutor(butler)
452 def test_make_pipeline_full(self):
453 executor = SeparablePipelineExecutor(self.butler)
454 for uri in [
455 self.pipeline_file,
456 ResourcePath(self.pipeline_file),
457 ResourcePath(self.pipeline_file).geturl(),
458 ]:
459 pipeline_graph = executor.make_pipeline(uri).to_graph()
460 self.assertEqual(set(pipeline_graph.tasks), {"a", "b"})
462 def test_make_pipeline_subset(self):
463 executor = SeparablePipelineExecutor(self.butler)
464 path = self.pipeline_file + "#a"
465 for uri in [
466 path,
467 ResourcePath(path),
468 ResourcePath(path).geturl(),
469 ]:
470 pipeline_graph = executor.make_pipeline(uri).to_graph()
471 self.assertEqual(set(pipeline_graph.tasks), {"a"})
473 def test_build_quantum_graph_nowhere_noskip_noclobber(self):
474 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
475 pipeline = Pipeline.fromFile(self.pipeline_file)
477 self.butler.put({"zero": 0}, "input")
479 graph = executor.make_quantum_graph(pipeline)
480 self.assertTrue(graph.isConnected)
481 self.assertEqual(len(graph), 2)
482 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"})
483 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
485 def test_make_quantum_graph_nowhere_noskip_noclobber(self):
486 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
487 pipeline = Pipeline.fromFile(self.pipeline_file)
489 self.butler.put({"zero": 0}, "input")
491 graph = executor.build_quantum_graph(pipeline)
492 self.assertEqual(len(graph), 2)
493 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"})
495 def test_make_quantum_graph_nowhere_noskip_noclobber_conflict(self):
496 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
497 pipeline = Pipeline.fromFile(self.pipeline_file)
499 self.butler.put({"zero": 0}, "input")
500 self.butler.put({"zero": 0}, "intermediate")
501 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
502 self.butler.put(TaskMetadata(), "a_metadata")
504 with self.assertRaises(OutputExistsError):
505 executor.build_quantum_graph(pipeline)
507 # TODO: need more complex task and Butler to test
508 # make_quantum_graph(where=...)
510 def test_build_quantum_graph_nowhere_skipnone_noclobber(self):
511 executor = SeparablePipelineExecutor(
512 self.butler,
513 skip_existing_in=[self.butler.run],
514 clobber_output=False,
515 )
516 pipeline = Pipeline.fromFile(self.pipeline_file)
518 self.butler.put({"zero": 0}, "input")
520 graph = executor.make_quantum_graph(pipeline)
521 self.assertTrue(graph.isConnected)
522 self.assertEqual(len(graph), 2)
523 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"})
524 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
526 def test_make_quantum_graph_nowhere_skipnone_noclobber(self):
527 executor = SeparablePipelineExecutor(
528 self.butler,
529 skip_existing_in=[self.butler.run],
530 clobber_output=False,
531 )
532 pipeline = Pipeline.fromFile(self.pipeline_file)
533 self.butler.put({"zero": 0}, "input")
534 graph = executor.build_quantum_graph(pipeline)
535 self.assertEqual(len(graph), 2)
536 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"})
538 def test_build_quantum_graph_nowhere_skiptotal_noclobber(self):
539 executor = SeparablePipelineExecutor(
540 self.butler,
541 skip_existing_in=[self.butler.run],
542 clobber_output=False,
543 )
544 pipeline = Pipeline.fromFile(self.pipeline_file)
546 self.butler.put({"zero": 0}, "input")
547 self.butler.put({"zero": 0}, "intermediate")
548 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
549 self.butler.put(TaskMetadata(), "a_metadata")
550 self.butler.put(lsst.pex.config.Config(), "a_config")
552 graph = executor.make_quantum_graph(pipeline)
553 self.assertTrue(graph.isConnected)
554 self.assertEqual(len(graph), 1)
555 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"b"})
556 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
558 def test_make_quantum_graph_nowhere_skiptotal_noclobber(self):
559 executor = SeparablePipelineExecutor(
560 self.butler,
561 skip_existing_in=[self.butler.run],
562 clobber_output=False,
563 )
564 pipeline = Pipeline.fromFile(self.pipeline_file)
566 self.butler.put({"zero": 0}, "input")
567 self.butler.put({"zero": 0}, "intermediate")
568 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
569 self.butler.put(TaskMetadata(), "a_metadata")
570 self.butler.put(lsst.pex.config.Config(), "a_config")
572 graph = executor.build_quantum_graph(pipeline)
573 self.assertEqual(len(graph), 1)
574 self.assertEqual(graph.header.n_task_quanta["a"], 0)
575 self.assertEqual(graph.header.n_task_quanta["b"], 1)
577 def test_make_quantum_graph_nowhere_skippartial_noclobber(self):
578 executor = SeparablePipelineExecutor(
579 self.butler,
580 skip_existing_in=[self.butler.run],
581 clobber_output=False,
582 )
583 pipeline = Pipeline.fromFile(self.pipeline_file)
585 self.butler.put({"zero": 0}, "input")
586 self.butler.put({"zero": 0}, "intermediate")
588 with self.assertRaises(OutputExistsError):
589 executor.build_quantum_graph(pipeline)
591 def test_build_quantum_graph_nowhere_noskip_clobber(self):
592 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
593 pipeline = Pipeline.fromFile(self.pipeline_file)
595 self.butler.put({"zero": 0}, "input")
597 graph = executor.make_quantum_graph(pipeline)
598 self.assertTrue(graph.isConnected)
599 self.assertEqual(len(graph), 2)
600 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"})
601 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
603 def test_make_quantum_graph_nowhere_noskip_clobber(self):
604 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
605 pipeline = Pipeline.fromFile(self.pipeline_file)
606 self.butler.put({"zero": 0}, "input")
607 graph = executor.build_quantum_graph(pipeline)
608 self.assertEqual(len(graph), 2)
609 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"})
611 def test_build_quantum_graph_nowhere_noskip_clobber_conflict(self):
612 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
613 pipeline = Pipeline.fromFile(self.pipeline_file)
615 self.butler.put({"zero": 0}, "input")
616 self.butler.put({"zero": 0}, "intermediate")
617 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
618 self.butler.put(TaskMetadata(), "a_metadata")
620 graph = executor.make_quantum_graph(pipeline)
621 self.assertTrue(graph.isConnected)
622 self.assertEqual(len(graph), 2)
623 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"})
624 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
626 def test_make_quantum_graph_nowhere_noskip_clobber_conflict(self):
627 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
628 pipeline = Pipeline.fromFile(self.pipeline_file)
629 self.butler.put({"zero": 0}, "input")
630 self.butler.put({"zero": 0}, "intermediate")
631 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
632 self.butler.put(TaskMetadata(), "a_metadata")
633 graph = executor.build_quantum_graph(pipeline)
634 self.assertEqual(len(graph), 2)
635 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"})
637 def test_build_quantum_graph_nowhere_skipnone_clobber(self):
638 executor = SeparablePipelineExecutor(
639 self.butler,
640 skip_existing_in=[self.butler.run],
641 clobber_output=True,
642 )
643 pipeline = Pipeline.fromFile(self.pipeline_file)
645 self.butler.put({"zero": 0}, "input")
647 graph = executor.make_quantum_graph(pipeline)
648 self.assertTrue(graph.isConnected)
649 self.assertEqual(len(graph), 2)
650 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"a"})
651 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
653 def test_make_quantum_graph_nowhere_skipnone_clobber(self):
654 executor = SeparablePipelineExecutor(
655 self.butler,
656 skip_existing_in=[self.butler.run],
657 clobber_output=True,
658 )
659 pipeline = Pipeline.fromFile(self.pipeline_file)
660 self.butler.put({"zero": 0}, "input")
661 graph = executor.build_quantum_graph(pipeline)
662 self.assertEqual(len(graph), 2)
663 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"})
665 def test_make_quantum_graph_nowhere_skiptotal_clobber(self):
666 executor = SeparablePipelineExecutor(
667 self.butler,
668 skip_existing_in=[self.butler.run],
669 clobber_output=True,
670 )
671 pipeline = Pipeline.fromFile(self.pipeline_file)
673 self.butler.put({"zero": 0}, "input")
674 self.butler.put({"zero": 0}, "intermediate")
675 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
676 self.butler.put(TaskMetadata(), "a_metadata")
677 self.butler.put(lsst.pex.config.Config(), "a_config")
679 graph = executor.make_quantum_graph(pipeline)
680 self.assertTrue(graph.isConnected)
681 self.assertEqual(len(graph), 1)
682 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"b"})
683 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
685 def test_build_quantum_graph_nowhere_skiptotal_clobber(self):
686 executor = SeparablePipelineExecutor(
687 self.butler,
688 skip_existing_in=[self.butler.run],
689 clobber_output=True,
690 )
691 pipeline = Pipeline.fromFile(self.pipeline_file)
693 self.butler.put({"zero": 0}, "input")
694 self.butler.put({"zero": 0}, "intermediate")
695 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
696 self.butler.put(TaskMetadata(), "a_metadata")
697 self.butler.put(lsst.pex.config.Config(), "a_config")
699 graph = executor.make_quantum_graph(pipeline)
700 self.assertTrue(graph.isConnected)
701 self.assertEqual(len(graph), 1)
702 self.assertEqual({q.taskDef.label for q in graph.inputQuanta}, {"b"})
703 self.assertEqual({q.taskDef.label for q in graph.outputQuanta}, {"b"})
705 def test_make_quantum_graph_nowhere_skippartial_clobber(self):
706 executor = SeparablePipelineExecutor(
707 self.butler,
708 skip_existing_in=[self.butler.run],
709 clobber_output=True,
710 )
711 pipeline = Pipeline.fromFile(self.pipeline_file)
712 self.butler.put({"zero": 0}, "input")
713 self.butler.put({"zero": 0}, "intermediate")
714 graph = executor.build_quantum_graph(pipeline)
715 self.assertEqual(len(graph), 2)
716 self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"})
718 def test_make_quantum_graph_noinput(self):
719 executor = SeparablePipelineExecutor(self.butler)
720 pipeline = Pipeline.fromFile(self.pipeline_file)
722 graph = executor.make_quantum_graph(pipeline)
723 self.assertEqual(len(graph), 0)
725 def test_build_quantum_graph_noinput(self):
726 executor = SeparablePipelineExecutor(self.butler)
727 pipeline = Pipeline.fromFile(self.pipeline_file)
729 graph = executor.build_quantum_graph(pipeline)
730 self.assertEqual(len(graph), 0)
732 def test_make_quantum_graph_alloutput_skip(self):
733 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=[self.butler.run])
734 pipeline = Pipeline.fromFile(self.pipeline_file)
736 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
737 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
738 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
739 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
741 self.butler.put({"zero": 0}, "input")
742 self.butler.put({"zero": 0}, "intermediate")
743 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
744 self.butler.put(TaskMetadata(), "a_metadata")
745 self.butler.put({"zero": 0}, "output")
746 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "b_log")
747 self.butler.put(TaskMetadata(), "b_metadata")
748 self.butler.put(lsst.pex.config.Config(), "a_config")
749 self.butler.put(lsst.pex.config.Config(), "b_config")
751 graph = executor.make_quantum_graph(pipeline)
752 self.assertEqual(len(graph), 0)
754 def test_build_quantum_graph_alloutput_skip(self):
755 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=[self.butler.run])
756 pipeline = Pipeline.fromFile(self.pipeline_file)
758 butlerTests.addDatasetType(self.butler, "output", set(), "StructuredDataDict")
759 butlerTests.addDatasetType(self.butler, "b_log", set(), "ButlerLogRecords")
760 butlerTests.addDatasetType(self.butler, "b_metadata", set(), "TaskMetadata")
761 butlerTests.addDatasetType(self.butler, "b_config", set(), "Config")
763 self.butler.put({"zero": 0}, "input")
764 self.butler.put({"zero": 0}, "intermediate")
765 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
766 self.butler.put(TaskMetadata(), "a_metadata")
767 self.butler.put({"zero": 0}, "output")
768 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "b_log")
769 self.butler.put(TaskMetadata(), "b_metadata")
770 self.butler.put(lsst.pex.config.Config(), "a_config")
771 self.butler.put(lsst.pex.config.Config(), "b_config")
773 graph = executor.build_quantum_graph(pipeline)
774 self.assertEqual(len(graph), 0)
776 def test_run_pipeline_noskip_noclobber_fullgraph(self):
777 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
778 pipeline = Pipeline.fromFile(self.pipeline_file)
779 self.butler.put({"zero": 0}, "input")
780 graph = executor.make_quantum_graph(pipeline)
781 executor.pre_execute_qgraph(
782 graph,
783 register_dataset_types=True,
784 save_init_outputs=True,
785 save_versions=False,
786 )
788 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
789 self.butler.registry.refresh()
790 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
791 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
792 self.check_provenance_fullgraph()
794 def test_run_pipeline_noskip_noclobber_fullgraph_old(self):
795 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
796 pipeline = Pipeline.fromFile(self.pipeline_file)
797 self.butler.put({"zero": 0}, "input")
798 graph = executor.build_quantum_graph(pipeline)
799 executor.pre_execute_qgraph(
800 graph,
801 register_dataset_types=True,
802 save_init_outputs=True,
803 save_versions=False,
804 )
806 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
807 self.butler.registry.refresh()
808 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
809 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
810 self.check_provenance_fullgraph()
812 def test_run_pipeline_noskip_noclobber_emptygraph_old(self):
813 old_repo_size = self.butler.registry.queryDatasets(...).count()
815 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
816 graph = QuantumGraph({}, universe=self.butler.dimensions)
817 executor.pre_execute_qgraph(
818 graph,
819 register_dataset_types=True,
820 save_init_outputs=True,
821 save_versions=False,
822 )
824 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
825 self.butler.registry.refresh()
826 # Empty graph execution should do nothing but write provenance.
827 self.assertEqual(self.butler.registry.queryDatasets(...).count(), old_repo_size + 1)
828 self.check_provenance_emptygraph()
830 def test_run_pipeline_noskip_noclobber_emptygraph(self):
831 old_repo_size = self.butler.registry.queryDatasets(...).count()
833 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=False)
834 graph = self.build_empty_quantum_graph()
835 executor.pre_execute_qgraph(
836 graph,
837 register_dataset_types=True,
838 save_init_outputs=True,
839 save_versions=False,
840 )
842 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
843 self.butler.registry.refresh()
844 # Empty graph execution should do nothing but write provenance.
845 self.assertEqual(self.butler.registry.queryDatasets(...).count(), old_repo_size + 1)
846 self.check_provenance_emptygraph()
848 def test_run_pipeline_skipnone_noclobber_old(self):
849 executor = SeparablePipelineExecutor(
850 self.butler,
851 skip_existing_in=[self.butler.run],
852 clobber_output=False,
853 )
854 pipeline = Pipeline.fromFile(self.pipeline_file)
855 self.butler.put({"zero": 0}, "input")
856 graph = executor.make_quantum_graph(pipeline)
857 executor.pre_execute_qgraph(
858 graph,
859 register_dataset_types=True,
860 save_init_outputs=True,
861 save_versions=False,
862 )
864 executor.run_pipeline(graph)
865 self.butler.registry.refresh()
866 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
867 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
869 def test_run_pipeline_skipnone_noclobber(self):
870 executor = SeparablePipelineExecutor(
871 self.butler,
872 skip_existing_in=[self.butler.run],
873 clobber_output=False,
874 )
875 pipeline = Pipeline.fromFile(self.pipeline_file)
876 self.butler.put({"zero": 0}, "input")
877 graph = executor.build_quantum_graph(pipeline)
878 executor.pre_execute_qgraph(
879 graph,
880 register_dataset_types=True,
881 save_init_outputs=True,
882 save_versions=False,
883 )
885 executor.run_pipeline(graph)
886 self.butler.registry.refresh()
887 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
888 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
890 def test_run_pipeline_skiptotal_noclobber_old(self):
891 executor = SeparablePipelineExecutor(
892 self.butler,
893 skip_existing_in=[self.butler.run],
894 clobber_output=False,
895 )
896 pipeline = Pipeline.fromFile(self.pipeline_file)
897 self.butler.put({"zero": 0}, "input")
898 self.butler.put({"zero": 0}, "intermediate")
899 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
900 self.butler.put(TaskMetadata(), "a_metadata")
901 self.butler.put(lsst.pex.config.Config(), "a_config")
902 graph = executor.make_quantum_graph(pipeline)
903 executor.pre_execute_qgraph(
904 graph,
905 register_dataset_types=True,
906 save_init_outputs=True,
907 save_versions=False,
908 )
910 executor.run_pipeline(graph)
911 self.butler.registry.refresh()
912 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2})
914 def test_run_pipeline_skiptotal_noclobber(self):
915 executor = SeparablePipelineExecutor(
916 self.butler,
917 skip_existing_in=[self.butler.run],
918 clobber_output=False,
919 )
920 pipeline = Pipeline.fromFile(self.pipeline_file)
921 self.butler.put({"zero": 0}, "input")
922 self.butler.put({"zero": 0}, "intermediate")
923 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
924 self.butler.put(TaskMetadata(), "a_metadata")
925 self.butler.put(lsst.pex.config.Config(), "a_config")
926 graph = executor.build_quantum_graph(pipeline)
927 executor.pre_execute_qgraph(
928 graph,
929 register_dataset_types=True,
930 save_init_outputs=True,
931 save_versions=False,
932 )
934 executor.run_pipeline(graph)
935 self.butler.registry.refresh()
936 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2})
938 def test_run_pipeline_noskip_clobber_connected_old(self):
939 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
940 pipeline = Pipeline.fromFile(self.pipeline_file)
941 self.butler.put({"zero": 0}, "input")
942 graph = executor.make_quantum_graph(pipeline)
943 executor.pre_execute_qgraph(
944 graph,
945 register_dataset_types=True,
946 save_init_outputs=True,
947 save_versions=False,
948 )
950 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
951 self.butler.registry.refresh()
952 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
953 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
954 self.check_provenance_fullgraph()
956 def test_run_pipeline_noskip_clobber_connected(self):
957 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
958 pipeline = Pipeline.fromFile(self.pipeline_file)
959 self.butler.put({"zero": 0}, "input")
960 graph = executor.build_quantum_graph(pipeline)
961 executor.pre_execute_qgraph(
962 graph,
963 register_dataset_types=True,
964 save_init_outputs=True,
965 save_versions=False,
966 )
968 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
969 self.butler.registry.refresh()
970 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
971 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
972 self.check_provenance_fullgraph()
974 def test_run_pipeline_noskip_clobber_unconnected_old(self):
975 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
976 pipeline = Pipeline.fromFile(self.pipeline_file)
977 self.butler.put({"zero": 0}, "input")
978 self.butler.put({"zero": 0}, "intermediate")
979 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
980 self.butler.put(TaskMetadata(), "a_metadata")
981 graph = executor.make_quantum_graph(pipeline)
982 executor.pre_execute_qgraph(
983 graph,
984 register_dataset_types=True,
985 save_init_outputs=True,
986 save_versions=False,
987 )
989 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
990 self.butler.registry.refresh()
991 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
992 # The value of output is undefined; it depends on which task ran first.
993 self.assertTrue(self.butler.exists("output", {}))
995 def test_run_pipeline_noskip_clobber_unconnected(self):
996 executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True)
997 pipeline = Pipeline.fromFile(self.pipeline_file)
998 self.butler.put({"zero": 0}, "input")
999 self.butler.put({"zero": 0}, "intermediate")
1000 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
1001 self.butler.put(TaskMetadata(), "a_metadata")
1002 graph = executor.build_quantum_graph(pipeline)
1003 executor.pre_execute_qgraph(
1004 graph,
1005 register_dataset_types=True,
1006 save_init_outputs=True,
1007 save_versions=False,
1008 )
1010 executor.run_pipeline(graph, provenance_dataset_ref=self.provenance_ref)
1011 self.butler.registry.refresh()
1012 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
1013 # The value of output is undefined; it depends on which task ran first.
1014 self.assertTrue(self.butler.exists("output", {}))
1015 self.check_provenance_fullgraph()
1017 def test_run_pipeline_skipnone_clobber_old(self):
1018 executor = SeparablePipelineExecutor(
1019 self.butler,
1020 skip_existing_in=[self.butler.run],
1021 clobber_output=True,
1022 )
1023 pipeline = Pipeline.fromFile(self.pipeline_file)
1024 self.butler.put({"zero": 0}, "input")
1025 graph = executor.make_quantum_graph(pipeline)
1026 executor.pre_execute_qgraph(
1027 graph,
1028 register_dataset_types=True,
1029 save_init_outputs=True,
1030 save_versions=False,
1031 )
1033 executor.run_pipeline(graph)
1034 self.butler.registry.refresh()
1035 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
1036 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
1038 def test_run_pipeline_skipnone_clobber(self):
1039 executor = SeparablePipelineExecutor(
1040 self.butler,
1041 skip_existing_in=[self.butler.run],
1042 clobber_output=True,
1043 )
1044 pipeline = Pipeline.fromFile(self.pipeline_file)
1045 self.butler.put({"zero": 0}, "input")
1046 graph = executor.build_quantum_graph(pipeline)
1047 executor.pre_execute_qgraph(
1048 graph,
1049 register_dataset_types=True,
1050 save_init_outputs=True,
1051 save_versions=False,
1052 )
1054 executor.run_pipeline(graph)
1055 self.butler.registry.refresh()
1056 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
1057 self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
1059 def test_run_pipeline_skiptotal_clobber_connected_old(self):
1060 executor = SeparablePipelineExecutor(
1061 self.butler,
1062 skip_existing_in=[self.butler.run],
1063 clobber_output=True,
1064 )
1065 pipeline = Pipeline.fromFile(self.pipeline_file)
1066 self.butler.put({"zero": 0}, "input")
1067 self.butler.put({"zero": 0}, "intermediate")
1068 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
1069 self.butler.put(TaskMetadata(), "a_metadata")
1070 self.butler.put(lsst.pex.config.Config(), "a_config")
1071 graph = executor.make_quantum_graph(pipeline)
1072 executor.pre_execute_qgraph(
1073 graph,
1074 register_dataset_types=True,
1075 save_init_outputs=True,
1076 save_versions=False,
1077 )
1079 executor.run_pipeline(graph)
1080 self.butler.registry.refresh()
1081 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2})
1083 def test_run_pipeline_skiptotal_clobber_connected(self):
1084 executor = SeparablePipelineExecutor(
1085 self.butler,
1086 skip_existing_in=[self.butler.run],
1087 clobber_output=True,
1088 )
1089 pipeline = Pipeline.fromFile(self.pipeline_file)
1090 self.butler.put({"zero": 0}, "input")
1091 self.butler.put({"zero": 0}, "intermediate")
1092 self.butler.put(lsst.daf.butler.ButlerLogRecords.from_records([]), "a_log")
1093 self.butler.put(TaskMetadata(), "a_metadata")
1094 self.butler.put(lsst.pex.config.Config(), "a_config")
1095 graph = executor.build_quantum_graph(pipeline)
1096 executor.pre_execute_qgraph(
1097 graph,
1098 register_dataset_types=True,
1099 save_init_outputs=True,
1100 save_versions=False,
1101 )
1103 executor.run_pipeline(graph)
1104 self.butler.registry.refresh()
1105 self.assertEqual(self.butler.get("output"), {"zero": 0, "two": 2})
1107 def test_run_pipeline_skippartial_clobber_unconnected_old(self):
1108 executor = SeparablePipelineExecutor(
1109 self.butler,
1110 skip_existing_in=[self.butler.run],
1111 clobber_output=True,
1112 )
1113 pipeline = Pipeline.fromFile(self.pipeline_file)
1114 self.butler.put({"zero": 0}, "input")
1115 self.butler.put({"zero": 0}, "intermediate")
1116 graph = executor.make_quantum_graph(pipeline)
1117 executor.pre_execute_qgraph(
1118 graph,
1119 register_dataset_types=True,
1120 save_init_outputs=True,
1121 save_versions=False,
1122 )
1124 executor.run_pipeline(graph)
1125 self.butler.registry.refresh()
1126 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
1127 # The value of output is undefined; it depends on which task ran first.
1128 self.assertTrue(self.butler.exists("output", {}))
1130 def test_run_pipeline_skippartial_clobber_unconnected(self):
1131 executor = SeparablePipelineExecutor(
1132 self.butler,
1133 skip_existing_in=[self.butler.run],
1134 clobber_output=True,
1135 )
1136 pipeline = Pipeline.fromFile(self.pipeline_file)
1137 self.butler.put({"zero": 0}, "input")
1138 self.butler.put({"zero": 0}, "intermediate")
1139 graph = executor.build_quantum_graph(pipeline)
1140 executor.pre_execute_qgraph(
1141 graph,
1142 register_dataset_types=True,
1143 save_init_outputs=True,
1144 save_versions=False,
1145 )
1146 executor.run_pipeline(graph)
1147 self.butler.registry.refresh()
1148 self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
1149 # The value of output is undefined; it depends on which task ran first.
1150 self.assertTrue(self.butler.exists("output", {}))
class SeparablePipelineExecutorMockTests(lsst.utils.tests.TestCase):
    """Additional tests for SeparablePipelineExecutor API that use
    the lsst.pipe.base.tests.mocks system to define complex pipelines.
    """

    def setUp(self):
        # 'base.yaml' adds an instrument, 'Cam1', with four detectors and
        # two physical filters.
        self.helper, _ = self.enterContext(DirectButlerRepo.make_temporary("base.yaml"))

    def run_base_test(
        self, b_config: DynamicTestPipelineTaskConfig, expected_error: type[Exception] | None
    ) -> ProvenanceQuantumGraph:
        """Build and run a quantum graph with three tasks and four data IDs,
        with customization of the middle task.

        Parameters
        ----------
        b_config : `DynamicTestPipelineTaskConfig`
            Configuration for the middle task ("b"), typically set up by the
            caller to fail for some subset of data IDs.
        expected_error : `type` [ `Exception` ] or `None`
            Exception type that ``run_pipeline`` is expected to raise, or
            `None` if execution should succeed.

        Returns
        -------
        provenance_graph : `ProvenanceQuantumGraph`
            The provenance graph written during execution, already checked
            to contain all three tasks with four quanta each.
        """
        # Three per-detector tasks; with Cam1's four detectors this yields
        # four quanta per task.
        self.helper.add_task("a", dimensions=["detector"])
        self.helper.add_task("b", dimensions=["detector"], config=b_config)
        self.helper.add_task("c", dimensions=["detector"])
        qg = self.helper.make_quantum_graph()
        self.helper.butler.collections.register(qg.header.output_run)
        qg.init_output_run(self.helper.butler, existing=False)
        # Clone the butler so the executor reads from the graph's input
        # collections and writes to its output run.
        executor = SeparablePipelineExecutor(
            self.helper.butler.clone(collections=qg.header.inputs, run=qg.header.output_run)
        )
        # Register the provenance dataset type and construct the ref the
        # executor will write provenance to.
        provenance_type = lsst.daf.butler.DatasetType(
            PROVENANCE_DATASET_TYPE_NAME,
            self.helper.butler.dimensions.empty,
            PROVENANCE_STORAGE_CLASS,
        )
        self.helper.butler.registry.registerDatasetType(provenance_type)
        provenance_ref = lsst.daf.butler.DatasetRef(
            provenance_type,
            lsst.daf.butler.DataCoordinate.make_empty(self.helper.butler.dimensions),
            run=qg.header.output_run,
        )
        if expected_error is None:
            executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
        else:
            with self.assertRaises(expected_error):
                executor.run_pipeline(qg, provenance_dataset_ref=provenance_ref)
        # Provenance must be retrievable (and complete) even when execution
        # raised: every task and every quantum should appear.
        provenance_graph = self.helper.butler.get(provenance_ref)
        self.assertEqual(len(provenance_graph.quanta_by_task), 3)
        self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
        return provenance_graph

    def test_no_work_chain_provenance(self):
        """Test provenance recording when a NoWorkFound error chains to
        downstream tasks during execution.
        """
        b_config = DynamicTestPipelineTaskConfig()
        b_config.fail_exception = "lsst.pipe.base.NoWorkFound"
        b_config.fail_condition = "detector=2"
        # NoWorkFound is a caveated success, so run_pipeline must not raise.
        provenance_graph = self.run_base_test(b_config, expected_error=None)
        xgraph = provenance_graph.quantum_only_xgraph
        # Upstream task "a" is unaffected: plain successes everywhere.
        for quantum_id in provenance_graph.quanta_by_task["a"].values():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # Task "b" succeeds everywhere, but carries the NO_WORK caveat on
        # the detector it was configured to fail on.
        for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            if data_id["detector"] == 2:
                self.assertTrue(xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.NO_WORK)
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # Downstream task "c" still succeeds on detector 2, but with the
        # ADJUST_QUANTUM_RAISED caveat recording the chained no-work state.
        for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            if data_id["detector"] == 2:
                self.assertTrue(
                    xgraph.nodes[quantum_id]["caveats"] & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
                )
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)

    def test_failure_block_provenance(self):
        """Test provenance recording when an exception blocks one branch of a
        QG but not another.
        """
        # 'base.yaml' adds an instrument, 'Cam1', with four detectors and
        # two physical filters.
        b_config = DynamicTestPipelineTaskConfig()
        b_config.fail_exception = "builtins.RuntimeError"
        b_config.fail_condition = "detector=2"
        # A hard failure propagates out of run_pipeline as
        # MPGraphExecutorError; provenance is still written and checked.
        provenance_graph = self.run_base_test(b_config, MPGraphExecutorError)
        self.assertEqual(len(provenance_graph.quanta_by_task), 3)
        self.assertEqual(len(provenance_graph.quanta_by_task["a"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["b"]), 4)
        self.assertEqual(len(provenance_graph.quanta_by_task["c"]), 4)
        xgraph = provenance_graph.quantum_only_xgraph
        # Upstream task "a" is unaffected: plain successes everywhere.
        for quantum_id in provenance_graph.quanta_by_task["a"].values():
            self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
            self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # Task "b" fails outright on detector 2 and succeeds elsewhere.
        for data_id, quantum_id in provenance_graph.quanta_by_task["b"].items():
            if data_id["detector"] == 2:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.FAILED)
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
        # Task "c" is BLOCKED (never attempted) only on the failed branch;
        # the other detectors' branches run to completion.
        for data_id, quantum_id in provenance_graph.quanta_by_task["c"].items():
            if data_id["detector"] == 2:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.BLOCKED)
            else:
                self.assertEqual(xgraph.nodes[quantum_id]["status"], QuantumAttemptStatus.SUCCESSFUL)
                self.assertEqual(xgraph.nodes[quantum_id]["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Generic test for file leaks.

    The base class provides the actual checks; no additional code is
    needed here.
    """
def setup_module(module):
    """Set up the module for pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up.
    """
    # Initialize LSST test utilities (presumably resource/leak tracking
    # used by MemoryTester) before any tests in this module run.
    lsst.utils.tests.init()
# Allow running this test file directly with ``python`` as well as via pytest.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()