Coverage for tests / test_pipelines.py: 16%
100 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:56 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import itertools
23import tempfile
24import unittest
26import lsst.daf.butler.tests as butlerTests
27import lsst.pipe.base
28from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name
29import lsst.utils
30import lsst.utils.tests
32from lsst.resources import ResourcePath
35class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
36 """Tests of the self-consistency of our pipeline definitions.
37 """
38 def setUp(self):
39 self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
40 # Each pipeline file should have a subset that represents it in
41 # higher-level pipelines.
42 self.synonyms = {"ApPipe.yaml": "apPipe",
43 "ApPipeWithIsrTaskLSST.yaml": "apPipe",
44 "ApPipeWithPreconvolution.yaml": "apPipe",
45 "ApPipeWithFakes.yaml": "apPipe",
46 "SingleFrame.yaml": "singleFrame",
47 "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
48 "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
49 "RunIsrForCrosstalkSources.yaml": "runOverscan",
50 }
52 def test_graph_build(self):
53 """Test that each pipeline definition file can be
54 used to build a graph.
55 """
56 files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
57 for file in files:
58 if "QuickTemplate" in file.path:
59 # Our QuickTemplate definition cannot be tested here because it
60 # depends on drp_tasks, which we cannot make a dependency here.
61 continue
62 if "PromptTemplate" in file.path:
63 # Our PromptTemplate definition cannot be tested here because it
64 # depends on drp_tasks, which we cannot make a dependency here.
65 continue
66 with self.subTest(file=str(file)):
67 pipeline = lsst.pipe.base.Pipeline.from_uri(file)
68 pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
69 # If this fails, it will produce a useful error message.
70 pipeline.to_graph()
72 def test_datasets(self):
73 files = [
74 f for f in ResourcePath.findFileResources(
75 [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$"
76 )
77 # Validation currently broken for injection pipelines.
78 # TODO: DM-54077
79 if "injection/" not in f.path
80 ]
81 for file in files:
82 if "QuickTemplate" in file.path:
83 # Our QuickTemplate definition cannot be tested here because it
84 # depends on drp_tasks, which we cannot make a dependency here.
85 continue
86 with self.subTest(file=str(file)):
87 expected_inputs = {
88 # ISR
89 "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
90 "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
91 "opticsTransmission", "filterTransmission", "atmosphereTransmission",
92 "illumMaskedImage", "deferredChargeCalib",
93 # ISR-LSST
94 "bfk", "cti", "dnlLUT", "gain_correction",
95 # Everything else
96 "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
97 "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
98 }
99 if "WithFakes" in file.path:
100 expected_inputs.add("injection_catalog")
101 tester = PipelineStepTester(
102 filename=file,
103 step_suffixes=[""], # Test full pipeline
104 initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
105 ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
106 ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
107 ],
108 expected_inputs=expected_inputs,
109 # Pipeline outputs highly in flux, don't test
110 expected_outputs=set(),
111 pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
112 },
113 )
114 # Tester modifies Butler registry, so need a fresh repo every time
115 with tempfile.TemporaryDirectory() as tempRepo:
116 butler = butlerTests.makeTestRepo(tempRepo)
117 tester.run(butler, self)
119 def test_whole_subset(self):
120 """Test that each pipeline's synonymous subset includes all tasks,
121 including those imported from other files.
122 """
123 files = [
124 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
125 # Validation currently broken for injection pipelines.
126 # TODO: DM-54077
127 if "injection/" not in f.path
128 ]
129 for file in files:
130 if "QuickTemplate" in file.path:
131 # Our QuickTemplate definition cannot be tested here because it
132 # depends on drp_tasks, which we cannot make a dependency here.
133 continue
134 elif "ApdbDeduplication" in file.path:
135 # The task to export catalogs from the APDB and re-run
136 # association is not intended to be part of Prompt Processing
137 # or batch AP pipeline runs.
138 continue
139 elif "PromptTemplate" in file.path:
140 # Our PromptTemplate definition cannot be tested here because it
141 # depends on drp_tasks, which we cannot make a dependency here.
142 continue
143 with self.subTest(file=str(file)):
144 pipeline = lsst.pipe.base.Pipeline.from_uri(file)
145 subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
146 self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
147 msg=f"These tasks are missing from subset '{subset}'")
149 def test_ap_pipe_subsets(self):
150 """Test the unique subsets of ApPipe.
151 """
152 files = [
153 f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
154 # Validation currently broken for injection pipelines.
155 # TODO: DM-54077
156 if "injection/" not in f.path
157 ]
158 required_subsets = {"preload", "prompt", "afterburner"}
159 # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
160 # very deliberate exception; see RFC-997.
161 no_subset_wanted = {"getRegionTimeFromVisit"}
163 for file in files:
164 with self.subTest(file=str(file)):
165 pipeline = lsst.pipe.base.Pipeline.from_uri(file)
166 # Do all steps exist?
167 self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
168 msg="An AP pipeline is missing subsets "
169 f"{required_subsets - pipeline.subsets.keys()}.")
170 # Is each task part of exactly one step?
171 for set1, set2 in itertools.product(required_subsets, required_subsets):
172 if set1 == set2:
173 continue
174 tasks1 = pipeline.subsets[set1]
175 tasks2 = pipeline.subsets[set2]
176 self.assertTrue(tasks1.isdisjoint(tasks2),
177 msg=f"Subsets '{set1}' and '{set2}' share tasks "
178 f"{tasks1.intersection(tasks2)}.")
179 subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
180 self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
181 msg=f"These tasks are not in any of the subsets {required_subsets}.")
183 def test_preconvolution_isr_matches_ap_pipe(self):
184 """Test that, for each instrument, ApPipeWithPreconvolution defines
185 the same isr task (class and config) as the corresponding ApPipe.
187 Preconvolution changes only image subtraction and DIA-source
188 detection; instrument signature removal must be unaffected.
189 """
190 files = [
191 f for f in ResourcePath.findFileResources(
192 [self.path], file_filter=r"^ApPipeWithPreconvolution\.yaml$"
193 )
194 if "_ingredients" not in f.path
195 ]
196 # Sanity-check that this test actually has cameras to compare.
197 self.assertGreater(len(files), 0,
198 msg="No camera-specific ApPipeWithPreconvolution.yaml files found.")
200 for precon_file in files:
201 with self.subTest(file=str(precon_file)):
202 base_file = precon_file.dirname().join("ApPipe.yaml")
203 self.assertTrue(base_file.exists(),
204 msg=f"Expected sibling ApPipe.yaml next to {precon_file}: "
205 f"{base_file} does not exist.")
207 precon = lsst.pipe.base.Pipeline.from_uri(precon_file)
208 base = lsst.pipe.base.Pipeline.from_uri(base_file)
209 # apdb_config has no default and must be set before to_graph().
210 precon.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
211 base.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
213 precon_isr = precon.to_graph().tasks["isr"]
214 base_isr = base.to_graph().tasks["isr"]
216 self.assertEqual(precon_isr.task_class_name, base_isr.task_class_name,
217 msg=f"isr task class differs between ApPipe.yaml and "
218 f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}.")
219 # Can't just do `assertEqual(precon_isr, base_isr)` since
220 # Task nodes are intentionally not equality comparable.
221 self.assertTrue(
222 base_isr.config.compare(precon_isr.config, shortcut=False),
223 msg=f"isr task config differs between ApPipe.yaml and "
224 f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}."
225 )
227 def test_inherited_subsets(self):
228 """Test that instrument-specific pipelines have all the subsets of their
229 generic counterparts.
231 Note that this does not check inheritance *within* `_ingredients`!
232 """
233 files = [
234 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
235 if "_ingredients" not in f.path
236 ]
237 for file in files:
238 if "QuickTemplate" in file.path:
239 # Our QuickTemplate definition cannot be tested here because it
240 # depends on drp_tasks, which we cannot make a dependency here.
241 continue
242 with self.subTest(file=str(file)):
243 generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename())
244 if not generic.exists():
245 continue
246 special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
247 generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
248 self.assertGreaterEqual(special_subsets, generic_subsets,
249 msg="The instrument-specific pipeline is missing subsets "
250 f"{generic_subsets - special_subsets}.")
253class MemoryTester(lsst.utils.tests.MemoryTestCase):
254 pass
257def setup_module(module):
258 lsst.utils.tests.init()
261if __name__ == "__main__": 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true
262 lsst.utils.tests.init()
263 unittest.main()