Coverage for tests / test_graphBuilder.py: 20%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Tests of things related to the GraphBuilder class."""
30import io
31import logging
32import unittest
34import lsst.utils.tests
35from lsst.daf.butler import Butler, DatasetType
36from lsst.daf.butler.registry import UserExpressionError
37from lsst.pipe.base import PipelineGraph, QuantumGraph
38from lsst.pipe.base.all_dimensions_quantum_graph_builder import (
39 AllDimensionsQuantumGraphBuilder,
40 DatasetQueryConstraintVariant,
41)
42from lsst.pipe.base.tests import simpleQGraph
43from lsst.pipe.base.tests.mocks import (
44 DynamicConnectionConfig,
45 DynamicTestPipelineTask,
46 DynamicTestPipelineTaskConfig,
47 MockDataset,
48 MockStorageClass,
49)
50from lsst.utils.tests import temporaryDirectory
52_LOG = logging.getLogger(__name__)
55class GraphBuilderTestCase(unittest.TestCase):
56 """Test graph building."""
58 def _assertGraph(self, graph: QuantumGraph) -> None:
59 """Check basic structure of the graph."""
60 for taskDef in graph.iterTaskGraph():
61 refs = graph.initOutputRefs(taskDef)
62 # task has one initOutput, second ref is for config dataset
63 self.assertEqual(len(refs), 2)
65 self.assertEqual(len(list(graph.inputQuanta)), 1)
67 # This includes only "packages" dataset for now.
68 refs = graph.globalInitOutputRefs()
69 self.assertEqual(len(refs), 1)
71 def testDefault(self):
72 """Simple test to verify makeSimpleQGraph can be used to make a Quantum
73 Graph.
74 """
75 with temporaryDirectory() as root:
76 # makeSimpleQGraph calls GraphBuilder.
77 butler, qgraph = simpleQGraph.makeSimpleQGraph(root=root)
78 self.enterContext(butler)
79 # by default makeSimpleQGraph makes a graph with 5 nodes
80 self.assertEqual(len(qgraph), 5)
81 self._assertGraph(qgraph)
82 constraint = DatasetQueryConstraintVariant.OFF
83 _, qgraph2 = simpleQGraph.makeSimpleQGraph(
84 butler=butler, datasetQueryConstraint=constraint, callPopulateButler=False
85 )
86 # When all outputs are random resolved refs, direct comparison
87 # of graphs does not work because IDs are different. Can only
88 # verify the number of quanta in the graph without doing something
89 # terribly complicated.
90 self.assertEqual(len(qgraph2), 5)
91 constraint = DatasetQueryConstraintVariant.fromExpression("add_dataset0")
92 _, qgraph3 = simpleQGraph.makeSimpleQGraph(
93 butler=butler, datasetQueryConstraint=constraint, callPopulateButler=False
94 )
95 self.assertEqual(len(qgraph3), 5)
97 def test_empty_qg(self):
98 """Test that making an empty QG doesn't raise exceptions."""
99 config = DynamicTestPipelineTaskConfig()
100 config.inputs["i"] = DynamicConnectionConfig(
101 dataset_type_name="input",
102 storage_class="StructuredDataDict",
103 dimensions=["detector"],
104 )
105 config.init_inputs["ii"] = DynamicConnectionConfig(
106 dataset_type_name="init_input",
107 storage_class="StructuredDataDict",
108 )
109 config.outputs["o"] = DynamicConnectionConfig(
110 dataset_type_name="output",
111 storage_class="StructuredDataDict",
112 dimensions=["detector"],
113 )
114 config.init_outputs["io"] = DynamicConnectionConfig(
115 dataset_type_name="init_output",
116 storage_class="StructuredDataDict",
117 )
118 pipeline_graph = PipelineGraph()
119 pipeline_graph.add_task("a", DynamicTestPipelineTask, config)
120 with temporaryDirectory() as repo_path:
121 Butler.makeRepo(repo_path)
122 butler = Butler.from_config(repo_path, writeable=True, run="test_empty_qg")
123 self.enterContext(butler)
124 MockStorageClass.get_or_register_mock("StructuredDataDict")
125 butler.registry.registerDatasetType(
126 DatasetType(
127 "input",
128 dimensions=butler.dimensions.conform(["detector"]),
129 storageClass="_mock_StructuredDataDict",
130 )
131 )
132 init_input_dataset_type = DatasetType(
133 "init_input",
134 dimensions=butler.dimensions.empty,
135 storageClass="_mock_StructuredDataDict",
136 )
137 butler.registry.registerDatasetType(init_input_dataset_type)
138 # Init-input initially exists, but input does not (hence empty QG).
139 butler.put(
140 MockDataset(
141 dataset_id=None,
142 dataset_type=init_input_dataset_type.to_simple(),
143 data_id={},
144 run=butler.run,
145 ),
146 "init_input",
147 )
148 # Attempt to make QG; should just be empty, with no exceptions.
149 self.assertFalse(AllDimensionsQuantumGraphBuilder(pipeline_graph, butler).build())
150 # Initialize the output run, try again, with same expected result.
151 pipeline_graph.register_dataset_types(butler)
152 pipeline_graph.init_output_run(butler)
153 self.assertFalse(AllDimensionsQuantumGraphBuilder(pipeline_graph, butler).build())
155 # Inconsistent governor dimensions are no longer an error, so this test
156 # fails with the new query system. We should probably check instead that
157 # logging includes an explanation for the empty QG, but it might not
158 # because explain_no_results isn't good enough yet.
159 @unittest.expectedFailure
160 def testAddInstrumentMismatch(self):
161 """Verify that a RuntimeError is raised if the instrument in the user
162 query does not match the instrument in the pipeline.
163 """
164 with temporaryDirectory() as root:
165 pipeline = simpleQGraph.makeSimplePipeline(
166 nQuanta=5, instrument="lsst.pipe.base.tests.simpleQGraph.SimpleInstrument"
167 )
168 with self.assertRaises(UserExpressionError):
169 butler, _ = simpleQGraph.makeSimpleQGraph(
170 root=root, pipeline=pipeline, userQuery="instrument = 'foo'"
171 )
172 self.enterContext(butler)
174 def testUserQueryBind(self):
175 """Verify that bind values work for user query."""
176 pipeline = simpleQGraph.makeSimplePipeline(
177 nQuanta=5, instrument="lsst.pipe.base.tests.simpleQGraph.SimpleInstrument"
178 )
179 instr = simpleQGraph.SimpleInstrument.getName()
180 # With a literal in the user query
181 with temporaryDirectory() as root:
182 butler, _ = simpleQGraph.makeSimpleQGraph(
183 root=root, pipeline=pipeline, userQuery=f"instrument = '{instr}'"
184 )
185 self.enterContext(butler)
186 # With a bind for the user query
187 with temporaryDirectory() as root:
188 butler, _ = simpleQGraph.makeSimpleQGraph(
189 root=root, pipeline=pipeline, userQuery="instrument = :instr", bind={"instr": instr}
190 )
191 self.enterContext(butler)
193 def test_datastore_records(self):
194 """Test for generating datastore records."""
195 with temporaryDirectory() as root:
196 # need FileDatastore for this tests
197 butler, qgraph1 = simpleQGraph.makeSimpleQGraph(
198 root=root, inMemory=False, makeDatastoreRecords=True
199 )
200 self.enterContext(butler)
202 # save and reload
203 buffer = io.BytesIO()
204 qgraph1.save(buffer)
205 buffer.seek(0)
206 qgraph2 = QuantumGraph.load(buffer, universe=butler.dimensions)
207 del buffer
209 for qgraph in (qgraph1, qgraph2):
210 self.assertEqual(len(qgraph), 5)
211 for i, qnode in enumerate(qgraph):
212 quantum = qnode.quantum
213 self.assertIsNotNone(quantum.datastore_records)
214 # only the first quantum has a pre-existing input
215 if i == 0:
216 datastore_name = "FileDatastore@<butlerRoot>"
217 self.assertEqual(set(quantum.datastore_records.keys()), {datastore_name})
218 records_data = quantum.datastore_records[datastore_name]
219 records = dict(records_data.records)
220 self.assertEqual(len(records), 1)
221 _, records = records.popitem()
222 records = records["file_datastore_records"]
223 self.assertEqual(
224 [record.path for record in records],
225 ["test/add_dataset0/add_dataset0_INSTR_det0_test.pickle"],
226 )
227 else:
228 self.assertEqual(quantum.datastore_records, {})
231if __name__ == "__main__":
232 lsst.utils.tests.init()
233 unittest.main()