Coverage for tests / test_pipelines.py: 16%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:56 +0000

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import itertools 

23import tempfile 

24import unittest 

25 

26import lsst.daf.butler.tests as butlerTests 

27import lsst.pipe.base 

28from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name 

29import lsst.utils 

30import lsst.utils.tests 

31 

32from lsst.resources import ResourcePath 

33 

34 

35class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase): 

36 """Tests of the self-consistency of our pipeline definitions. 

37 """ 

38 def setUp(self): 

39 self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True) 

40 # Each pipeline file should have a subset that represents it in 

41 # higher-level pipelines. 

42 self.synonyms = {"ApPipe.yaml": "apPipe", 

43 "ApPipeWithIsrTaskLSST.yaml": "apPipe", 

44 "ApPipeWithPreconvolution.yaml": "apPipe", 

45 "ApPipeWithFakes.yaml": "apPipe", 

46 "SingleFrame.yaml": "singleFrame", 

47 "SingleFrameWithIsrTaskLSST.yaml": "singleFrame", 

48 "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr", 

49 "RunIsrForCrosstalkSources.yaml": "runOverscan", 

50 } 

51 

52 def test_graph_build(self): 

53 """Test that each pipeline definition file can be 

54 used to build a graph. 

55 """ 

56 files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$") 

57 for file in files: 

58 if "QuickTemplate" in file.path: 

59 # Our QuickTemplate definition cannot be tested here because it 

60 # depends on drp_tasks, which we cannot make a dependency here. 

61 continue 

62 if "PromptTemplate" in file.path: 

63 # Our PromptTemplate definition cannot be tested here because it 

64 # depends on drp_tasks, which we cannot make a dependency here. 

65 continue 

66 with self.subTest(file=str(file)): 

67 pipeline = lsst.pipe.base.Pipeline.from_uri(file) 

68 pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml") 

69 # If this fails, it will produce a useful error message. 

70 pipeline.to_graph() 

71 

72 def test_datasets(self): 

73 files = [ 

74 f for f in ResourcePath.findFileResources( 

75 [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$" 

76 ) 

77 # Validation currently broken for injection pipelines. 

78 # TODO: DM-54077 

79 if "injection/" not in f.path 

80 ] 

81 for file in files: 

82 if "QuickTemplate" in file.path: 

83 # Our QuickTemplate definition cannot be tested here because it 

84 # depends on drp_tasks, which we cannot make a dependency here. 

85 continue 

86 with self.subTest(file=str(file)): 

87 expected_inputs = { 

88 # ISR 

89 "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc", 

90 "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer", 

91 "opticsTransmission", "filterTransmission", "atmosphereTransmission", 

92 "illumMaskedImage", "deferredChargeCalib", 

93 # ISR-LSST 

94 "bfk", "cti", "dnlLUT", "gain_correction", 

95 # Everything else 

96 "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110", 

97 "template_coadd", "pretrainedModelPackage", "dia_source_apdb" 

98 } 

99 if "WithFakes" in file.path: 

100 expected_inputs.add("injection_catalog") 

101 tester = PipelineStepTester( 

102 filename=file, 

103 step_suffixes=[""], # Test full pipeline 

104 initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False), 

105 ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False), 

106 ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False), 

107 ], 

108 expected_inputs=expected_inputs, 

109 # Pipeline outputs highly in flux, don't test 

110 expected_outputs=set(), 

111 pipeline_patches={"parameters:apdb_config": "some/file/path.yaml", 

112 }, 

113 ) 

114 # Tester modifies Butler registry, so need a fresh repo every time 

115 with tempfile.TemporaryDirectory() as tempRepo: 

116 butler = butlerTests.makeTestRepo(tempRepo) 

117 tester.run(butler, self) 

118 

119 def test_whole_subset(self): 

120 """Test that each pipeline's synonymous subset includes all tasks, 

121 including those imported from other files. 

122 """ 

123 files = [ 

124 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$") 

125 # Validation currently broken for injection pipelines. 

126 # TODO: DM-54077 

127 if "injection/" not in f.path 

128 ] 

129 for file in files: 

130 if "QuickTemplate" in file.path: 

131 # Our QuickTemplate definition cannot be tested here because it 

132 # depends on drp_tasks, which we cannot make a dependency here. 

133 continue 

134 elif "ApdbDeduplication" in file.path: 

135 # The task to export catalogs from the APDB and re-run 

136 # association is not intended to be part of Prompt Processing 

137 # or batch AP pipeline runs. 

138 continue 

139 elif "PromptTemplate" in file.path: 

140 # Our PromptTemplate definition cannot be tested here because it 

141 # depends on drp_tasks, which we cannot make a dependency here. 

142 continue 

143 with self.subTest(file=str(file)): 

144 pipeline = lsst.pipe.base.Pipeline.from_uri(file) 

145 subset = self.synonyms.get(file.basename(), "<unknown_synonym>") 

146 self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels), 

147 msg=f"These tasks are missing from subset '{subset}'") 

