Coverage for tests / test_drp-v2.py: 19%
145 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:24 +0000
# This file is part of drp_pipe.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24import logging
25import os
26import re
27import tempfile
28import unittest
29from collections.abc import Set
30from typing import Any
32from lsst.daf.butler import Butler, Config, DatasetType
33from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
34from lsst.pipe.base import Pipeline, PipelineGraph
36from lsst.drp.pipe.tests.correspondence import Correspondence
_LOG = logging.getLogger(__name__)

# Directory holding the pipeline YAML definitions, located relative to this
# test file (one level up, then "pipelines").
PIPELINES_DIR = os.path.join(os.path.dirname(__file__), "..", "pipelines")
# Absolute path of the directory that contains this test file.
TEST_DIR = os.path.abspath(os.path.dirname(__file__))

# Dataset type name of the reference catalog the tests register up front.
COMCAM_REFCAT = "the_monster_20250219"
# The exact set of overall-input dataset type names that
# ``test_comcam_full_load`` expects the LSSTComCam/DRP-v2 pipeline to have.
COMCAM_INPUTS = {
    COMCAM_REFCAT,
    "raw",
    "flat",
    "dark",
    "bias",
    "bfk",
    "linearizer",
    "cti",
    "ptc",
    "gain_correction",
    "crosstalk",
    "camera",
    "defects",
    "skyMap",
    "fgcmLookUpTable",
    "pretrainedModelPackage",
    "illuminationCorrection",
    # Solar system state required to generateEphemerides
    "mpcorb",
    "de440s",
    "sb441_n16",
    "obscodes",
    "linux_p1550p2650",
    "pck00010",
    "earth_latest_high_prec",
    "earth_620120_250826",
    "earth_2025_250826_2125_predict",
    "naif0012",
}
class DrpV2TestCase(unittest.TestCase):
    """Self-consistency and overall-input checks for the DRP-v2 pipeline
    variants.
    """

    def setUp(self) -> None:
        # Scratch directory under which `make_butler` creates its repository.
        self.root = makeTestTempDir(TEST_DIR)
        # Do not truncate diffs in assertion failure messages.
        self.maxDiff = None

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def make_butler(self, **kwargs: Any) -> Butler:
        """Construct a butler repository and return a `Butler` client.

        Parameters
        ----------
        **kwargs
            Forwarded to `Butler.from_config`.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Client for the repository created under ``self.root``.
        """
        config = Config()

        # make separate temporary directory for registry of this instance
        tmpdir = tempfile.mkdtemp(dir=self.root)
        config["registry", "db"] = f"sqlite:///{tmpdir}/gen3.sqlite3"
        config = Butler.makeRepo(self.root, config)
        butler = Butler.from_config(config, **kwargs)
        return butler

    def register_refcat(self, butler: Butler, name: str) -> None:
        """Register a reference-catalog dataset type with the butler.

        The dataset type is registered with dimensions ``{"htm7"}``, storage
        class ``"SimpleCatalog"`` and ``isCalibration=False``.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler whose registry receives the dataset type.
        name : `str`
            Name of the dataset type to register.
        """
        butler.registry.registerDatasetType(
            DatasetType(
                name,
                {"htm7"},
                "SimpleCatalog",
                isCalibration=False,
                universe=butler.dimensions,
            )
        )

    def register_dataset(
        self, butler: Butler, name: str, dimensions: set[str], storageClass: str
    ) -> None:
        """Register a non-calibration dataset type with the butler.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler whose registry receives the dataset type.
        name : `str`
            Name of the dataset type to register.
        dimensions : `set` [ `str` ]
            Names of the dimensions of the dataset type.
        storageClass : `str`
            Name of the storage class of the dataset type.
        """
        butler.registry.registerDatasetType(
            DatasetType(
                name,
                dimensions,
                storageClass,
                isCalibration=False,
                universe=butler.dimensions,
            )
        )

    def _load_stage_graph(self, butler: Butler, stage_label: str) -> PipelineGraph:
        """Load one stage of LSSTComCam/DRP-v2, resolve it, and register its
        dataset types with the butler.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler used both to resolve the graph and to register the
            stage's dataset types.
        stage_label : `str`
            Label of the stage subset, used as the URI fragment.

        Returns
        -------
        pipeline_graph : `lsst.pipe.base.PipelineGraph`
            Resolved graph containing only the requested stage.
        """
        pipeline_graph = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, f"LSSTComCam/DRP-v2.yaml#{stage_label}")
        ).to_graph(registry=butler.registry)
        # NOTE(review): presumably registered so that later stages resolve
        # against the dataset types this stage defines - confirm.
        pipeline_graph.register_dataset_types(butler)
        return pipeline_graph

    def test_comcam_full_load(self) -> None:
        """Test the LSSTComCam/DRP-v2 pipeline after reading the full pipeline
        at once.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        pipeline = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2.yaml")
        )
        # Just constructing and resolving the pipeline graph does a lot of
        # validation, since it checks that the step flow and dimensions are
        # valid (step flow is what the PipelineStepTester checks for the v1
        # pipelines).
        pipeline_graph = pipeline.to_graph(registry=butler.registry)
        stage_step_prefixes = (
            ("stage1-single-visit", "step1"),
            ("stage2-recalibrate", "step2"),
            ("stage3-coadd", "step3"),
            ("stage4-measure-variability", "step4"),
        )
        for stage_label, step_prefix in stage_step_prefixes:
            self.check_stage(
                pipeline_graph, pipeline_graph.task_subsets[stage_label], step_prefix
            )
        # Check that the overall inputs are only the ones we expect.
        overall_inputs = {name for name, _ in pipeline_graph.iter_overall_inputs()}
        self.assertEqual(overall_inputs, COMCAM_INPUTS)

    def test_comcam_stage_load(self) -> None:
        """Test the LSSTComCam/DRP-v2 pipeline after reading the pipeline
        one stage at a time.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        # Pre-define visit_table.
        # It is stored as ArrowAstropy but read as a DataFrame.
        self.register_dataset(butler, "visit_table", {"instrument"}, "ArrowAstropy")

        # Stages are loaded strictly in order; each one registers its dataset
        # types before the next one is resolved.
        pipeline_graph_1 = self._load_stage_graph(butler, "stage1-single-visit")
        # Stage 1 is compared against every task in the loaded graph rather
        # than against the stage subset.
        self.check_stage(pipeline_graph_1, pipeline_graph_1.tasks.keys(), "")

        pipeline_graph_2 = self._load_stage_graph(butler, "stage2-recalibrate")
        self.check_stage(
            pipeline_graph_2, pipeline_graph_2.task_subsets["stage2-recalibrate"], ""
        )

        pipeline_graph_3 = self._load_stage_graph(butler, "stage3-coadd")
        self.check_stage(
            pipeline_graph_3, pipeline_graph_3.task_subsets["stage3-coadd"], ""
        )

        pipeline_graph_4 = self._load_stage_graph(
            butler, "stage4-measure-variability"
        )
        self.check_stage(
            pipeline_graph_4,
            pipeline_graph_4.task_subsets["stage4-measure-variability"],
            "",
        )

        # Spot-check a few prominent outputs for each stage.
        expected_outputs = (
            (
                pipeline_graph_1,
                (
                    "single_visit_star",
                    "preliminary_visit_image",
                    "preliminary_visit_summary",
                    "preliminary_visit_table",
                    "preliminary_visit_detector_table",
                    "single_visit_star_association_metrics",
                ),
            ),
            (
                pipeline_graph_2,
                (
                    "recalibrated_star",
                    "recalibrated_star_association_metrics",
                    "visit_summary",
                    "visit_table",
                    "visit_detector_table",
                ),
            ),
            (
                pipeline_graph_3,
                (
                    "deep_coadd",
                    "template_coadd",
                    "object",
                ),
            ),
            (
                pipeline_graph_4,
                (
                    "dia_source",
                    "dia_object",
                    "source",
                    "analysis_source_association_metrics",
                    "visit_image",
                    "object_forced_source",
                    "dia_object_forced_source",
                ),
            ),
        )
        for stage_graph, dataset_type_names in expected_outputs:
            for dataset_type_name in dataset_type_names:
                self.assertIsNotNone(stage_graph.producer_of(dataset_type_name))

    def check_stage(
        self, pipeline_graph: PipelineGraph, stage_members: Set[str], step_prefix: str
    ) -> None:
        """Check that the formal steps that should subdivide a stage subset
        actually do.

        Parameters
        ----------
        pipeline_graph : `lsst.pipe.base.pipeline_graph.PipelineGraph`
            A pipeline graph that has step definitions.
        stage_members : `Set` [ `str` ]
            The task labels that belong to the stage being tested.
        step_prefix : `str`
            String prefix for all steps that are part of this stage.  May be
            an empty string to match all steps in the pipeline and check that
            only the desired steps are in the pipeline.
        """
        step_labels = [
            label for label in pipeline_graph.steps if label.startswith(step_prefix)
        ]
        # Steps should be sorted alphabetically.
        self.assertEqual(sorted(step_labels), step_labels)
        # Test that the checkpoint/stage subset is the unions of its steps.
        step_member_union = set()
        for step_label in step_labels:
            step_member_union.update(pipeline_graph.task_subsets[step_label])
        self.assertSetEqual(step_member_union, set(stage_members))

    def test_comcam_correspondence(self) -> None:
        """Check LSSTComCam/DRP-v2 against LSSTComCam/DRP using the stored
        correspondence file, then compare the configs of corresponding tasks.

        If the stored correspondence reports problems, an updated candidate
        is written next to it with a ``.new.json`` suffix and the test fails.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        new_pipeline_filename = os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2.yaml")
        new_pipeline_graph = Pipeline.from_uri(new_pipeline_filename).to_graph(
            registry=butler.registry
        )
        old_steps = [
            "step1",
            "step2a",
            "step2b",
            "step2c",
            "step2d",
            "step2e",
            "step3a",
            "step3b",
            "step4",
            "step5",
            "step6",
            "step7",
        ]
        old_pipeline_graph = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, f"LSSTComCam/DRP.yaml#{','.join(old_steps)}")
        ).to_graph(registry=butler.registry)
        ignore_edges = {
            # Use recalibrated_star instead of single_visit_star for calib
            # flag propagation.
            ("measureObjectUnforced", "sourceTableHandles"),
            # Use deep_coadd instead of deep_coadd_preliminary for WCS.
            ("measureObjectForced", "refWcs"),
        }
        correspondence_filename = os.path.join(
            TEST_DIR, "migration_data", "LSSTComCam", "DRP-v2.json"
        )
        correspondence = Correspondence.read(correspondence_filename)
        # A truthy result from check() means problems were found.
        if correspondence.check(
            new_pipeline_graph,
            old_pipeline_graph,
            "LSSTComCam/DRP-v2",
            "LSSTComCam/DRP",
            ignore_edges=ignore_edges,
        ):
            new_correspondence = correspondence.find_matches(
                new_pipeline_graph, old_pipeline_graph
            ).sorted(new_pipeline_graph, old_pipeline_graph)
            new_correspondence_filename = (
                f"{os.path.splitext(correspondence_filename)[0]}.new.json"
            )
            new_correspondence.write(new_correspondence_filename)
            print(
                "An attempt to fix the correspondence file has "
                f"been written to {new_correspondence_filename!r}."
            )
            if messages := new_correspondence.check(
                new_pipeline_graph,
                old_pipeline_graph,
                "LSSTComCam/DRP-v2",
                "LSSTComCam/DRP",
                ignore_edges=ignore_edges,
            ):
                print("Some problems remain that require manual edits:")
                for message in messages:
                    print(message)
            else:
                print("This file should be reviewed and moved to replace the original.")
            raise AssertionError(
                f"Pipeline correspondence file {correspondence_filename!r} is out of date."
            )
        for new_label, old_label in correspondence.tasks_new_to_old.items():
            new_node = new_pipeline_graph.tasks[new_label]
            old_node = old_pipeline_graph.tasks[old_label]
            task_config_diff = correspondence.diff_task_configs(new_node, old_node)
            if task_config_diff:
                print(task_config_diff)
                raise AssertionError(
                    f"Differences detected in configs for task {new_label!r} (previously {old_label!r})."
                )

    def test_comcam_compat(self) -> None:
        """Test the LSSTComCam/DRP-v2-compat pipeline, which should be
        identical to DRP-v2 aside from replacing 'source' -> 'source2'
        throughout.
        """
        butler = self.make_butler(writeable=True)
        self.register_refcat(butler, COMCAM_REFCAT)
        pipeline = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2.yaml")
        )
        pipeline_graph = pipeline.to_graph(registry=butler.registry)
        pipeline_compat = Pipeline.from_uri(
            os.path.join(PIPELINES_DIR, "LSSTComCam/DRP-v2-compat.yaml")
        )
        pipeline_graph_compat = pipeline_compat.to_graph(registry=butler.registry)
        compat_correspondence = Correspondence(
            dataset_types_new_to_old={"source2": "source"},
        ).find_matches(pipeline_graph_compat, pipeline_graph)
        # check() must report no problems.
        self.assertFalse(
            compat_correspondence.check(
                pipeline_graph_compat, pipeline_graph, "DRP-v2-compat", "DRP-v2"
            )
        )

    def test_comcam_release_id_parameter(self) -> None:
        """Test that changing the release_id parameter affects all appropriate
        config options in the pipeline.
        """
        pipeline = Pipeline.fromString(
            """
            description: test pipeline
            imports:
                - $DRP_PIPE_DIR/pipelines/LSSTComCam/DRP-v2.yaml
            parameters:
                release_id: 52
            """
        )
        pipeline_graph = pipeline.to_graph()
        regex = re.compile(r"config\.(.+)\.release_id\=")
        release_id_options = []
        for task_node in pipeline_graph.tasks.values():
            for match_string in regex.findall(task_node.get_config_str()):
                attribute_path: list[str] = match_string.split(".")
                # Only plain dotted attribute paths can be followed with
                # getattr below; anything else is skipped with a warning.
                if any(not term.isidentifier() for term in attribute_path):
                    _LOG.warning(
                        "Not checking config option %s:%s.release_id "
                        "because the test code is not sophisticated enough.  Please improve it.",
                        task_node.label,
                        match_string,
                    )
                    continue
                config_attribute = task_node.config
                for term in attribute_path:
                    config_attribute = getattr(config_attribute, term)
                self.assertEqual(config_attribute.release_id, 52)
                release_id_options.append((task_node.label, attribute_path))
        # Spot check a few expected entries to make sure a logic bug or bad
        # regex isn't preventing this test from doing anything.
        self.assertIn(("calibrateImage", ["id_generator"]), release_id_options)
        self.assertIn(("detectCoaddPeaks", ["idGenerator"]), release_id_options)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()