Coverage for tests / test_qg_builder_dimensions.py: 19%
121 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import logging
29import unittest
31import astropy.table
33import lsst.utils.tests
34from lsst.daf.butler import Butler, DataCoordinate
35from lsst.daf.butler.tests.utils import create_populated_sqlite_registry
36from lsst.pipe.base import PipelineGraph
37from lsst.pipe.base.all_dimensions_quantum_graph_builder import (
38 AllDimensionsQuantumGraphBuilder,
39 DatasetQueryConstraintVariant,
40)
41from lsst.pipe.base.tests.mocks import (
42 DynamicConnectionConfig,
43 DynamicTestPipelineTask,
44 DynamicTestPipelineTaskConfig,
45)
46from lsst.resources import ResourcePath
47from lsst.sphgeom import RangeSet
# Module-level logger for this test module. It is not referenced by any of
# the tests visible here; presumably kept for ad-hoc debugging output.
_LOG = logging.getLogger(__name__)
class AllDimensionsQuantumGraphBuilderTestCase(unittest.TestCase):
    """Tests for AllDimensionsQuantumGraphBuilder with various interesting
    combinations of dimensions.
    """

    @staticmethod
    def make_butler() -> Butler:
        """Return a butler backed by a SQLite registry populated from the
        ``base.yaml`` and ``spatial.yaml`` registry test data files shipped
        with ``lsst.daf.butler``.
        """
        data_root = ResourcePath("resource://lsst.daf.butler/tests/registry_data", forceDirectory=True)
        return create_populated_sqlite_registry(
            *[data_root.join(filename) for filename in ("base.yaml", "spatial.yaml")]
        )

    def setUp(self):
        self.butler = self.make_butler()
        # Enter the butler as a context manager; unittest runs its __exit__
        # as a cleanup after tearDown.
        self.enterContext(self.butler)

    def tearDown(self):
        del self.butler
        super().tearDown()

    def _build_quantum_graph(self, pipeline_graph: PipelineGraph, **kwargs):
        """Build and assemble a quantum graph for ``pipeline_graph``.

        Inputs are read from the ``c1`` collection and outputs go to the
        ``c2`` run. Extra keyword arguments are forwarded unchanged to
        `AllDimensionsQuantumGraphBuilder`. Datastore records are not
        attached.
        """
        builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph,
            self.butler,
            input_collections=["c1"],
            output_run="c2",
            **kwargs,
        )
        return builder.finish(attach_datastore_records=False).assemble()

    def test_one_to_one(self) -> None:
        """Test building a QG with a single task whose inputs and outputs and
        quanta all have the same dimensions.
        """
        config = DynamicTestPipelineTaskConfig()
        config.dimensions = ["visit", "detector"]
        config.inputs["i1"] = DynamicConnectionConfig(
            dataset_type_name="d1", dimensions=["visit", "detector"]
        )
        config.outputs["o2"] = DynamicConnectionConfig(
            dataset_type_name="d2", dimensions=["visit", "detector"]
        )
        pipeline_graph = PipelineGraph(universe=self.butler.dimensions)
        pipeline_graph.add_task("t1", DynamicTestPipelineTask, config=config)
        pipeline_graph.resolve(self.butler.registry)
        self._insert_overall_inputs(pipeline_graph)
        qg = self._build_quantum_graph(pipeline_graph)
        quanta = qg.quanta_by_task["t1"]
        self.assertEqual(len(quanta), 8)  # 2 visits x 4 detectors

    def test_patch_to_hpx_to_global(self) -> None:
        """Test building a QG with patch inputs and a hierarchy of healpix
        outputs and a final global step.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            patch_to_hpx8_and_hpx11=True,
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        self._insert_overall_inputs(pipeline_graph)
        qg = self._build_quantum_graph(pipeline_graph)
        self.assertEqual(len(qg.quanta_by_task["patch_to_hpx8_and_hpx11"]), 22)
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    def test_patch_to_hpx_to_instrument(self) -> None:
        """Test building a QG with patch inputs and a hierarchy of healpix
        outputs and a final per-instrument step.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            patch_to_hpx8_and_hpx11=True,
            hpx8_to_hpx5=True,
            hpx5_to_instrument=True,
        )
        self._insert_overall_inputs(pipeline_graph)
        qg = self._build_quantum_graph(pipeline_graph)
        self.assertEqual(len(qg.quanta_by_task["patch_to_hpx8_and_hpx11"]), 22)
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_instrument"]), 1)

    def test_hpx_to_global_dataset_constraint(self) -> None:
        """Test building a QG with healpix inputs and a hierarchy of healpix
        outputs and a final global step, with the healpix input used as a
        dataset constraint (i.e. the default behavior).
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        self._insert_overall_inputs(pipeline_graph)
        qg = self._build_quantum_graph(pipeline_graph)
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    @unittest.expectedFailure
    def test_hpx_to_global_where_constraint(self) -> None:
        """Test building a QG with healpix inputs and a hierarchy of healpix
        outputs and a final global step, with a 'where' constraint instead of
        a dataset constraint.

        It would be nice for this to work, but we do not currently expect it to
        given the limitations of the butler query system.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        self._insert_overall_inputs(pipeline_graph)
        qg = self._build_quantum_graph(
            pipeline_graph,
            dataset_query_constraint=DatasetQueryConstraintVariant.OFF,
            where="healpix5 IN (4864, 4522)",
        )
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    def test_hpx_to_global_data_id_table(self) -> None:
        """Test building a QG with healpix inputs and a hierarchy of healpix
        outputs and a final global step, with healpix IDs provided via a data
        ID table.
        """
        pipeline_graph = self._make_hpx_pipeline_graph(
            hpx8_to_hpx5=True,
            hpx5_to_global=True,
        )
        self._insert_overall_inputs(pipeline_graph)
        tbl = astropy.table.Table(rows=[(4864,), (4522,)], names=["healpix5"])
        qg = self._build_quantum_graph(
            pipeline_graph,
            dataset_query_constraint=DatasetQueryConstraintVariant.OFF,
            data_id_tables=[tbl],
        )
        self.assertEqual(len(qg.quanta_by_task["hpx8_to_hpx5"]), 2)
        self.assertEqual(len(qg.quanta_by_task["hpx5_to_global"]), 1)

    def _make_hpx_pipeline_graph(
        self,
        *,
        patch_to_hpx8_and_hpx11: bool = False,
        hpx8_to_hpx5: bool = False,
        hpx5_to_global: bool = False,
        hpx5_to_instrument: bool = False,
    ) -> PipelineGraph:
        """Generate a pipeline graph with tasks that use healpix dimensions
        in various ways.

        Parameters
        ----------
        patch_to_hpx8_and_hpx11 : `bool`, optional
            Add a ``healpix8`` task that reads multiple ``patch_dataset``
            inputs and writes multiple ``hpx11_dataset`` outputs plus one
            ``hpx8_dataset``.
        hpx8_to_hpx5 : `bool`, optional
            Add a ``healpix5`` task that reads multiple ``hpx8_dataset``
            inputs and writes one ``hpx5_dataset``.
        hpx5_to_global : `bool`, optional
            Add a dimensionless task that reads multiple ``hpx5_dataset``
            inputs and writes one dimensionless ``global_dataset``.
        hpx5_to_instrument : `bool`, optional
            Add an ``instrument`` task that reads multiple ``hpx5_dataset``
            inputs and writes one ``instrument_dataset``.

        Returns
        -------
        pipeline_graph : `PipelineGraph`
            The graph, already resolved against the test butler's registry.
        """
        pipeline_graph = PipelineGraph(universe=self.butler.dimensions)
        if patch_to_hpx8_and_hpx11:
            config = DynamicTestPipelineTaskConfig()
            config.dimensions = {"healpix8"}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="patch_dataset",
                dimensions={"patch"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="hpx11_dataset",
                dimensions={"healpix11"},
                multiple=True,
            )
            config.outputs["o2"] = DynamicConnectionConfig(
                dataset_type_name="hpx8_dataset",
                dimensions={"healpix8"},
            )
            pipeline_graph.add_task("patch_to_hpx8_and_hpx11", DynamicTestPipelineTask, config=config)
        if hpx8_to_hpx5:
            config = DynamicTestPipelineTaskConfig()
            config.dimensions = {"healpix5"}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="hpx8_dataset",
                dimensions={"healpix8"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="hpx5_dataset",
                dimensions={"healpix5"},
            )
            pipeline_graph.add_task("hpx8_to_hpx5", DynamicTestPipelineTask, config=config)
        if hpx5_to_global:
            config = DynamicTestPipelineTaskConfig()
            # Use an empty set (not ``{}``, which is an empty dict) for
            # "no dimensions", consistent with the set literals used elsewhere.
            config.dimensions = set()
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="hpx5_dataset",
                dimensions={"healpix5"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="global_dataset",
                dimensions=set(),
            )
            pipeline_graph.add_task("hpx5_to_global", DynamicTestPipelineTask, config=config)
        if hpx5_to_instrument:
            config = DynamicTestPipelineTaskConfig()
            config.dimensions = {"instrument"}
            config.inputs["i1"] = DynamicConnectionConfig(
                dataset_type_name="hpx5_dataset",
                dimensions={"healpix5"},
                multiple=True,
            )
            config.outputs["o1"] = DynamicConnectionConfig(
                dataset_type_name="instrument_dataset",
                dimensions={"instrument"},
            )
            pipeline_graph.add_task("hpx5_to_instrument", DynamicTestPipelineTask, config=config)
        pipeline_graph.resolve(self.butler.registry)
        return pipeline_graph

    def _insert_overall_inputs(self, pipeline_graph: PipelineGraph) -> None:
        """Insert overall-input datasets for a pipeline graph into run ``c1``.

        For each overall input, the dataset type is registered. Data IDs for
        inputs without skypix dimensions come from a registry data ID query.
        Inputs whose only dimension is a skypix dimension get one data ID per
        pixel in the union of the envelopes of all patch regions.

        Raises
        ------
        NotImplementedError
            Raised if an overall input combines a skypix dimension with any
            other dimension.
        """
        self.butler.collections.register("c1")
        for _, node in pipeline_graph.iter_overall_inputs():
            self.butler.registry.registerDatasetType(node.dataset_type)
            if node.dimensions.skypix:
                data_ids = self._make_skypix_data_ids(node.dimensions)
            else:
                data_ids = self.butler.query_data_ids(node.dimensions, explain=False)
            self.butler.registry.insertDatasets(node.dataset_type, data_ids, run="c1")

    def _make_skypix_data_ids(self, dimensions) -> list[DataCoordinate]:
        """Return data IDs covering every patch for a skypix-only dimension
        group, or raise `NotImplementedError` for any other combination.
        """
        if len(dimensions) != 1:
            raise NotImplementedError(
                "Can only generate data IDs for queryable dimensions and isolated skypix."
            )
        (skypix_name,) = dimensions.skypix
        pixelization = dimensions.universe.skypix_dimensions[skypix_name].pixelization
        # Accumulate the pixel ranges overlapping any patch region.
        ranges = RangeSet()
        for patch_record in self.butler.query_dimension_records("patch"):
            ranges |= pixelization.envelope(patch_record.region)
        # Each RangeSet element is a half-open [begin, end) range of pixel indices.
        return [
            DataCoordinate.from_required_values(dimensions, (index,))
            for begin, end in ranges
            for index in range(begin, end)
        ]
if __name__ == "__main__":
    # Call lsst.utils.tests.init() before unittest.main() so that whatever
    # LSST test-harness setup it performs is in place before the tests run.
    lsst.utils.tests.init()
    unittest.main()