148 

149 def test_ap_pipe_subsets(self): 

150 """Test the unique subsets of ApPipe. 

151 """ 

152 files = [ 

153 f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$") 

154 # Validation currently broken for injection pipelines. 

155 # TODO: DM-54077 

156 if "injection/" not in f.path 

157 ] 

158 required_subsets = {"preload", "prompt", "afterburner"} 

159 # getRegionTimeFromVisit is part of no subset besides apPipe. This is a 

160 # very deliberate exception; see RFC-997. 

161 no_subset_wanted = {"getRegionTimeFromVisit"} 

162 

163 for file in files: 

164 with self.subTest(file=str(file)): 

165 pipeline = lsst.pipe.base.Pipeline.from_uri(file) 

166 # Do all steps exist? 

167 self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets, 

168 msg="An AP pipeline is missing subsets " 

169 f"{required_subsets - pipeline.subsets.keys()}.") 

170 # Is each task part of exactly one step? 

171 for set1, set2 in itertools.product(required_subsets, required_subsets): 

172 if set1 == set2: 

173 continue 

174 tasks1 = pipeline.subsets[set1] 

175 tasks2 = pipeline.subsets[set2] 

176 self.assertTrue(tasks1.isdisjoint(tasks2), 

177 msg=f"Subsets '{set1}' and '{set2}' share tasks " 

178 f"{tasks1.intersection(tasks2)}.") 

179 subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets]) 

180 self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted, 

181 msg=f"These tasks are not in any of the subsets {required_subsets}.") 

182 

183 def test_preconvolution_isr_matches_ap_pipe(self): 

184 """Test that, for each instrument, ApPipeWithPreconvolution defines 

185 the same isr task (class and config) as the corresponding ApPipe. 

186 

187 Preconvolution changes only image subtraction and DIA-source 

188 detection; instrument signature removal must be unaffected. 

189 """ 

190 files = [ 

191 f for f in ResourcePath.findFileResources( 

192 [self.path], file_filter=r"^ApPipeWithPreconvolution\.yaml$" 

193 ) 

194 if "_ingredients" not in f.path 

195 ] 

196 # Sanity-check that this test actually has cameras to compare. 

197 self.assertGreater(len(files), 0, 

198 msg="No camera-specific ApPipeWithPreconvolution.yaml files found.") 

199 

200 for precon_file in files: 

201 with self.subTest(file=str(precon_file)): 

202 base_file = precon_file.dirname().join("ApPipe.yaml") 

203 self.assertTrue(base_file.exists(), 

204 msg=f"Expected sibling ApPipe.yaml next to {precon_file}: " 

205 f"{base_file} does not exist.") 

206 

207 precon = lsst.pipe.base.Pipeline.from_uri(precon_file) 

208 base = lsst.pipe.base.Pipeline.from_uri(base_file) 

209 # apdb_config has no default and must be set before to_graph(). 

210 precon.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml") 

211 base.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml") 

212 

213 precon_isr = precon.to_graph().tasks["isr"] 

214 base_isr = base.to_graph().tasks["isr"] 

215 

216 self.assertEqual(precon_isr.task_class_name, base_isr.task_class_name, 

217 msg=f"isr task class differs between ApPipe.yaml and " 

218 f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}.") 

219 # Can't just do `assertEqual(precon_isr, base_isr)` since 

220 # Task nodes are intentionally not equality comparable. 

221 self.assertTrue( 

222 base_isr.config.compare(precon_isr.config, shortcut=False), 

223 msg=f"isr task config differs between ApPipe.yaml and " 

224 f"ApPipeWithPreconvolution.yaml in {precon_file.dirname()}." 

225 ) 

226 

227 def test_inherited_subsets(self): 

228 """Test that instrument-specific pipelines have all the subsets of their 

229 generic counterparts. 

230 

231 Note that this does not check inheritance *within* `_ingredients`! 

232 """ 

233 files = [ 

234 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$") 

235 if "_ingredients" not in f.path 

236 ] 

237 for file in files: 

238 if "QuickTemplate" in file.path: 

239 # Our QuickTemplate definition cannot be tested here because it 

240 # depends on drp_tasks, which we cannot make a dependency here. 

241 continue 

242 with self.subTest(file=str(file)): 

243 generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename()) 

244 if not generic.exists(): 

245 continue 

246 special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys() 

247 generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys() 

248 self.assertGreaterEqual(special_subsets, generic_subsets, 

249 msg="The instrument-specific pipeline is missing subsets " 

250 f"{generic_subsets - special_subsets}.") 

251 

252 

253class MemoryTester(lsst.utils.tests.MemoryTestCase): 

254 pass 

255 

256 

257def setup_module(module): 

258 lsst.utils.tests.init() 

259 

260 

261if __name__ == "__main__": 261 ↛ 262line 261 didn't jump to line 262 because the condition on line 261 was never true

262 lsst.utils.tests.init() 

263 unittest.main()