Coverage for tests/test_pipelines.py: 18%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:30 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:30 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import itertools
23import tempfile
24import unittest
26import lsst.daf.butler.tests as butlerTests
27import lsst.pipe.base
28from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name
29import lsst.utils
30import lsst.utils.tests
32from lsst.resources import ResourcePath
class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
    """Tests of the self-consistency of our pipeline definitions.
    """

    def setUp(self):
        """Set the pipeline directory and the file-to-subset synonym table
        used by the tests.
        """
        self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
        # Each pipeline file should have a subset that represents it in
        # higher-level pipelines.
        self.synonyms = {"ApPipe.yaml": "apPipe",
                         "ApPipeWithIsrTaskLSST.yaml": "apPipe",
                         "ApPipeWithFakes.yaml": "apPipe",
                         "SingleFrame.yaml": "singleFrame",
                         "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
                         "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
                         "RunIsrForCrosstalkSources.yaml": "runOverscan",
                         }

    def test_graph_build(self):
        """Test that each pipeline definition file can be
        used to build a graph.
        """
        files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            if "PromptTemplate" in file.path:
                # Our PromptTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                # If this fails, it will produce a useful error message.
                pipeline.to_graph()

    def test_datasets(self):
        """Run `PipelineStepTester` on each pipeline in ``_ingredients``,
        passing a fixed set of expected input dataset types.

        Pipeline outputs are deliberately not tested (``expected_outputs``
        is an empty set). Injection pipelines and QuickTemplate are skipped.
        """
        files = [
            f for f in ResourcePath.findFileResources(
                [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$"
            )
            # Validation currently broken for injection pipelines.
            # TODO: DM-54077
            if "injection/" not in f.path
        ]
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                # Built fresh for every file because it is mutated below.
                expected_inputs = {
                    # ISR
                    "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
                    "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
                    "opticsTransmission", "filterTransmission", "atmosphereTransmission",
                    "illumMaskedImage", "deferredChargeCalib",
                    # ISR-LSST
                    "bfk", "cti", "dnlLUT", "gain_correction",
                    # Everything else
                    "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
                    "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
                }
                if "WithFakes" in file.path:
                    expected_inputs.add("injection_catalog")
                tester = PipelineStepTester(
                    filename=file,
                    step_suffixes=[""],  # Test full pipeline
                    initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
                                           ],
                    expected_inputs=expected_inputs,
                    # Pipeline outputs highly in flux, don't test
                    expected_outputs=set(),
                    pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
                                      },
                )
                # Tester modifies Butler registry, so need a fresh repo every time
                with tempfile.TemporaryDirectory() as tempRepo:
                    butler = butlerTests.makeTestRepo(tempRepo)
                    tester.run(butler, self)

    def test_whole_subset(self):
        """Test that each pipeline's synonymous subset includes all tasks,
        including those imported from other files.
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
            # Validation currently broken for injection pipelines.
            # TODO: DM-54077
            if "injection/" not in f.path
        ]
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            elif "ApdbDeduplication" in file.path:
                # The task to export catalogs from the APDB and re-run
                # association is not intended to be part of Prompt Processing
                # or batch AP pipeline runs.
                continue
            elif "PromptTemplate" in file.path:
                # Our PromptTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Files without a registered synonym fall through to a
                # placeholder name, so the comparison below fails for them.
                subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
                self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
                                 msg=f"These tasks are missing from subset '{subset}'")

    def test_ap_pipe_subsets(self):
        """Test the unique subsets of ApPipe.
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
            # Validation currently broken for injection pipelines.
            # TODO: DM-54077
            if "injection/" not in f.path
        ]
        required_subsets = {"preload", "prompt", "afterburner"}
        # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
        # very deliberate exception; see RFC-997.
        no_subset_wanted = {"getRegionTimeFromVisit"}

        for file in files:
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Do all steps exist?
                self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
                                        msg="An AP pipeline is missing subsets "
                                            f"{required_subsets - pipeline.subsets.keys()}.")
                # Is each task part of exactly one step? Disjointness is
                # symmetric, so each unordered pair is checked only once;
                # sorting keeps the order of reported pairs deterministic.
                for set1, set2 in itertools.combinations(sorted(required_subsets), 2):
                    tasks1 = pipeline.subsets[set1]
                    tasks2 = pipeline.subsets[set2]
                    self.assertTrue(tasks1.isdisjoint(tasks2),
                                    msg=f"Subsets '{set1}' and '{set2}' share tasks "
                                        f"{tasks1.intersection(tasks2)}.")
                subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
                self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
                                 msg=f"These tasks are not in any of the subsets {required_subsets}.")

    def test_inherited_subsets(self):
        """Test that instrument-specific pipelines have all the subsets of their
        generic counterparts.

        Note that this does not check inheritance *within* `_ingredients`!
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
            if "_ingredients" not in f.path
        ]
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                # The generic counterpart is the same-named file in _ingredients.
                generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename())
                if not generic.exists():
                    # No generic counterpart to compare against.
                    continue
                special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
                generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
                self.assertGreaterEqual(special_subsets, generic_subsets,
                                        msg="The instrument-specific pipeline is missing subsets "
                                            f"{generic_subsets - special_subsets}.")
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Subclass of `lsst.utils.tests.MemoryTestCase` that adds nothing of
    its own; all behavior comes from the base class.
    """
def setup_module(module):
    """Call `lsst.utils.tests.init`.

    Parameters
    ----------
    module
        Not used by this function.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Script entry point: perform the same initialization as setup_module,
    # then hand control to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()