Coverage for tests / test_pipelines.py: 20%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:58 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:58 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import itertools
23import tempfile
24import unittest
26# need to import pyproj to prevent file handle leakage
27# TODO: Remove import after completing DM-54643. Use DM-54656
28try:
29 import pyproj # noqa: F401
30except ImportError:
31 pass
33import lsst.daf.butler.tests as butlerTests
34import lsst.pipe.base
35from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name
36import lsst.utils
37import lsst.utils.tests
39from lsst.resources import ResourcePath
42class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
43 """Tests of the self-consistency of our pipeline definitions.
44 """
45 def setUp(self):
46 self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
47 # Each pipeline file should have a subset that represents it in
48 # higher-level pipelines.
49 self.synonyms = {"ApPipe.yaml": "apPipe",
50 "ApPipeWithIsrTaskLSST.yaml": "apPipe",
51 "ApPipeWithFakes.yaml": "apPipe",
52 "SingleFrame.yaml": "singleFrame",
53 "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
54 "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
55 "RunIsrForCrosstalkSources.yaml": "runOverscan",
56 }
58 def test_graph_build(self):
59 """Test that each pipeline definition file can be
60 used to build a graph.
61 """
62 files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
63 for file in files:
64 if "QuickTemplate" in file.path:
65 # Our QuickTemplate definition cannot be tested here because it
66 # depends on drp_tasks, which we cannot make a dependency here.
67 continue
68 if "PromptTemplate" in file.path:
69 # Our PromptTemplate definition cannot be tested here because it
70 # depends on drp_tasks, which we cannot make a dependency here.
71 continue
72 with self.subTest(file=str(file)):
73 pipeline = lsst.pipe.base.Pipeline.from_uri(file)
74 pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
75 # If this fails, it will produce a useful error message.
76 pipeline.to_graph()
78 def test_datasets(self):
79 files = [
80 f for f in ResourcePath.findFileResources(
81 [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$"
82 )
83 # Validation currently broken for injection pipelines.
84 # TODO: DM-54077
85 if "injection/" not in f.path
86 ]
87 for file in files:
88 if "QuickTemplate" in file.path:
89 # Our QuickTemplate definition cannot be tested here because it
90 # depends on drp_tasks, which we cannot make a dependency here.
91 continue
92 with self.subTest(file=str(file)):
93 expected_inputs = {
94 # ISR
95 "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
96 "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
97 "opticsTransmission", "filterTransmission", "atmosphereTransmission",
98 "illumMaskedImage", "deferredChargeCalib",
99 # ISR-LSST
100 "bfk", "cti", "dnlLUT", "gain_correction",
101 # Everything else
102 "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
103 "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
104 }
105 if "WithFakes" in file.path:
106 expected_inputs.add("injection_catalog")
107 tester = PipelineStepTester(
108 filename=file,
109 step_suffixes=[""], # Test full pipeline
110 initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
111 ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
112 ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
113 ],
114 expected_inputs=expected_inputs,
115 # Pipeline outputs highly in flux, don't test
116 expected_outputs=set(),
117 pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
118 },
119 )
120 # Tester modifies Butler registry, so need a fresh repo every time
121 with tempfile.TemporaryDirectory() as tempRepo:
122 butler = butlerTests.makeTestRepo(tempRepo)
123 tester.run(butler, self)
125 def test_whole_subset(self):
126 """Test that each pipeline's synonymous subset includes all tasks,
127 including those imported from other files.
128 """
129 files = [
130 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
131 # Validation currently broken for injection pipelines.
132 # TODO: DM-54077
133 if "injection/" not in f.path
134 ]
135 for file in files:
136 if "QuickTemplate" in file.path:
137 # Our QuickTemplate definition cannot be tested here because it
138 # depends on drp_tasks, which we cannot make a dependency here.
139 continue
140 elif "ApdbDeduplication" in file.path:
141 # The task to export catalogs from the APDB and re-run
142 # association is not intended to be part of Prompt Processing
143 # or batch AP pipeline runs.
144 continue
145 elif "PromptTemplate" in file.path:
146 # Our PromptTemplate definition cannot be tested here because it
147 # depends on drp_tasks, which we cannot make a dependency here.
148 continue
149 with self.subTest(file=str(file)):
150 pipeline = lsst.pipe.base.Pipeline.from_uri(file)
151 subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
152 self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
153 msg=f"These tasks are missing from subset '{subset}'")
155 def test_ap_pipe_subsets(self):
156 """Test the unique subsets of ApPipe.
157 """
158 files = [
159 f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
160 # Validation currently broken for injection pipelines.
161 # TODO: DM-54077
162 if "injection/" not in f.path
163 ]
164 required_subsets = {"preload", "prompt", "afterburner"}
165 # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
166 # very deliberate exception; see RFC-997.
167 no_subset_wanted = {"getRegionTimeFromVisit"}
169 for file in files:
170 with self.subTest(file=str(file)):
171 pipeline = lsst.pipe.base.Pipeline.from_uri(file)
172 # Do all steps exist?
173 self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
174 msg="An AP pipeline is missing subsets "
175 f"{required_subsets - pipeline.subsets.keys()}.")
176 # Is each task part of exactly one step?
177 for set1, set2 in itertools.product(required_subsets, required_subsets):
178 if set1 == set2:
179 continue
180 tasks1 = pipeline.subsets[set1]
181 tasks2 = pipeline.subsets[set2]
182 self.assertTrue(tasks1.isdisjoint(tasks2),
183 msg=f"Subsets '{set1}' and '{set2}' share tasks "
184 f"{tasks1.intersection(tasks2)}.")
185 subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
186 self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
187 msg=f"These tasks are not in any of the subsets {required_subsets}.")
189 def test_inherited_subsets(self):
190 """Test that instrument-specific pipelines have all the subsets of their
191 generic counterparts.
193 Note that this does not check inheritance *within* `_ingredients`!
194 """
195 files = [
196 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
197 if "_ingredients" not in f.path
198 ]
199 for file in files:
200 if "QuickTemplate" in file.path:
201 # Our QuickTemplate definition cannot be tested here because it
202 # depends on drp_tasks, which we cannot make a dependency here.
203 continue
204 with self.subTest(file=str(file)):
205 generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename())
206 if not generic.exists():
207 continue
208 special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
209 generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
210 self.assertGreaterEqual(special_subsets, generic_subsets,
211 msg="The instrument-specific pipeline is missing subsets "
212 f"{generic_subsets - special_subsets}.")
215class MemoryTester(lsst.utils.tests.MemoryTestCase):
216 pass
219def setup_module(module):
220 lsst.utils.tests.init()
223if __name__ == "__main__": 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true
224 lsst.utils.tests.init()
225 unittest.main()