Coverage for tests / test_trivial_qg_builder.py: 17%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from __future__ import annotations
30import unittest
32from lsst.daf.butler import DatasetIdGenEnum, DatasetRef
33from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo
34from lsst.pipe.base.trivial_quantum_graph_builder import TrivialQuantumGraphBuilder
class TrivialQuantumGraphBuilderTestCase(unittest.TestCase):
    """Tests for the TrivialQuantumGraphBuilder class."""

    def test_trivial_qg_builder(self) -> None:
        """Build a two-quantum graph with the trivial builder and compare it
        against the graph produced by the general-purpose builder.
        """
        # Set up a mock pipeline suited to the trivial builder:
        # - the quantum graph has no branching;
        # - although the two tasks have different dimensions, their data IDs
        #   can be related 1-1 (for the purposes of this test, at least).
        repo = InMemoryRepo("base.yaml")
        prereq_config = DynamicConnectionConfig(dataset_type_name="dataset_prereq0", dimensions=["detector"])
        repo.add_task(
            "a",
            dimensions=["band", "detector"],
            prerequisite_inputs={"prereq_connection": prereq_config},
        )
        b_inputs = {
            "input_connection": DynamicConnectionConfig(
                dataset_type_name="dataset_auto1", dimensions=["band", "detector"]
            ),
            "extra_input_connection": DynamicConnectionConfig(
                dataset_type_name="dataset_extra1", dimensions=["physical_filter", "detector"]
            ),
        }
        repo.add_task("b", dimensions=["physical_filter", "detector"], inputs=b_inputs)

        # Build a reference graph with the general-purpose builder. It spans
        # every data ID in the test dataset (4 detectors, 3 physical_filters
        # and 2 bands). As useful side effects, this also inserts the input
        # datasets and registers all dataset types.
        reference_qg = repo.make_quantum_graph()
        reference_datasets = reference_qg.datasets_by_type

        # Restrict the trivial builder to a single detector and a single
        # band (the band that corresponds to only one physical_filter).
        # The tuple unpacking doubles as a check that exactly one data ID
        # matches for each task.
        (data_id_a,) = [
            candidate
            for candidate in reference_qg.quanta_by_task["a"]
            if (candidate["detector"], candidate["band"]) == (1, "g")
        ]
        (data_id_b,) = [
            candidate
            for candidate in reference_qg.quanta_by_task["b"]
            if (candidate["detector"], candidate["band"]) == (1, "g")
        ]
        detector_data_id = data_id_a.subset(["detector"])

        # Resolve the overall-input datasets of task "a" to full references.
        auto0_ref = repo.butler.get_dataset(reference_datasets["dataset_auto0"][data_id_a])
        assert auto0_ref is not None, "Input dataset should have been inserted above."
        prereq0_ref = repo.butler.get_dataset(reference_datasets["dataset_prereq0"][detector_data_id])
        assert prereq0_ref is not None, "Input dataset should have been inserted above."

        builder = TrivialQuantumGraphBuilder(
            repo.pipeline_graph,
            repo.butler,
            data_ids={data_id_a.dimensions: data_id_a, data_id_b.dimensions: data_id_b},
            input_refs={"a": {"input_connection": [auto0_ref], "prereq_connection": [prereq0_ref]}},
            dataset_id_modes={"dataset_auto2": DatasetIdGenEnum.DATAID_TYPE_RUN},
            output_run="trivial_output_run",
            input_collections=reference_qg.header.inputs,
        )
        unassembled = builder.finish(attach_datastore_records=False)
        built_qg = unassembled.assemble()
        built_quanta = built_qg.quanta_by_task
        built_datasets = built_qg.datasets_by_type

        # Exactly one quantum per task, with the data IDs selected above.
        self.assertEqual(len(built_quanta), 2)
        self.assertEqual(built_quanta["a"].keys(), {data_id_a})
        self.assertEqual(built_quanta["b"].keys(), {data_id_b})

        # These dataset types must carry the same dataset IDs as in the
        # reference graph.
        same_as_reference = (
            ("dataset_prereq0", detector_data_id),
            ("dataset_auto0", data_id_a),
            ("dataset_extra1", data_id_b),
        )
        for dataset_type_name, data_id in same_as_reference:
            self.assertEqual(built_datasets[dataset_type_name].keys(), {data_id})
            self.assertEqual(
                built_datasets[dataset_type_name][data_id],
                reference_datasets[dataset_type_name][data_id],
            )

        # These dataset types must carry dataset IDs that differ from the
        # ones in the reference graph.
        differs_from_reference = (
            ("dataset_auto1", data_id_a),
            ("dataset_auto2", data_id_b),
        )
        for dataset_type_name, data_id in differs_from_reference:
            self.assertEqual(built_datasets[dataset_type_name].keys(), {data_id})
            self.assertNotEqual(
                built_datasets[dataset_type_name][data_id],
                reference_datasets[dataset_type_name][data_id],
            )

        # The ID-generation mode requested for dataset_auto2 must yield the
        # same ID as a DatasetRef constructed directly with that mode.
        auto2_type = repo.pipeline_graph.dataset_types["dataset_auto2"].dataset_type
        expected_auto2_ref = DatasetRef(
            auto2_type,
            data_id_b,
            run="trivial_output_run",
            id_generation_mode=DatasetIdGenEnum.DATAID_TYPE_RUN,
        )
        self.assertEqual(built_datasets["dataset_auto2"][data_id_b], expected_auto2_ref.id)

        # Quantum-only view: two quanta joined by a single edge.
        quantum_xgraph = built_qg.quantum_only_xgraph
        self.assertEqual(len(quantum_xgraph.nodes), 2)
        self.assertEqual(len(quantum_xgraph.edges), 1)

        def dataset_ids(*dataset_type_names: str) -> set:
            # Union of the graph's dataset IDs over the given dataset types.
            return {
                dataset_id
                for dataset_type_name in dataset_type_names
                for dataset_id in built_datasets[dataset_type_name].values()
            }

        # Bipartite view: each quantum is wired to its inputs and outputs.
        bipartite_xgraph = built_qg.bipartite_xgraph
        quantum_a = built_quanta["a"][data_id_a]
        quantum_b = built_quanta["b"][data_id_b]
        self.assertEqual(
            set(bipartite_xgraph.predecessors(quantum_a)),
            dataset_ids("dataset_auto0", "dataset_prereq0"),
        )
        self.assertEqual(
            set(bipartite_xgraph.successors(quantum_a)),
            dataset_ids("dataset_auto1", "a_metadata", "a_log"),
        )
        self.assertEqual(
            set(bipartite_xgraph.predecessors(quantum_b)),
            dataset_ids("dataset_auto1", "dataset_extra1"),
        )
        self.assertEqual(
            set(bipartite_xgraph.successors(quantum_b)),
            dataset_ids("dataset_auto2", "b_metadata", "b_log"),
        )
# Allow this test module to be executed directly as a script.
if __name__ == "__main__":
    unittest.main()