Coverage for tests / test_init_output_run.py: 14%
257 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import itertools
29import tempfile
30import unittest
31from collections.abc import Iterator
32from contextlib import contextmanager
33from typing import ClassVar
35import lsst.utils.tests
36from lsst.daf.butler import (
37 Butler,
38 DatasetRef,
39 DatasetType,
40 MissingDatasetTypeError,
41 QuantumBackedButler,
42 SerializedDatasetType,
43 StorageClassFactory,
44)
45from lsst.daf.butler.registry import ConflictingDefinitionError
46from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
47from lsst.pipe.base.pipeline_graph import PipelineGraph
48from lsst.pipe.base.quantum_graph import PredictedQuantumGraph
49from lsst.pipe.base.tests.mocks import (
50 DynamicConnectionConfig,
51 DynamicTestPipelineTask,
52 DynamicTestPipelineTaskConfig,
53 MockDataset,
54)
def _have_example_storage_classes() -> bool:
    """Check whether some storage classes work as expected.

    Given that these have registered converters, it shouldn't actually be
    necessary to import those types in order to determine that they're
    convertible, but the storage class machinery is implemented such that types
    that can't be imported can't be converted, and while that's inconvenient
    here it's totally fine in non-testing scenarios where you only care about a
    storage class if you can actually use it.
    """
    get = StorageClassFactory().getStorageClass
    # Each pair must be convertible in both directions for the tests that
    # depend on these storage classes to be meaningful.
    required_pairs = (
        ("ArrowTable", "ArrowAstropy"),
        ("ArrowTable", "DataFrame"),
    )
    return all(
        get(a).can_convert(get(b)) and get(b).can_convert(get(a))
        for a, b in required_pairs
    )
