Coverage for tests/test_pipelines.py: 20%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:58 +0000

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import itertools 

23import tempfile 

24import unittest 

25 

26# need to import pyproj to prevent file handle leakage 

27# TODO: Remove import after completing DM-54643. Use DM-54656 

28try: 

29 import pyproj # noqa: F401 

30except ImportError: 

31 pass 

32 

33import lsst.daf.butler.tests as butlerTests 

34import lsst.pipe.base 

35from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name 

36import lsst.utils 

37import lsst.utils.tests 

38 

39from lsst.resources import ResourcePath 

40 

41 

class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
    """Tests of the self-consistency of our pipeline definitions.

    Each test discovers pipeline YAML files under ``self.path`` and checks
    every file inside its own ``subTest``, labelled with the file's URI.
    """

    def setUp(self):
        """Set the pipelines directory and the per-file subset synonyms.
        """
        self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True)
        # Each pipeline file should have a subset that represents it in
        # higher-level pipelines.
        self.synonyms = {
            "ApPipe.yaml": "apPipe",
            "ApPipeWithIsrTaskLSST.yaml": "apPipe",
            "ApPipeWithFakes.yaml": "apPipe",
            "SingleFrame.yaml": "singleFrame",
            "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
            "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
            "RunIsrForCrosstalkSources.yaml": "runOverscan",
        }

    def test_graph_build(self):
        """Test that each pipeline definition file can be
        used to build a graph.
        """
        # Our QuickTemplate and PromptTemplate definitions cannot be tested
        # here because they depend on drp_tasks, which we cannot make a
        # dependency here.
        untestable = ("QuickTemplate", "PromptTemplate")
        for file in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$"):
            if any(fragment in file.path for fragment in untestable):
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                # If this fails, it will produce a useful error message.
                pipeline.to_graph()

    def test_datasets(self):
        """Test the expected input datasets of each ``_ingredients`` pipeline.

        Each file is run through `PipelineStepTester` as a single full
        pipeline, with a fixed set of expected inputs (plus
        ``injection_catalog`` for ``WithFakes`` pipelines). Outputs are not
        checked.
        """
        ingredients = self.path.join("_ingredients", forceDirectory=True)
        files = [
            f for f in ResourcePath.findFileResources([ingredients], file_filter=r".*\.yaml$")
            # Validation currently broken for injection pipelines.
            # TODO: DM-54077
            if "injection/" not in f.path
        ]
        # The reference catalogs are registered up front with these
        # dimensions and storage class.
        refcats = [
            ("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
            ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
            ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
        ]
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                # Built fresh for every file because it is mutated below.
                expected_inputs = {
                    # ISR
                    "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
                    "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
                    "opticsTransmission", "filterTransmission", "atmosphereTransmission",
                    "illumMaskedImage", "deferredChargeCalib",
                    # ISR-LSST
                    "bfk", "cti", "dnlLUT", "gain_correction",
                    # Everything else
                    "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
                    "template_coadd", "pretrainedModelPackage", "dia_source_apdb",
                }
                if "WithFakes" in file.path:
                    expected_inputs.add("injection_catalog")
                tester = PipelineStepTester(
                    filename=file,
                    step_suffixes=[""],  # Test full pipeline
                    initial_dataset_types=refcats,
                    expected_inputs=expected_inputs,
                    # Pipeline outputs highly in flux, don't test
                    expected_outputs=set(),
                    pipeline_patches={"parameters:apdb_config": "some/file/path.yaml"},
                )
                # Tester modifies Butler registry, so need a fresh repo every time
                with tempfile.TemporaryDirectory() as tempRepo:
                    butler = butlerTests.makeTestRepo(tempRepo)
                    tester.run(butler, self)

    def test_whole_subset(self):
        """Test that each pipeline's synonymous subset includes all tasks,
        including those imported from other files.
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
            # Validation currently broken for injection pipelines.
            # TODO: DM-54077
            if "injection/" not in f.path
        ]
        skipped = (
            # Our QuickTemplate and PromptTemplate definitions cannot be
            # tested here because they depend on drp_tasks, which we cannot
            # make a dependency here.
            "QuickTemplate",
            "PromptTemplate",
            # The task to export catalogs from the APDB and re-run
            # association is not intended to be part of Prompt Processing
            # or batch AP pipeline runs.
            "ApdbDeduplication",
        )
        for file in files:
            if any(fragment in file.path for fragment in skipped):
                continue
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                subset = self.synonyms.get(file.basename(), "<unknown_synonym>")
                self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels),
                                 msg=f"These tasks are missing from subset '{subset}'")

    def test_ap_pipe_subsets(self):
        """Test the unique subsets of ApPipe.
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$")
            # Validation currently broken for injection pipelines.
            # TODO: DM-54077
            if "injection/" not in f.path
        ]
        required_subsets = {"preload", "prompt", "afterburner"}
        # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
        # very deliberate exception; see RFC-997.
        no_subset_wanted = {"getRegionTimeFromVisit"}

        for file in files:
            with self.subTest(file=str(file)):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Do all steps exist?
                self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
                                        msg="An AP pipeline is missing subsets "
                                            f"{required_subsets - pipeline.subsets.keys()}.")
                # Is each task part of exactly one step? Disjointness is
                # symmetric, so each unordered pair is checked once; sorting
                # makes the order of checks (and messages) deterministic.
                for set1, set2 in itertools.combinations(sorted(required_subsets), 2):
                    tasks1 = pipeline.subsets[set1]
                    tasks2 = pipeline.subsets[set2]
                    self.assertTrue(tasks1.isdisjoint(tasks2),
                                    msg=f"Subsets '{set1}' and '{set2}' share tasks "
                                        f"{tasks1.intersection(tasks2)}.")
                subsetted = set().union(*(pipeline.subsets[s] for s in required_subsets))
                self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
                                 msg=f"These tasks are not in any of the subsets {required_subsets}.")

    def test_inherited_subsets(self):
        """Test that instrument-specific pipelines have all the subsets of their
        generic counterparts.

        Note that this does not check inheritance *within* `_ingredients`!
        """
        files = [
            f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$")
            if "_ingredients" not in f.path
        ]
        # Generic counterparts are looked up by basename in this directory.
        ingredients = self.path.join("_ingredients/", forceDirectory=True)
        for file in files:
            if "QuickTemplate" in file.path:
                # Our QuickTemplate definition cannot be tested here because it
                # depends on drp_tasks, which we cannot make a dependency here.
                continue
            with self.subTest(file=str(file)):
                generic = ingredients.join(file.basename())
                if not generic.exists():
                    # No generic pipeline of the same name; nothing to compare.
                    continue
                special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
                generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
                self.assertGreaterEqual(special_subsets, generic_subsets,
                                        msg="The instrument-specific pipeline is missing subsets "
                                            f"{generic_subsets - special_subsets}.")

213 

214 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Run the tests inherited from `lsst.utils.tests.MemoryTestCase`.

    No tests are added or overridden here.
    """

217 

218 

def setup_module(module):
    """Call `lsst.utils.tests.init` before the tests in this module run.

    Parameters
    ----------
    module
        Not used by this function.
    """
    lsst.utils.tests.init()

221 

222 

# Script entry point: do the same initialization as ``setup_module`` and then
# hand control to the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()