Coverage for tests / qg_test_utils.py: 59%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:52 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
27"""QuantumGraph-related utilities to support ctrl_bps testing."""
29# Not actually running Quantum so do not need to override 'run' Method
30# pylint: disable=abstract-method
32# Many dummy classes for testing.
33# pylint: disable=missing-class-docstring
35import lsst.pipe.base.connectionTypes as cT
36from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionConfig, Quantum
37from lsst.pex.config import Field
38from lsst.pipe.base import PipelineTask, PipelineTaskConfig, PipelineTaskConnections
39from lsst.pipe.base.tests.mocks import DynamicConnectionConfig, InMemoryRepo
41METADATA = {"D1": [1, 2, 3]}
44# For each dummy task, create a Connections, Config, and PipelineTask
47class Dummy1Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
48 """Connections class used for tests."""
50 initOutput = cT.InitOutput(name="Dummy1InitOutput", storageClass="ExposureF", doc="n/a")
51 input = cT.Input(name="Dummy1Input", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
52 output = cT.Output(name="Dummy1Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
55class Dummy1Config(PipelineTaskConfig, pipelineConnections=Dummy1Connections):
56 """Config class used for testing."""
58 conf1 = Field(dtype=int, default=1, doc="dummy config")
61class Dummy1PipelineTask(PipelineTask):
62 """PipelineTask used for testing."""
64 ConfigClass = Dummy1Config
67class Dummy2Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
68 """Second connections class used for testing."""
70 initInput = cT.InitInput(name="Dummy1InitOutput", storageClass="ExposureF", doc="n/a")
71 initOutput = cT.InitOutput(name="Dummy2InitOutput", storageClass="ExposureF", doc="n/a")
72 input = cT.Input(name="Dummy1Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
73 output = cT.Output(name="Dummy2Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
76class Dummy2Config(PipelineTaskConfig, pipelineConnections=Dummy2Connections):
77 """Config class used for second pipeline task."""
79 conf1 = Field(dtype=int, default=1, doc="dummy config")
82class Dummy2PipelineTask(PipelineTask):
83 """Second test PipelineTask."""
85 ConfigClass = Dummy2Config
88class Dummy2bConnections(PipelineTaskConnections, dimensions=("D1", "D2")):
89 """A connections class used for testing mid-pipeline leaf node."""
91 initInput = cT.InitInput(name="Dummy2InitOutput", storageClass="ExposureF", doc="n/a")
92 initOutput = cT.InitOutput(name="Dummy2bInitOutput", storageClass="ExposureF", doc="n/a")
93 input = cT.Input(name="Dummy2Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
94 output = cT.Output(name="Dummy2bOutput", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
97class Dummy2bConfig(PipelineTaskConfig, pipelineConnections=Dummy2bConnections):
98 """Config used for testing dummy2b."""
100 conf1 = Field(dtype=int, default=1, doc="dummy config")
103class Dummy2bPipelineTask(PipelineTask):
104 """PipelineTask for dummy2b."""
106 ConfigClass = Dummy2bConfig
109class Dummy3Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
110 """Third connections class used for testing."""
112 initInput = cT.InitInput(name="Dummy2InitOutput", storageClass="ExposureF", doc="n/a")
113 initOutput = cT.InitOutput(name="Dummy3InitOutput", storageClass="ExposureF", doc="n/a")
114 input = cT.Input(name="Dummy2Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
115 output = cT.Output(name="Dummy3Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
118class Dummy3Config(PipelineTaskConfig, pipelineConnections=Dummy3Connections):
119 """Third config used for testing."""
121 conf1 = Field(dtype=int, default=1, doc="dummy config")
124class Dummy3PipelineTask(PipelineTask):
125 """Third test PipelineTask."""
127 ConfigClass = Dummy3Config
130class Dummy4Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
131 """Fourth connections class used for testing."""
133 initInput = cT.InitInput(name="Dummy3InitOutput", storageClass="ExposureF", doc="n/a")
134 initOutput = cT.InitOutput(name="Dummy4InitOutput", storageClass="ExposureF", doc="n/a")
135 input = cT.Input(name="Dummy3Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
136 output = cT.Output(name="Dummy4Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
139class Dummy4Config(PipelineTaskConfig, pipelineConnections=Dummy4Connections):
140 """Fourth config used for testing."""
142 conf1 = Field(dtype=int, default=1, doc="dummy config")
145class Dummy4PipelineTask(PipelineTask):
146 """Fourth test PipelineTask."""
148 ConfigClass = Dummy4Config
151# Test if a Task that does not interact with the other Tasks works fine in
152# the graph.
153class Dummy5Connections(PipelineTaskConnections, dimensions=("D1", "D2")):
154 """Fifth connections class used for testing."""
156 input = cT.Input(name="Dummy5Input", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
157 output = cT.Output(name="Dummy5Output", storageClass="ExposureF", doc="n/a", dimensions=("D1", "D2"))
160class Dummy5Config(PipelineTaskConfig, pipelineConnections=Dummy5Connections):
161 """Fifth config used for testing."""
163 conf1 = Field(dtype=int, default=1, doc="dummy config")
166class Dummy5PipelineTask(PipelineTask):
167 """Fifth test PipelineTask."""
169 ConfigClass = Dummy5Config
172def _make_quantum(run, universe, task, task_def, dim1, dim2, intermediate_refs):
173 if task_def.connections.initInputs:
174 init_init_ds_type = DatasetType(
175 task_def.connections.initInput.name,
176 (),
177 storageClass=task_def.connections.initInput.storageClass,
178 universe=universe,
179 )
180 init_refs = [DatasetRef(init_init_ds_type, DataCoordinate.make_empty(universe), run=run)]
181 else:
182 init_refs = None
183 input_ds_type = DatasetType(
184 task_def.connections.input.name,
185 task_def.connections.input.dimensions,
186 storageClass=task_def.connections.input.storageClass,
187 universe=universe,
188 )
189 data_id = DataCoordinate.standardize({"D1": dim1, "D2": dim2}, universe=universe)
190 if ref := intermediate_refs.get((input_ds_type, data_id)):
191 input_refs = [ref]
192 else:
193 input_refs = [DatasetRef(input_ds_type, data_id, run=run)]
194 output_ds_type = DatasetType(
195 task_def.connections.output.name,
196 task_def.connections.output.dimensions,
197 storageClass=task_def.connections.output.storageClass,
198 universe=universe,
199 )
200 ref = DatasetRef(output_ds_type, data_id, run=run)
201 intermediate_refs[(output_ds_type, data_id)] = ref
202 output_refs = [ref]
203 quantum = Quantum(
204 taskName=task.__qualname__,
205 dataId=data_id,
206 taskClass=task,
207 initInputs=init_refs,
208 inputs={input_ds_type: input_refs},
209 outputs={output_ds_type: output_refs},
210 )
211 return quantum
214def make_test_helper() -> InMemoryRepo:
215 """Make a test helper that can produce a quantum graph useful for
216 clustering tests.
218 See `make_quantum_graph` for a more complete description of this graph.
219 """
220 dimension_config = DimensionConfig(
221 {
222 "version": 1,
223 "skypix": {
224 "common": "htm7",
225 "htm": {
226 "class": "lsst.sphgeom.HtmPixelization",
227 "max_level": 24,
228 },
229 },
230 "elements": {
231 "D1": {
232 "keys": [
233 {
234 "name": "id",
235 "type": "int",
236 }
237 ],
238 "storage": {
239 "cls": "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage",
240 },
241 },
242 "D2": {
243 "keys": [
244 {
245 "name": "id",
246 "type": "int",
247 }
248 ],
249 "storage": {
250 "cls": "lsst.daf.butler.registry.dimensions.table.TableDimensionRecordStorage",
251 },
252 },
253 },
254 "packers": {},
255 }
256 )
257 helper = InMemoryRepo(dimension_config=dimension_config)
258 helper.butler.registry.insertDimensionData("D1", *[{"id": n} for n in (1, 3)])
259 helper.butler.registry.insertDimensionData("D2", *[{"id": n} for n in (2, 4)])
260 # Note that automatic inputs and outputs work for most of the tasks below
261 # to create a chain from each task to the next.
262 helper.add_task(
263 "T1",
264 dimensions=("D1", "D2"),
265 init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy1InitOutput")},
266 )
267 helper.add_task(
268 "T2",
269 dimensions=("D1", "D2"),
270 init_inputs={"initInput": DynamicConnectionConfig(dataset_type_name="Dummy1InitOutput")},
271 init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy2InitOutput")},
272 )
273 helper.add_task(
274 "T3",
275 dimensions=("D1", "D2"),
276 init_inputs={"initInput": DynamicConnectionConfig(dataset_type_name="Dummy2InitOutput")},
277 init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy3InitOutput")},
278 )
279 helper.add_task(
280 "T4",
281 dimensions=("D1", "D2"),
282 init_inputs={"initInput": DynamicConnectionConfig(dataset_type_name="Dummy3InitOutput")},
283 init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy4InitOutput")},
284 )
285 helper.add_task(
286 "T5",
287 dimensions=("D1", "D2"),
288 inputs={
289 "initInput": DynamicConnectionConfig(dataset_type_name="Dummy5Input", dimensions=("D1", "D2"))
290 },
291 outputs={
292 "initOutput": DynamicConnectionConfig(dataset_type_name="Dummy5Output", dimensions=("D1", "D2"))
293 },
294 )
295 helper.add_task(
296 "T2b",
297 dimensions=("D1", "D2"),
298 init_inputs={"initInput": DynamicConnectionConfig(dataset_type_name="Dummy2InitOutput")},
299 init_outputs={"initOutput": DynamicConnectionConfig(dataset_type_name="Dummy2bInitOutput")},
300 inputs={"input": DynamicConnectionConfig(dataset_type_name="dataset_auto2", dimensions=("D1", "D2"))},
301 )
302 helper.insert_datasets("dataset_auto0", where="D1 < D2")
303 helper.insert_datasets("Dummy5Input", where="D1 < D2")
304 return helper
307def make_test_quantum_graph(run: str = "run", uneven=False):
308 """Create a quantum graph for unit tests.
310 Parameters
311 ----------
312 run : `str`, optional
313 Name of the RUN collection for output datasets.
314 uneven : `bool`, optional
315 Whether some of the quanta for initial tasks are
316 not included as if finished in previous run.
318 Returns
319 -------
320 qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
321 A test QuantumGraph looking like the following:
322 (Task T5 is completely independent).
324 Numbers in parens are the values for the two dimensions (D1, D2).
326 .. code-block::
327 T1(1,2) T1(1,4) T1(3,4) T5(1,2) T5(1,4) T5(3,4)
328 | | |
329 T2(1,2) T2(1,4) T2(3,4)
330 | | | | | |
331 | T2b(1,2) | T2b(1,4) | T2b(3,4)
332 | | |
333 T3(1,2) T3(1,4) T3(3,4)
334 | | |
335 T4(1,2) T4(1,4) T4(3,4)
336 """
337 with make_test_helper() as helper:
338 qgc = helper.make_quantum_graph_builder(output_run=run).finish(attach_datastore_records=False)
339 if uneven:
340 keys_to_drop = {("T1", 1, 2), ("T1", 1, 4), ("T2", 1, 2)}
341 qgc.quantum_datasets = {
342 qd.quantum_id: qd
343 for qd in qgc.quantum_datasets.values()
344 if (qd.task_label, *qd.data_coordinate) not in keys_to_drop
345 }
346 qgc.set_thin_graph()
347 qgc.set_header_counts()
348 return qgc.assemble()