Coverage for tests / test_pipelines.py: 21%
81 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:12 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:12 +0000
1# This file is part of ap_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22import glob
23import itertools
24import os.path
25import tempfile
26import unittest
28# need to import pyproj to prevent file handle leakage
29import pyproj # noqa: F401
31import lsst.daf.butler.tests as butlerTests
32import lsst.pipe.base
33from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name
34import lsst.utils
35import lsst.utils.tests
class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
    """Tests of the self-consistency of our pipeline definitions.

    Notes
    -----
    Pipeline files are discovered with `glob.glob` *without*
    ``recursive=True``. In that mode a ``"**"`` path component behaves like
    ``"*"``, i.e. it matches exactly one directory level below
    ``pipelines/``; files directly inside ``pipelines/`` or nested more
    deeply are not visited.
    """

    def setUp(self):
        self.path = os.path.join(lsst.utils.getPackageDir("ap_pipe"), "pipelines")
        # Each pipeline file should have a subset that represents it in
        # higher-level pipelines.
        self.synonyms = {"ApPipe.yaml": "apPipe",
                         "ApPipeWithIsrTaskLSST.yaml": "apPipe",
                         "ApPipeWithFakes.yaml": "apPipe",
                         "SingleFrame.yaml": "singleFrame",
                         "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
                         "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
                         "RunIsrForCrosstalkSources.yaml": "runOverscan",
                         }

    def _find_pipelines(self, directory, pattern="*.yaml", exclude=()):
        """Return the pipeline definition files to be checked by a test.

        Parameters
        ----------
        directory : `str`
            Path component (possibly a glob such as ``"**"``) appended to
            ``self.path``.
        pattern : `str`, optional
            Glob pattern for the file names inside ``directory``.
        exclude : iterable [`str`], optional
            Substrings; any file whose *full path* contains one of them is
            dropped.

        Returns
        -------
        files : `list` [`str`]
            Matching paths, sorted so that subtests run in a stable order.
        """
        candidates = glob.glob(os.path.join(self.path, directory, pattern))
        return sorted(path for path in candidates
                      if not any(token in path for token in exclude))

    def test_graph_build(self):
        """Test that each pipeline definition file can be
        used to build a graph.
        """
        # Our QuickTemplate definition cannot be tested here because it
        # depends on drp_tasks, which we cannot make a dependency here.
        for file in self._find_pipelines("**", exclude=("QuickTemplate",)):
            with self.subTest(file):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                # If this fails, it will produce a useful error message.
                pipeline.to_graph()

    def test_datasets(self):
        """Run `PipelineStepTester` on each pipeline in ``_ingredients``,
        checking it against the expected overall input dataset types.
        """
        # Inputs common to every pipeline; copied per file because some
        # pipelines need extra entries.
        common_inputs = frozenset({
            # ISR
            "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
            "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
            "opticsTransmission", "filterTransmission", "atmosphereTransmission",
            "illumMaskedImage", "deferredChargeCalib",
            # ISR-LSST
            "bfk", "cti", "dnlLUT", "gain_correction",
            # Everything else
            "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
            "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
        })
        # Our QuickTemplate definition cannot be tested here because it
        # depends on drp_tasks, which we cannot make a dependency here.
        for file in self._find_pipelines("_ingredients", exclude=("QuickTemplate",)):
            with self.subTest(file):
                expected_inputs = set(common_inputs)
                if "WithFakes" in file:
                    expected_inputs.add("injection_catalog")
                tester = PipelineStepTester(
                    filename=file,
                    step_suffixes=[""],  # Test full pipeline
                    initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
                                           ],
                    expected_inputs=expected_inputs,
                    # Pipeline outputs highly in flux, don't test
                    expected_outputs=set(),
                    pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
                                      },
                )
                # Tester modifies Butler registry, so need a fresh repo every time
                with tempfile.TemporaryDirectory() as tempRepo:
                    butler = butlerTests.makeTestRepo(tempRepo)
                    tester.run(butler, self)

    def test_whole_subset(self):
        """Test that each pipeline's synonymous subset includes all tasks,
        including those imported from other files.
        """
        # Our QuickTemplate definition cannot be tested here because it
        # depends on drp_tasks, which we cannot make a dependency here.
        #
        # The ApdbDeduplication pipeline (the task to export catalogs from
        # the APDB and re-run association) is deliberately skipped as well.
        # NOTE(review): the original rationale comment was truncated after
        # "is not intended to be part of"; confirm the intended wording.
        for file in self._find_pipelines("**", exclude=("QuickTemplate", "ApdbDeduplication")):
            with self.subTest(file):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                basename = os.path.basename(file)
                # Fail with an explanation instead of a bare KeyError when a
                # new pipeline file has not been registered in setUp.
                self.assertIn(basename, self.synonyms,
                              msg=f"No synonymous subset is registered for '{basename}'.")
                subset = self.synonyms[basename]
                self.assertIn(subset, pipeline.subsets,
                              msg=f"Pipeline does not define subset '{subset}'.")
                self.assertEqual(pipeline.subsets[subset], set(pipeline.task_labels),
                                 msg=f"These tasks are missing from subset '{subset}'.")

    def test_ap_pipe_subsets(self):
        """Test the unique subsets of ApPipe.
        """
        required_subsets = {"preload", "prompt", "afterburner"}
        # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
        # very deliberate exception; see RFC-997.
        no_subset_wanted = {"getRegionTimeFromVisit"}

        for file in self._find_pipelines("**", pattern="ApPipe*.yaml"):
            with self.subTest(file):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Do all steps exist?
                self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
                                        msg="An AP pipeline is missing subsets "
                                        f"{required_subsets - pipeline.subsets.keys()}.")
                # Is each task part of exactly one step? Disjointness is
                # symmetric, so each unordered pair is checked only once.
                for set1, set2 in itertools.combinations(sorted(required_subsets), 2):
                    tasks1 = pipeline.subsets[set1]
                    tasks2 = pipeline.subsets[set2]
                    self.assertTrue(tasks1.isdisjoint(tasks2),
                                    msg=f"Subsets '{set1}' and '{set2}' share tasks "
                                    f"{tasks1.intersection(tasks2)}.")
                subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
                self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
                                 msg=f"These tasks are not in any of the subsets {required_subsets}.")

    def test_inherited_subsets(self):
        """Test that instrument-specific pipelines have all the subsets of their
        generic counterparts.

        Note that this does not check inheritance *within* `_ingredients`!
        """
        for file in self._find_pipelines("**", exclude=("_ingredients",)):
            with self.subTest(file):
                generic = os.path.join(self.path, "_ingredients", os.path.basename(file))
                if not os.path.exists(generic):
                    # No generic counterpart with the same file name; nothing
                    # to compare against.
                    continue
                special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
                generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
                self.assertGreaterEqual(special_subsets, generic_subsets,
                                        msg="The instrument-specific pipeline is missing subsets "
                                        f"{generic_subsets - special_subsets}.")
class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Test case that inherits all of its checks from
    `lsst.utils.tests.MemoryTestCase` and adds none of its own.
    """
def setup_module(module):
    """Initialize the LSST test utilities for this module.

    Presumably invoked by the test runner as a module-level setup hook
    before any test in this file runs.

    Parameters
    ----------
    module
        The module being set up; not used here.
    """
    lsst.utils.tests.init()
# Script entry point: perform the same initialization as setup_module, then
# hand control to unittest.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()