class InitOutputRunTestCase(unittest.TestCase):
    """Tests for the init_output_run methods of PipelineGraph and
    QuantumGraph.
    """

    # Collection that holds the overall-input datasets for all test pipelines.
    INPUT_COLLECTION: ClassVar[str] = "overall_inputs"

    @contextmanager
    def make_butler(self) -> Iterator[Butler]:
        """Wrap a temporary local butler repository in a context manager."""
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as root:
            config = Butler.makeRepo(root)
            with Butler.from_config(config, writeable=True) as butler:
                yield butler

    @contextmanager
    def prep_butler(self, pipeline_graph: PipelineGraph) -> Iterator[Butler]:
        """Create a temporary local butler repository with the dataset types
        and input datasets needed by a pipeline graph.

        This also resolves the pipeline graph and checks dataset types
        immediately after they are registered, providing test coverage for the
        methods that do that.
        """
        with self.make_butler() as butler:
            butler.collections.register(self.INPUT_COLLECTION)
            pipeline_graph.resolve(butler.registry)
            # Before registration the check must report missing dataset types.
            with self.assertRaises(MissingDatasetTypeError):
                pipeline_graph.check_dataset_type_registrations(butler)
            pipeline_graph.register_dataset_types(butler)
            pipeline_graph.check_dataset_type_registrations(butler)
            # Write a mock dataset for every overall input of the pipeline.
            for _, dataset_type_node in pipeline_graph.iter_overall_inputs():
                butler.put(
                    MockDataset(
                        dataset_id=None,
                        dataset_type=SerializedDatasetType(
                            name=dataset_type_node.name,
                            dimensions=[],
                            storageClass=dataset_type_node.storage_class_name,
                        ),
                        data_id={},
                        run=self.INPUT_COLLECTION,
                    ),
                    dataset_type_node.name,
                    run=self.INPUT_COLLECTION,
                )
            yield butler

    def find_init_output_refs(
        self, pipeline_graph: PipelineGraph, butler: Butler
    ) -> dict[str, list[DatasetRef]]:
        """Find the init-output datasets of a pipeline graph in a butler
        repository.

        Parameters
        ----------
        pipeline_graph : `PipelineGraph`
            Pipeline graph.
        butler : `lsst.daf.butler.Butler`
            Full butler client.

        Returns
        -------
        init_output_refs : `dict`
            Dataset references, keyed by task label. Storage classes will
            match the data repository definitions of the dataset types. The
            special 'packages' dataset type will be included under a '*' key.
        """
        init_output_refs: dict[str, list[DatasetRef]] = {}
        for task_node in pipeline_graph.tasks.values():
            init_output_refs_for_task: list[DatasetRef] = []
            for write_edge in task_node.init.iter_all_outputs():
                ref = butler.find_dataset(write_edge.dataset_type_name)
                assert ref is not None  # For linters.
                # Check that the ref we got back uses the dataset type node's
                # definition of the dataset type (including storage class).
                self.assertEqual(
                    ref.datasetType, pipeline_graph.dataset_types[write_edge.dataset_type_name].dataset_type
                )
                # Remember the version of the ref that has the task's storage
                # class, in case they differ.
                init_output_refs_for_task.append(write_edge.adapt_dataset_ref(ref))
            init_output_refs[task_node.label] = init_output_refs_for_task
        init_output_refs["*"] = [butler.find_dataset(pipeline_graph.packages_dataset_type)]
        return init_output_refs

    def get_quantum_graph_init_output_refs(
        self, quantum_graph: PredictedQuantumGraph
    ) -> dict[str, list[DatasetRef]]:
        """Extract dataset references from a QuantumGraph into the same form
        as returned by `find_init_output_refs`.
        """
        init_output_refs: dict[str, list[DatasetRef]] = {}
        for task_label in quantum_graph.pipeline_graph.tasks:
            init_output_refs[task_label] = list(quantum_graph.get_init_outputs(task_label).values())
        # The empty task label holds the global init outputs ('packages');
        # store them under '*' to match find_init_output_refs.
        init_output_refs["*"] = list(quantum_graph.get_init_outputs("").values())
        return init_output_refs

    def assert_init_output_refs_equal(
        self, a: dict[str, list[DatasetRef]], b: dict[str, list[DatasetRef]]
    ) -> None:
        """Check that two dictionaries of the form returned by
        `find_init_output_refs` are equal.
        """
        self.assertEqual(a.keys(), b.keys())
        for task_label, init_output_refs_for_task in a.items():
            self.assertCountEqual(init_output_refs_for_task, b[task_label])

    def check_qbb_consistency(
        self, init_output_refs: dict[str, list[DatasetRef]], qbb: QuantumBackedButler
    ) -> None:
        """Check that a quantum-backed butler sees all of the given datasets.

        Parameters
        ----------
        init_output_refs : `dict`
            Dataset references, keyed by task label. Storage classes should
            match the data repository definitions of the dataset types. The
            special 'packages' dataset type should be included under a '*' key.
        qbb : `lsst.daf.butler.QuantumBackedButler`
            A quantum-backed butler.
        """
        for task_label, init_output_refs_for_task in init_output_refs.items():
            for ref, stored_in in qbb.stored_many(init_output_refs_for_task).items():
                self.assertTrue(
                    stored_in, msg=f"Init-output {ref} of task {task_label} not stored according to QBB."
                )

    def init_with_pipeline_graph_first(
        self, pipeline_graph: PipelineGraph, butler: Butler, run: str
    ) -> PredictedQuantumGraph:
        """Test the init_output_run methods of PipelineGraph and QuantumGraph,
        using the former to actually write init-outputs (with later attempts
        correctly failing or doing nothing, depending on parameters).
        """
        butler = butler.clone(run=run, collections=[self.INPUT_COLLECTION, run])
        pipeline_graph.init_output_run(butler)
        init_output_refs = self.find_init_output_refs(pipeline_graph, butler)
        # Build a QG with the init outputs already in place.
        quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            butler,
            skip_existing_in=[run],
            output_run=run,
            input_collections=[self.INPUT_COLLECTION],
        )
        quantum_graph = quantum_graph_builder.finish(metadata={"output_run": run}).assemble()
        # Check that the QG refs are the same as the ones that were present
        # already.
        self.assert_init_output_refs_equal(
            self.get_quantum_graph_init_output_refs(quantum_graph),
            init_output_refs,
        )
        # Initialize with the pipeline graph, should be a no-op.
        pipeline_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG; should be a no-op.
        quantum_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(butler, existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_configs(butler, compare_existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_packages(butler, compare_existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_init_outputs(butler, skip_existing=False)
        # Make a QBB, check that it can see the init outputs.
        qbb = quantum_graph.make_init_qbb(butler._config)
        self.enterContext(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again, should be a no-op.
        quantum_graph.init_output_run(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(qbb, existing=False)
        return quantum_graph

    def init_with_quantum_graph_first(
        self, pipeline_graph: PipelineGraph, butler: Butler, run: str
    ) -> PredictedQuantumGraph:
        """Test the init_output_run methods of PipelineGraph and QuantumGraph,
        using the latter to actually write init-outputs (with later attempts
        correctly failing or doing nothing, depending on parameters).
        """
        butler = butler.clone(run=run, collections=[self.INPUT_COLLECTION, run])
        # Build a QG.
        quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            butler,
            input_collections=[self.INPUT_COLLECTION],
        )
        quantum_graph = quantum_graph_builder.finish(metadata={"output_run": run}).assemble()
        # Initialize with the QG.
        quantum_graph.init_output_run(butler)
        # Check that the QG refs are the same as the ones we find in the repo.
        init_output_refs = self.find_init_output_refs(pipeline_graph, butler)
        self.assert_init_output_refs_equal(
            self.get_quantum_graph_init_output_refs(quantum_graph),
            init_output_refs,
        )
        # Initialize again with the QG; should be a no-op.
        quantum_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(butler, existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_configs(butler, compare_existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_packages(butler, compare_existing=False)
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.write_init_outputs(butler, skip_existing=False)
        # Initialize with the pipeline graph, should be a no-op.
        pipeline_graph.init_output_run(butler)
        # Make a QBB, check that it can see the init outputs.
        qbb = quantum_graph.make_init_qbb(butler._config)
        self.enterContext(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again, should be a no-op.
        quantum_graph.init_output_run(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(qbb, existing=False)
        return quantum_graph

    def init_with_qbb_first(
        self, pipeline_graph: PipelineGraph, butler: Butler, run: str
    ) -> PredictedQuantumGraph:
        """Test the init_output_run methods of PipelineGraph and QuantumGraph,
        using a quantum-backed butler to actually write init-outputs
        (with later attempts correctly failing or doing nothing, depending on
        parameters).
        """
        butler = butler.clone(run=run, collections=[self.INPUT_COLLECTION, run])
        # Build a QG.
        quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            butler,
            input_collections=[self.INPUT_COLLECTION],
        )
        quantum_graph = quantum_graph_builder.finish(metadata={"output_run": run}).assemble()
        # Make a quantum-backed butler and use it to initialize the run.
        qbb = quantum_graph.make_init_qbb(butler._config)
        self.enterContext(qbb)
        quantum_graph.init_output_run(qbb)
        init_output_refs = self.get_quantum_graph_init_output_refs(quantum_graph)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again, should be a no-op.
        # (This call was previously missing, making the repeated consistency
        # check below a duplicate of the one above rather than a test of
        # re-initialization being a no-op.)
        quantum_graph.init_output_run(qbb)
        self.check_qbb_consistency(init_output_refs, qbb)
        # Use QBB to initialize again but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(qbb, existing=False)
        # Transferring datasets back to the main butler (i.e. insert DB entries
        # for them).
        butler.transfer_from(qbb, itertools.chain.from_iterable(init_output_refs.values()))
        # Check that the QG refs are the same as the ones we find in the repo.
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG; should be a no-op.
        quantum_graph.init_output_run(butler)
        self.assert_init_output_refs_equal(
            self.find_init_output_refs(pipeline_graph, butler),
            init_output_refs,
        )
        # Initialize again with the QG but tell it to expect an empty run.
        with self.assertRaises(ConflictingDefinitionError):
            quantum_graph.init_output_run(butler, existing=False)
        # Initialize with the pipeline graph, should be a no-op.
        pipeline_graph.init_output_run(butler)
        return quantum_graph

    def test_two_tasks_no_conversions(self) -> None:
        """Test a two-task pipeline with an overall init-input, an overall
        init-output, and an init-intermediate.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        a_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        a_config.init_outputs["io"] = DynamicConnectionConfig(dataset_type_name="intermediate_init")
        b_config = DynamicTestPipelineTaskConfig()
        b_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        b_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        b_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="intermediate_init")
        b_config.init_outputs["io"] = DynamicConnectionConfig(dataset_type_name="output_init")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, b_config)
        with self.prep_butler(pipeline_graph) as butler:
            self.init_with_pipeline_graph_first(pipeline_graph, butler, "run1")
            self.assertEqual(butler.get("a_config", collections="run1"), a_config)
            self.assertEqual(butler.get("b_config", collections="run1"), b_config)
            self.init_with_quantum_graph_first(pipeline_graph, butler, "run2")
            self.assertEqual(butler.get("a_config", collections="run2"), a_config)
            self.assertEqual(butler.get("b_config", collections="run2"), b_config)
            self.init_with_qbb_first(pipeline_graph, butler, "run3")
            self.assertEqual(butler.get("a_config", collections="run3"), a_config)
            self.assertEqual(butler.get("b_config", collections="run3"), b_config)

    def test_optional_input_unregistered(self) -> None:
        """Test that an optional input dataset type that is not registered is
        not considered an error.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime", minimum=0)
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.make_butler() as butler:
            pipeline_graph.resolve(butler.registry)
            # Register everything except 'input_runtime', which is optional.
            butler.registry.registerDatasetType(pipeline_graph.dataset_types["a_config"].dataset_type)
            butler.registry.registerDatasetType(pipeline_graph.dataset_types["a_log"].dataset_type)
            butler.registry.registerDatasetType(pipeline_graph.dataset_types["a_metadata"].dataset_type)
            butler.registry.registerDatasetType(pipeline_graph.dataset_types["output_runtime"].dataset_type)
            pipeline_graph.check_dataset_type_registrations(butler, include_packages=False)

    def test_registration_changed(self) -> None:
        """Test that we get an error when dataset type registrations in a data
        repository change between the time a pipeline graph is resolved (e.g.
        at QG generation) and when dataset types are checked later (e.g. during
        execution).
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.make_butler() as butler:
            pipeline_graph.resolve(butler.registry)
            pipeline_graph.register_dataset_types(butler)
            # Re-register 'input_runtime' with different dimensions and
            # storage class to simulate a post-resolution change.
            butler.registry.removeDatasetType("input_runtime")
            butler.registry.registerDatasetType(
                DatasetType("input_runtime", {"instrument"}, "StructuredDataList", universe=butler.dimensions)
            )
            with self.assertRaises(ConflictingDefinitionError):
                pipeline_graph.check_dataset_type_registrations(butler)

    @unittest.skipUnless(
        _have_example_storage_classes(), "Arrow/Astropy/Pandas storage classes are not available."
    )
    def test_init_intermediate_component(self) -> None:
        """Test init_output_run with an init-intermediate that is written as
        a composite and read as a component.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        a_config.init_outputs["io"] = DynamicConnectionConfig(
            dataset_type_name="intermediate_init", storage_class="ArrowTable"
        )
        b_config = DynamicTestPipelineTaskConfig()
        b_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        b_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        b_config.init_inputs["ii"] = DynamicConnectionConfig(
            dataset_type_name="intermediate_init.schema", storage_class="ArrowSchema"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, b_config)
        with self.prep_butler(pipeline_graph) as butler:
            self.init_with_pipeline_graph_first(pipeline_graph, butler, "run1")
            self.assertEqual(butler.get("a_config", collections="run1"), a_config)
            self.assertEqual(butler.get("b_config", collections="run1"), b_config)
            self.init_with_quantum_graph_first(pipeline_graph, butler, "run2")
            self.assertEqual(butler.get("a_config", collections="run2"), a_config)
            self.assertEqual(butler.get("b_config", collections="run2"), b_config)
            self.init_with_qbb_first(pipeline_graph, butler, "run3")
            self.assertEqual(butler.get("a_config", collections="run3"), a_config)
            self.assertEqual(butler.get("b_config", collections="run3"), b_config)

    def test_no_get_init_input_callback(self) -> None:
        """Test calling PipelineGraph.instantiate_tasks with no get_init_input
        callback when one is necessary.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        a_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.make_butler() as butler:
            pipeline_graph.resolve(butler.registry)
            with self.assertRaises(ValueError):
                pipeline_graph.instantiate_tasks()

    def test_multiple_init_input_consumers(self) -> None:
        """Test init_output_run when there are two tasks consuming the same
        init-input.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        a_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        a_config.init_outputs["io"] = DynamicConnectionConfig(dataset_type_name="output_init")
        b_config = DynamicTestPipelineTaskConfig()
        b_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="intermediate_runtime")
        b_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        b_config.init_inputs["ii"] = DynamicConnectionConfig(dataset_type_name="input_init")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, b_config)
        with self.prep_butler(pipeline_graph) as butler:
            self.init_with_pipeline_graph_first(pipeline_graph, butler, "run1")
            self.assertEqual(butler.get("a_config", collections="run1"), a_config)
            self.assertEqual(butler.get("b_config", collections="run1"), b_config)
            self.init_with_quantum_graph_first(pipeline_graph, butler, "run2")
            self.assertEqual(butler.get("a_config", collections="run2"), a_config)
            self.assertEqual(butler.get("b_config", collections="run2"), b_config)
            self.init_with_qbb_first(pipeline_graph, butler, "run3")
            self.assertEqual(butler.get("a_config", collections="run3"), a_config)
            self.assertEqual(butler.get("b_config", collections="run3"), b_config)

    def test_config_change(self) -> None:
        """Test init_output_run when there is an existing config that is
        inconsistent with the one in the pipeline graph.
        """
        a_config = DynamicTestPipelineTaskConfig()
        a_config.inputs["i"] = DynamicConnectionConfig(dataset_type_name="input_runtime")
        a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="output_runtime")
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, a_config)
        with self.prep_butler(pipeline_graph) as butler:
            butler.collections.register("run1")
            # Write a config that differs from the one in the pipeline graph.
            butler.put(DynamicTestPipelineTaskConfig(), "a_config", run="run1")
            with self.assertRaises(ConflictingDefinitionError):
                pipeline_graph.init_output_run(
                    butler.clone(run="run1", collections=[self.INPUT_COLLECTION, "run1"])
                )
            quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
                pipeline_graph,
                butler,
                skip_existing_in=["run1"],
                output_run="run1",
                input_collections=[self.INPUT_COLLECTION],
            )
            quantum_graph = quantum_graph_builder.finish(metadata={"output_run": "run1"}).assemble()
            with self.assertRaises(ConflictingDefinitionError):
                quantum_graph.init_output_run(
                    butler.clone(run="run1", collections=[self.INPUT_COLLECTION, "run1"])
                )
if __name__ == "__main__":
    # Standard LSST test-module entry point: initialize the test utilities
    # (e.g. memory/file-descriptor leak tracking) before running the suite.
    lsst.utils.tests.init()
    unittest.main()