# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
import os
import time
import unittest
import unittest.mock

import click.testing

import lsst.utils.tests
from lsst.ctrl.mpexec import PipelineGraphFactory
from lsst.ctrl.mpexec.cli import opt, script
from lsst.ctrl.mpexec.cli.cmd.commands import PipetaskCommand, coverage_context
from lsst.ctrl.mpexec.cli.utils import collect_pipeline_actions
from lsst.ctrl.mpexec.showInfo import ShowInfo
from lsst.daf.butler import CollectionType, MissingCollectionError
from lsst.daf.butler.cli.utils import LogCliRunner
from lsst.pipe.base.mp_graph_executor import MPGraphExecutorError
from lsst.pipe.base.script import transfer_from_graph
from lsst.pipe.base.tests.mocks import DirectButlerRepo, DynamicTestPipelineTaskConfig


class RunTestCase(unittest.TestCase):
    """Test pipetask run command-line."""

    @staticmethod
    def _make_run_args(*args: str, **kwargs: object) -> dict[str, object]:
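        """Invoke a fake ``run`` command to turn command-line arguments into
        the keyword arguments expected by the ``script`` functions.

        Positional string arguments are parsed with the real ``run`` option
        definitions; keyword arguments override the parsed values.
        """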
        mock = unittest.mock.Mock()

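        # Define a stand-in command that carries the same options as the real
        # ``run`` subcommand so that click supplies all of its defaults.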
        @click.command(cls=PipetaskCommand)
        @opt.run_options()
        @opt.config_search_path_option()
        @opt.no_existing_outputs_option()
        def fake_run(ctx: click.Context, **kwargs: object):
            kwargs = collect_pipeline_actions(ctx, **kwargs)
            mock(**kwargs)

        # At least one test requires that we enable INFO logging, so use
        # the specialist runner.
        runner = LogCliRunner()
        result = runner.invoke(fake_run, args, catch_exceptions=False)
        if result.exit_code != 0:
            raise RuntimeError(f"Failure getting default args for 'run': {result}")
        mock.assert_called_once()
        result: dict[str, object] = mock.call_args[1]
        result["show"] = ShowInfo([])
        result.update(kwargs)
        return result

    def test_missing_options(self):
        """Test that the run script fails when required options are
        missing.
        """

        @click.command()
        @opt.pipeline_build_options()
        def cli(**kwargs):
            script.run(**kwargs)

        runner = click.testing.CliRunner()
        result = runner.invoke(cli)
        # The cli call should fail, because script.run takes more options
        # than are defined by pipeline_build_options.
        self.assertNotEqual(result.exit_code, 0)

    def test_simple_qg(self):
        """Test execution of a trivial quantum graph."""
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg = script.qgraph(**kwargs)
            self.assertEqual(len(qg.quanta_by_task), 2)
            self.assertEqual(len(qg), 2)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = qg.header.output_run
            # Execute the graph and check for output existence.
            script.run(qg, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)
            # Test that we've disabled implicit threading.
            self.assertEqual(os.environ["OMP_NUM_THREADS"], "1")

    def test_simple_qg_rebase(self):
        """Test execution of a trivial quantum graph, with --rebase used to
        force redefinition of the output collection.
        """
        with DirectButlerRepo.make_temporary(input_chain="test1") as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            # Pass one input collection here for the usual test setup; we'll
            # override it later.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            # We'll actually pass two input collections in. One is empty, but
            # the stuff we're testing here doesn't care.
            kwargs["input"] = ["test2", "test1"]
            helper.butler.collections.register("test2", CollectionType.RUN)
            # Set up the output collection with a sequence that doesn't end
            # the same way as the input collection. This is normally an error.
            helper.butler.collections.register("output", CollectionType.CHAINED)
            helper.butler.collections.register("unexpected_input", CollectionType.RUN)
            helper.butler.collections.register("output/run0", CollectionType.RUN)
            helper.butler.collections.redefine_chain(
                "output", ["test2", "unexpected_input", "test1", "output/run0"]
            )
            # Without --rebase, the inconsistent input and output collections
            # are an error.
            with self.assertRaises(ValueError):
                script.qgraph(**kwargs)
            # With --rebase, the output collection gets redefined.
            kwargs["rebase"] = True
            qg = script.qgraph(**kwargs)
            self.assertEqual(len(qg.quanta_by_task), 2)
            self.assertEqual(len(qg), 2)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution.
            kwargs["output_run"] = qg.header.output_run
            # Execute the graph and check for output existence.
            script.run(qg, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

    def test_simple_qgraph_qbb(self):
        """Test execution of a trivial quantum graph in QBB mode."""
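        # QBB (quantum-backed butler) execution reads inputs and writes
        # outputs using only the datastore records saved in the graph file,
        # without contacting the central butler database; the results are
        # transferred back into the repo afterwards.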
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            # It's unusual to put a QG in a butler root, but since we've
            # already got a temp dir, we might as well use it.
            qg_file_1 = os.path.join(root, "test1.qg")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--qgraph-datastore-records",
                "--save-qgraph",
                qg_file_1,
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg = script.qgraph(**kwargs)
            output_run = qg.header.output_run
            output = qg.header.output
            self.assertEqual(len(qg.quanta_by_task), 2)
            self.assertEqual(len(qg), 2)
            # Execute with QBB.
            kwargs.update(output_run=output_run, qgraph=qg_file_1)
            script.pre_exec_init_qbb(**kwargs)
            script.run_qbb(**kwargs)
            # Transfer the datasets to the butler.
            n1 = transfer_from_graph(
                qg_file_1,
                root,
                register_dataset_types=True,
                transfer_dimensions=False,
                update_output_chain=True,
                dry_run=False,
                dataset_type=(),
            )
            self.assertEqual(n1, 9)
            # Check that the expected outputs exist.
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=output).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=output).count(), 1)
            # Check that some metadata keys were written.
            some_task_label = next(iter(qg.pipeline_graph.tasks))
            (some_metadata_ref,) = helper.butler.query_datasets(
                f"{some_task_label}_metadata",
                limit=1,
                collections=output,
            )
            some_metadata = helper.butler.get(some_metadata_ref)
            self.assertIn("qg_read_time", some_metadata["job"])
            self.assertIn("qg_size", some_metadata["job"])

            # Update the output run and try again.
            new_output_run = output_run + "_new"
            qg_file_2 = os.path.join(root, "test2.qg")
            script.update_graph_run(qg_file_1, new_output_run, qg_file_2)
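            # update_graph_run rewrites the saved graph with a new output RUN,
            # so the same quanta can be executed again into a fresh run.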
            kwargs.update(qgraph=qg_file_2)
            # Execute with QBB again.
            script.pre_exec_init_qbb(**kwargs)
            script.run_qbb(**kwargs)
            # Transfer the datasets to the butler.
            n2 = transfer_from_graph(
                qg_file_2,
                root,
                register_dataset_types=True,
                transfer_dimensions=False,
                update_output_chain=False,
                dry_run=False,
                dataset_type=(),
            )
            self.assertEqual(n2, 9)
            # Check that the expected outputs exist in the new run.
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=new_output_run).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=new_output_run).count(), 1)

    def test_empty_qg(self):
        """Test that making an empty QG produces the right error messages."""
        with DirectButlerRepo.make_temporary("base.yaml") as (helper, root):
            helper.add_task(dimensions=["instrument"])
            helper.add_task(dimensions=["instrument"])
            helper.pipeline_graph.resolve(registry=helper.butler.registry)
            helper.butler.registry.registerDatasetType(
                helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type
            )
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            # Note that we haven't inserted any datasets into this repo;
            # that's how we'll force an empty graph.
            with self.assertLogs(level=logging.ERROR) as cm:
                qg = script.qgraph(**kwargs)
            self.assertRegex(
                cm.output[0], ".*Initial data ID query returned no rows, so QuantumGraph will be empty.*"
            )
            self.assertRegex(cm.output[0], ".*dataset_auto0.*input_run.*doomed to fail.")
            self.assertIsNone(qg)

    def test_simple_qg_no_skip_existing_inputs(self):
        """Test for the case when output data for one task already appears in
        the *input* collection, but no ``--extend-run`` or ``--skip-existing``
        option is present.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg1), 2)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)
            # Make a new QG with the same output collection but a new RUN
            # collection; it should run again, shadowing the previous outputs.
            kwargs["output_run"] = None
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertNotEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg2.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg2), 2)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)

    def test_simple_qg_skip_existing_inputs(self):
        """Test for the case when output data for one task already appears in
        the *input* collection and ``--skip-existing-in`` is used to skip it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Make a new QG with the same output collection, but a new RUN
            # collection, with --skip-existing-in, and one more task. The
            # first task should be skipped and the second should be run.
            helper.add_task()
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--skip-existing-in",
                "output",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertNotEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 0)
            self.assertEqual(len(qg2.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg2), 1)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 0)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

    def test_simple_qg_extend_run(self):
        """Test for the case when output data for one task already appears in
        the output RUN collection, and ``--extend-run`` is used to skip it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Make a new QG with the same output collection and the same RUN
            # collection (via --extend-run), and one more task. The first task
            # should be skipped and the second should be run.
            helper.add_task()
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--extend-run",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 0)
            self.assertEqual(len(qg2.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg2), 1)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

    def test_simple_qg_clobber(self):
        """Test for the case when output data for one task already appears in
        the output RUN collection, and ``--extend-run --clobber-outputs`` is
        used to clobber it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Delete the metadata output so we don't take the skip-existing
            # logic path instead of the clobbering one.
            helper.butler.pruneDatasets(
                helper.butler.query_datasets("task_auto1_metadata", collections=run1),
                purge=True,
                unstore=True,
                disassociate=True,
            )
            # Make a new QG with the same output collection and the same RUN
            # collection (via --extend-run --clobber-outputs), and one more
            # task. Both tasks should be run.
            helper.add_task()
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--extend-run",
                "--clobber-outputs",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task), 2)
            self.assertEqual(len(qg2), 2)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

    def test_simple_qg_replace_run(self):
        """Test repeated execution of a trivial quantum graph with
        ``--replace-run``.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Delete the metadata output so we don't take the skip-existing
            # logic path instead of the clobbering one.
            helper.butler.pruneDatasets(
                helper.butler.query_datasets("task_auto1_metadata", collections=run1),
                purge=True,
                unstore=True,
                disassociate=True,
            )
            # Make a new QG with the same output collection, but a new RUN
            # collection, with --replace-run; the old run should be removed
            # from the output chain and the task should run again.
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--replace-run",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertNotEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg2), 1)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            self.assertNotIn(run1, helper.butler.collections.get_info("output").children)
            self.assertIn(run2, helper.butler.collections.get_info("output").children)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Repeat once again with --prune-replaced as well; the replaced
            # run should be deleted entirely.
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--replace-run",
                "--prune-replaced",
                "purge",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg3 = script.qgraph(**kwargs)
            run3 = qg3.header.output_run
            self.assertNotEqual(run2, run3)
            self.assertEqual(len(qg3.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg3), 1)
            kwargs["output_run"] = run3
            script.run(qg3, **kwargs)
            self.assertNotIn(run2, helper.butler.collections.get_info("output").children)
            with self.assertRaises(MissingCollectionError):
                helper.butler.collections.get_info(run2)
            self.assertIn(run3, helper.butler.collections.get_info("output").children)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run3]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Trying to run again with inputs that aren't exactly what we
            # started with is an error, and the kind that should not modify
            # the data repo.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                run1,
                "-o",
                "output",
                "--replace-run",
                "--prune-replaced",
                "purge",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            with self.assertRaises(ValueError):
                script.qgraph(**kwargs)

    def test_qg_partial_failure(self):
        """Test execution of a quantum graph where one quantum fails but
        others should continue.
        """
        with DirectButlerRepo.make_temporary("base.yaml") as (helper, root):
            helper.add_task(
                dimensions=["detector"], config=DynamicTestPipelineTaskConfig(fail_condition="detector=3")
            )
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg = script.qgraph(**kwargs)
            self.assertEqual(len(qg.quanta_by_task), 1)
            self.assertEqual(len(qg), 4)
            kwargs["output_run"] = qg.header.output_run
            # Execute the graph and check that the failure propagates while
            # the other quanta still produce their outputs.
            with self.assertRaises(MPGraphExecutorError):
                script.run(qg, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 3)


class CoverageTestCase(unittest.TestCase):
    """Test the coverage context manager."""

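    # ``coverage_context`` imports the ``coverage`` package lazily, so these
    # tests stub out the import machinery rather than requiring the package
    # to be installed.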
    @unittest.mock.patch.dict("sys.modules", coverage=unittest.mock.MagicMock())
    def testWithCoverage(self):
        """Test that the coverage context manager runs when invoked."""
        with coverage_context({"coverage": True}):
            self.assertTrue(True)

    @unittest.mock.patch("lsst.ctrl.mpexec.cli.cmd.commands.import_module", side_effect=ModuleNotFoundError())
    def testWithMissingCoverage(self, mock_import):  # numpydoc ignore=PR01
        """Test that the coverage context manager complains when coverage is
        not available.
        """
        with self.assertRaises(click.exceptions.ClickException):
            with coverage_context({"coverage": True}):
                pass


if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()