Coverage for tests / test_predicted_qg.py: 13%
216 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import itertools
29import logging
30import os
31import shutil
32import subprocess
33import tempfile
34import unittest
35import uuid
36from io import StringIO
38import lsst.utils.tests
39from lsst.pipe.base import QuantumGraph
40from lsst.pipe.base.dot_tools import graph2dot
41from lsst.pipe.base.mermaid_tools import graph2mermaid
42from lsst.pipe.base.quantum_graph import (
43 FORMAT_VERSION,
44 AddressReader,
45 PredictedDatasetInfo,
46 PredictedQuantumGraph,
47 PredictedQuantumInfo,
48)
49from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo
# Configure the verbosity of the loggers these tests touch: INFO for the
# generic "timer" and "lsst" hierarchies, DEBUG for the quantum graph module
# under test.  The order matches the original statements.
for _logger_name, _log_level in (
    ("timer", logging.INFO),
    ("lsst", logging.INFO),
    ("lsst.pipe.base.quantum_graph", logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_log_level)
# Do not leave the loop variables behind as module globals.
del _logger_name, _log_level
class PredictedQuantumGraphTestCase(unittest.TestCase):
    """Unit tests for the predicted quantum graph classes.

    These tests focus on the interface of the predicted quantum graph,
    especially I/O and conversion to and from the old quantum graph class.

    Additional test coverage is provided by the tests for quantum graph
    builders and executors, which all use this class as well.
    """

    def setUp(self) -> None:
        """Create a mocked repository, a five-task pipeline, and a builder.

        The pipeline consists of the ``calibrate``, ``consolidate``,
        ``resample``, ``coadd``, and ``match`` tasks.  The repository helper
        is registered via `unittest.TestCase.enterContext` so it is exited
        automatically when the test finishes.
        """
        self.helper = InMemoryRepo("base.yaml", "spatial.yaml")
        self.enterContext(self.helper)
        self.helper.add_task(
            "calibrate",
            dimensions=["visit", "detector"],
            inputs={
                "input_image": DynamicConnectionConfig(
                    dataset_type_name="raw",
                    dimensions=["visit", "detector"],
                )
            },
            prerequisite_inputs={
                "refcat": DynamicConnectionConfig(
                    dataset_type_name="references",
                    dimensions=["htm7"],
                    multiple=True,
                )
            },
            init_outputs={
                "output_schema": DynamicConnectionConfig(
                    dataset_type_name="source_schema",
                )
            },
            outputs={
                "output_image": DynamicConnectionConfig(
                    dataset_type_name="image",
                    dimensions=["visit", "detector"],
                ),
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="source_detector",
                    dimensions=["visit", "detector"],
                ),
            },
        )
        self.helper.add_task(
            "consolidate",
            dimensions=["visit"],
            init_inputs={
                "input_schema": DynamicConnectionConfig(
                    dataset_type_name="source_schema",
                )
            },
            inputs={
                "input_table": DynamicConnectionConfig(
                    dataset_type_name="source_detector",
                    dimensions=["visit", "detector"],
                    multiple=True,
                )
            },
            outputs={
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="source",
                    dimensions=["visit"],
                )
            },
        )
        self.helper.add_task(
            "resample",
            dimensions=["patch", "visit"],
            inputs={
                "input_image": DynamicConnectionConfig(
                    dataset_type_name="image",
                    dimensions=["visit", "detector"],
                    multiple=True,
                )
            },
            outputs={
                "output_image": DynamicConnectionConfig(
                    dataset_type_name="warp",
                    dimensions=["patch", "visit"],
                )
            },
        )
        self.helper.add_task(
            "coadd",
            dimensions=["patch", "band"],
            inputs={
                "input_image": DynamicConnectionConfig(
                    dataset_type_name="warp",
                    dimensions=["patch", "visit"],
                    multiple=True,
                )
            },
            outputs={
                "output_image": DynamicConnectionConfig(
                    dataset_type_name="coadd",
                    dimensions=["patch", "band"],
                ),
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="object",
                    dimensions=["patch", "band"],
                    # Like all other (defaulted) storage classes here,
                    # 'ArrowAstropy' below will be mocked; we pick it so we can
                    # try a component input, and because we know its pytype is
                    # safe to import here.
                    storage_class="ArrowAstropy",
                ),
            },
        )
        self.helper.add_task(
            "match",
            dimensions=["htm6"],
            inputs={
                "input_object": DynamicConnectionConfig(
                    dataset_type_name="object",
                    dimensions=["patch", "band"],
                    multiple=True,
                    storage_class="ArrowAstropy",
                ),
                "input_source": DynamicConnectionConfig(
                    dataset_type_name="source",
                    dimensions=["visit"],
                    multiple=True,
                ),
                "input_object_schema": DynamicConnectionConfig(
                    dataset_type_name="object.schema",
                    # A component dataset type shares the dimensions of its
                    # parent, so this must match the 'object' definition.
                    dimensions=["patch", "band"],
                    multiple=True,
                    storage_class="ArrowAstropySchema",
                ),
            },
            outputs={
                "output_table": DynamicConnectionConfig(
                    dataset_type_name="matches",
                    dimensions=["htm6"],
                )
            },
        )
        self.builder = self.helper.make_quantum_graph_builder()

    def _finish_components(self):
        """Finish the builder with the values `check_quantum_graph` expects.

        Returns
        -------
        components
            The object returned by ``self.builder.finish`` when called with
            ``output="output_chain"``, ``metadata={"stuff": "whatever"}`` and
            ``attach_datastore_records=False``.
        """
        return self.builder.finish(
            output="output_chain",
            metadata={"stuff": "whatever"},
            attach_datastore_records=False,
        )

    def _build_old_quantum_graph(self) -> QuantumGraph:
        """Build an old-style graph with the metadata `check_quantum_graph`
        expects to find in the converted header.

        Returns
        -------
        old_qg : `lsst.pipe.base.QuantumGraph`
            The object returned by ``self.builder.build``.
        """
        return self.builder.build(
            metadata={
                "input": ["input_chain"],
                "output": "output_chain",
                "output_run": "output_run",
                "stuff": "whatever",
            },
            attach_datastore_records=False,
        )

    def check_quantum_graph(
        self,
        qg: PredictedQuantumGraph,
        *,
        all_tasks: bool = True,
        thin_graph: bool = True,
        all_quantum_datasets: bool = True,
        init_quanta: bool = True,
        dimension_data_loaded: bool = True,
        dimension_data_deserialized: bool = True,
        converting_partial: bool = False,
    ) -> None:
        """Test the attributes and methods of a quantum graph produced by this
        test case's builder.

        Parameters
        ----------
        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
            Graph to test.
        all_tasks : `bool`, optional
            Whether all tasks in the pipeline were loaded.
        thin_graph : `bool`, optional
            Whether the ``thin_graph`` component was loaded.
        all_quantum_datasets : `bool`, optional
            Whether all quantum datasets were loaded.  Requires
            ``all_tasks=True``.
        init_quanta : `bool`, optional
            Whether the ``init_quanta`` component was loaded.
        dimension_data_loaded : `bool`, optional
            Whether the ``dimension_data`` component was loaded at all.
        dimension_data_deserialized : `bool`, optional
            Whether all dimension records are expected to have been
            deserialized.  Ignored if ``dimension_data_loaded`` is `False`.
        converting_partial : `bool`, optional
            Whether this was a partial load that only included some quanta.
        """
        self.assertEqual(qg.header.inputs, ["input_chain"])
        self.assertEqual(qg.header.output, "output_chain")
        self.assertEqual(qg.header.output_run, "output_run")
        self.assertEqual(qg.header.metadata["stuff"], "whatever")
        self.assertEqual(qg.header.version, FORMAT_VERSION)
        if not converting_partial:
            # When partial-reading an old QG, counts reflect what was loaded.
            # In all other cases they reflect the full original graph.
            self.assertEqual(qg.header.n_task_quanta["calibrate"], 8)
            self.assertEqual(qg.header.n_task_quanta["consolidate"], 2)
            self.assertEqual(qg.header.n_task_quanta["resample"], 10)
            self.assertEqual(qg.header.n_task_quanta["coadd"], 10)
            self.assertEqual(qg.header.n_task_quanta["match"], 2)
            self.assertEqual(qg.header.n_quanta, 32)
            self.assertEqual(
                qg.header.n_datasets,
                63  # non-automatic datasets (see breakdown below)
                + 32 * 2  # one metadata and one log for each quantum
                + 5  # one config for each task
                + 1,  # packages
            )
        if all_tasks:
            self.assertFalse(self.helper.pipeline_graph.diff_tasks(qg.pipeline_graph))
        if thin_graph or all_quantum_datasets:
            self.assertEqual(len(qg), qg.header.n_quanta)
            self.assertEqual(len(qg.quantum_only_xgraph), qg.header.n_quanta)
            self.assertEqual(len(qg.quanta_by_task["calibrate"]), 8)
            self.assertEqual(len(qg.quanta_by_task["consolidate"]), 2)
            self.assertEqual(len(qg.quanta_by_task["resample"]), 10)
            self.assertEqual(len(qg.quanta_by_task["coadd"]), 10)
            self.assertEqual(len(qg.quanta_by_task["match"]), 2)
        if init_quanta:
            if all_tasks:
                self.assertEqual(len(qg.datasets_by_type["calibrate_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["consolidate_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["resample_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["coadd_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["match_config"]), 1)
                self.assertEqual(len(qg.datasets_by_type["source_schema"]), 1)
                self.assertEqual(len(qg.datasets_by_type["packages"]), 1)
            n_init_datasets = 7
        else:
            n_init_datasets = 0
        if all_quantum_datasets:
            # This is a precondition on the arguments; use a unittest
            # assertion so it is not stripped when running with ``-O``.
            self.assertTrue(all_tasks, "all_quantum_datasets=True requires all_tasks=True")
            self.assertEqual(
                len(qg.bipartite_xgraph),
                qg.header.n_quanta + qg.header.n_datasets - n_init_datasets,
            )
            self.assertEqual(len(qg.datasets_by_type["raw"]), 8)
            self.assertEqual(len(qg.datasets_by_type["references"]), 4)
            self.assertEqual(len(qg.datasets_by_type["image"]), 8)
            self.assertEqual(len(qg.datasets_by_type["source_detector"]), 8)
            self.assertEqual(len(qg.datasets_by_type["source"]), 2)
            self.assertEqual(len(qg.datasets_by_type["warp"]), 10)
            self.assertEqual(len(qg.datasets_by_type["coadd"]), 10)
            self.assertEqual(len(qg.datasets_by_type["object"]), 10)
            self.assertEqual(len(qg.datasets_by_type["matches"]), 2)
            # Spot-check some edges and graph structure.
            calibrate_ids = set(qg.quanta_by_task["calibrate"].values())
            consolidate_ids = set(qg.quanta_by_task["consolidate"].values())
            expected_write_edges = [qg.pipeline_graph.tasks["calibrate"].get_output_edge("output_table")]
            expected_read_edges = [qg.pipeline_graph.tasks["consolidate"].get_input_edge("input_table")]
            for dataset_id in qg.datasets_by_type["source_detector"].values():
                for producer_id, _, edge_data in qg.bipartite_xgraph.in_edges(dataset_id, data=True):
                    self.assertIn(producer_id, calibrate_ids)
                    self.assertFalse(edge_data["is_read"])
                    self.assertEqual(edge_data["pipeline_edges"], expected_write_edges)
                for _, consumer_id, edge_data in qg.bipartite_xgraph.out_edges(dataset_id, data=True):
                    self.assertIn(consumer_id, consolidate_ids)
                    self.assertTrue(edge_data["is_read"])
                    self.assertEqual(edge_data["pipeline_edges"], expected_read_edges)
        # We use 'is' checks instead of just equality because we don't want
        # duplicates of these objects floating around wasting memory.
        for task_label, quanta_for_task in qg.quanta_by_task.items():
            for data_id, quantum_id in quanta_for_task.items():
                d1: PredictedQuantumInfo = qg.quantum_only_xgraph.nodes[quantum_id]
                self.assertEqual(d1["task_label"], task_label)
                self.assertIs(d1["data_id"], data_id)
                self.assertIs(d1["pipeline_node"], qg.pipeline_graph.tasks[task_label])
                d2: PredictedQuantumInfo = qg.bipartite_xgraph.nodes[quantum_id]
                self.assertEqual(d2["task_label"], task_label)
                self.assertIs(d2["data_id"], data_id)
                self.assertIs(d2["pipeline_node"], qg.pipeline_graph.tasks[task_label])
        for dataset_type_name, datasets_for_type in qg.datasets_by_type.items():
            # Init-output dataset types are skipped here; only per-quantum
            # datasets are looked up in the bipartite graph.
            if (
                dataset_type_name == "source_schema"
                or dataset_type_name == "packages"
                or dataset_type_name.endswith("_config")
            ):
                continue
            for data_id, dataset_id in datasets_for_type.items():
                d3: PredictedDatasetInfo = qg.bipartite_xgraph.nodes[dataset_id]
                self.assertIs(d3["data_id"], data_id)
                self.assertIs(d3["pipeline_node"], qg.pipeline_graph.dataset_types[dataset_type_name])
        if dimension_data_loaded:
            self.assertIsNotNone(qg.dimension_data)
            if dimension_data_deserialized and not converting_partial:
                self.assertEqual(len(qg.dimension_data.records["instrument"]), 1)
                self.assertEqual(len(qg.dimension_data.records["visit"]), 2)
                self.assertEqual(len(qg.dimension_data.records["detector"]), 4)
                self.assertEqual(len(qg.dimension_data.records["visit_detector_region"]), 8)
                self.assertEqual(len(qg.dimension_data.records["tract"]), 2)
                self.assertEqual(len(qg.dimension_data.records["patch"]), 9)
                self.assertEqual(len(qg.dimension_data.records["band"]), 2)
                self.assertEqual(len(qg.dimension_data.records["physical_filter"]), 2)

    def define_partial_read(self, qg: PredictedQuantumGraph) -> list[uuid.UUID]:
        """Return a list of quantum IDs to test partial loads.

        Parameters
        ----------
        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
            Full graph to obtain quantum IDs from.  Note that quantum IDs are
            random and change every test run, and hence test behavior should
            never depend on explicit values or their sort order.

        Returns
        -------
        quanta : `list` [ `uuid.UUID` ]
            Quanta to use for partial-read tests.  These always have the same
            task labels and data IDs: all ``calibrate`` and ``consolidate``
            quanta for ``visit==1``.
        """
        return [
            quantum_id
            for data_id, quantum_id in qg.quanta_by_task["calibrate"].items()
            if data_id["visit"] == 1
        ] + [
            quantum_id
            for data_id, quantum_id in qg.quanta_by_task["consolidate"].items()
            if data_id["visit"] == 1
        ]

    def check_partial_read(
        self, qg: PredictedQuantumGraph, quanta: list[uuid.UUID], converting: bool = False
    ) -> None:
        """Test the attributes and methods of a partially-loaded quantum graph.

        Parameters
        ----------
        qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
            Graph for which only ``quanta`` were loaded.
        quanta : `list` [ `uuid.UUID` ]
            IDs of the quanta that were loaded.  Must be the result of a call
            to `define_partial_read`.
        converting : `bool`, optional
            Whether this graph was read via the old
            `lsst.pipe.base.QuantumGraph` class and then converted.
        """
        self.check_quantum_graph(
            qg,
            all_tasks=False,
            thin_graph=False,
            all_quantum_datasets=False,
            converting_partial=converting,
            dimension_data_deserialized=converting,
        )
        self.assertEqual(len(qg), 5)
        self.assertEqual(len(qg.quantum_only_xgraph), 5)
        execution_quanta = qg.build_execution_quanta()
        self.assertEqual(execution_quanta.keys(), set(quanta))
        # Both graph views must hold the very same quantum object.
        for quantum_id, quantum in execution_quanta.items():
            self.assertIs(qg.bipartite_xgraph.nodes[quantum_id]["quantum"], quantum)
            self.assertIs(qg.quantum_only_xgraph.nodes[quantum_id]["quantum"], quantum)

    def test_build(self) -> None:
        """Test building a `PredictedQuantumGraph` by
        inspecting the result.
        """
        qg = self._finish_components().assemble()
        self.check_quantum_graph(qg)

    def test_from_old_quantum_graph(self) -> None:
        """Test building an old `QuantumGraph` and then
        converting it to a `PredictedQuantumGraph`.
        """
        old_qg = self._build_old_quantum_graph()
        new_qg = PredictedQuantumGraph.from_old_quantum_graph(old_qg)
        self.check_quantum_graph(new_qg)

    def test_read_execution_quanta_old_file(self) -> None:
        """Test building an old `QuantumGraph`, saving it, and then reading it
        via `PredictedQuantumGraph.read_execution_quanta`.
        """
        old_qg = self._build_old_quantum_graph()
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "old.qgraph")
            old_qg.saveUri(tmpfile)
            # Read everything.
            qg1 = PredictedQuantumGraph.read_execution_quanta(tmpfile)
            self.check_quantum_graph(qg1, thin_graph=False)
            # Read just a few quanta.
            quanta = self.define_partial_read(qg1)
            qg2 = PredictedQuantumGraph.read_execution_quanta(tmpfile, quantum_ids=quanta)
            self.check_partial_read(qg2, quanta, converting=True)

    def test_roundtrip_old_quantum_graph(self) -> None:
        """Test building a `PredictedQuantumGraph` and round-tripping it
        through the old `QuantumGraph` class.
        """
        qg1 = self._finish_components().assemble()
        old_qg = qg1.to_old_quantum_graph()
        qg2 = PredictedQuantumGraph.from_old_quantum_graph(old_qg)
        self.check_quantum_graph(qg2)

    def test_write_new_as_old(self) -> None:
        """Test building a `PredictedQuantumGraphComponents`, saving it with
        the old extension, and reading it back in both the old class and the
        new class.
        """
        components = self._finish_components()
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "old.qgraph")
            components.write(tmpfile)
            qg1 = PredictedQuantumGraph.read_execution_quanta(tmpfile)
            self.check_quantum_graph(qg1)
            old_qg = QuantumGraph.loadUri(tmpfile)
            qg2 = PredictedQuantumGraph.from_old_quantum_graph(old_qg)
            self.check_quantum_graph(qg2)

    def test_read_new_as_old(self) -> None:
        """Test building a `PredictedQuantumGraphComponents`, saving it with
        the new extension, and reading it back in via the old class.
        """
        components = self._finish_components()
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "new.qg")
            components.write(tmpfile)
            old_qg = QuantumGraph.loadUri(tmpfile)
            new_qg = PredictedQuantumGraph.from_old_quantum_graph(old_qg)
            self.check_quantum_graph(new_qg)

    def test_io(self) -> None:
        """Test saving a `PredictedQuantumGraphComponents` and reading it back
        in as a `PredictedQuantumGraph`, both fully and partially, and as a
        `QuantumGraph`, both fully and partially.
        """
        components = self._finish_components()
        # We use a small page size since this is a tiny graph and hence
        # otherwise all addresses would fit in a single page.
        four_row_page_size = AddressReader.compute_row_size(int_size=8, n_addresses=1) * 4
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "new.qg")
            components.write(tmpfile, zstd_dict_n_inputs=24)  # enable dict compression code path
            # Test a full read with the new class.
            with PredictedQuantumGraph.open(tmpfile, page_size=four_row_page_size) as reader:
                reader.read_all()
                full_qg = reader.finish()
            self.check_quantum_graph(full_qg, dimension_data_deserialized=False)
            # Test a full read with the old class (uses new class and then
            # converts to old, and we convert back to new for the test).
            self.check_quantum_graph(
                PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(tmpfile))
            )
            # Test a partial read with the old class.
            quanta = self.define_partial_read(full_qg)
            self.check_partial_read(
                PredictedQuantumGraph.from_old_quantum_graph(QuantumGraph.loadUri(tmpfile, nodes=quanta)),
                quanta,
                converting=True,
            )
            # Test a thin but shallow read with the new class.
            with PredictedQuantumGraph.open(tmpfile, page_size=four_row_page_size) as reader:
                reader.read_thin_graph()
                thin_qg = reader.finish()
            self.check_quantum_graph(
                thin_qg,
                dimension_data_deserialized=False,
                all_quantum_datasets=False,
                dimension_data_loaded=False,
                init_quanta=False,
            )
            # Test a deep read of just a few quanta with the new class.
            narrow_qg = PredictedQuantumGraph.read_execution_quanta(
                tmpfile,
                quantum_ids=quanta,
                page_size=four_row_page_size,
            )
            self.check_partial_read(narrow_qg, quanta)

    def test_no_compression_dict(self) -> None:
        """Test saving a `PredictedQuantumGraphComponents` and reading it back
        in as a `PredictedQuantumGraph` with the compression dictionary
        disabled.
        """
        components = self._finish_components()
        # We use a small page size since this is a tiny graph and hence
        # otherwise all addresses would fit in a single page.
        three_row_page_size = AddressReader.compute_row_size(int_size=8, n_addresses=1) * 3
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
            tmpfile = os.path.join(tmpdir, "new.qg")
            components.write(tmpfile, zstd_dict_size=0)
            with PredictedQuantumGraph.open(tmpfile, page_size=three_row_page_size) as reader:
                reader.read_all()
                full_qg = reader.finish()
            self.check_quantum_graph(full_qg, dimension_data_deserialized=False)

    def test_dot(self) -> None:
        """Test visualization via GraphViz dot."""
        qg = self.builder.finish(attach_datastore_records=False).assemble()
        stream = StringIO()
        # Render first so graph2dot is exercised even when the 'dot'
        # executable is unavailable and the rest of the test is skipped.
        graph2dot(qg, stream)
        if (dot_cmd := shutil.which("dot")) is None:
            self.skipTest("Aborting test early; 'dot' command is not available.")
        # Just check that the dot command can parse what we've given it;
        # ``check=True`` raises `subprocess.CalledProcessError` on failure.
        subprocess.run(
            [dot_cmd, "-Txdot"],
            input=stream.getvalue().encode(),
            stdout=subprocess.DEVNULL,
            check=True,
        )

    def test_mermaid(self) -> None:
        """Test visualization via Mermaid."""
        qg = self.builder.finish(attach_datastore_records=False).assemble()
        stream = StringIO()
        # Just check that it runs without error.
        graph2mermaid(qg, stream)

    def test_update_output_run(self) -> None:
        """Test the `PredictedQuantumGraphComponents.update_output_run`
        method.
        """
        components = self._finish_components()
        components.update_output_run("new_output_run")
        qg = components.assemble()
        for ref in qg.get_init_outputs("calibrate").values():
            self.assertEqual(ref.run, "new_output_run")
        for quantum in qg.build_execution_quanta().values():
            for ref in itertools.chain.from_iterable(quantum.outputs.values()):
                self.assertEqual(ref.run, "new_output_run", msg=str(ref))
            for ref in itertools.chain.from_iterable(quantum.inputs.values()):
                # 'raw' and 'references' are overall inputs that this
                # pipeline does not produce, so their run is not checked.
                if ref.datasetType.name not in ("raw", "references"):
                    self.assertEqual(ref.run, "new_output_run", msg=str(ref))
if __name__ == "__main__":
    # When this file is executed directly as a script, call
    # lsst.utils.tests.init() first and then hand off to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()