Coverage for tests / test_pipelines.py: 18%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:39 +0000

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import itertools 

23import tempfile 

24import unittest 

25 

26import lsst.daf.butler.tests as butlerTests 

27import lsst.pipe.base 

28from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name 

29import lsst.utils 

30import lsst.utils.tests 

31 

32from lsst.resources import ResourcePath 

33 

34 

35class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase): 

36 """Tests of the self-consistency of our pipeline definitions. 

37 """ 

38 def setUp(self): 

39 self.path = ResourcePath("eups://ap_pipe/pipelines/", forceDirectory=True) 

40 # Each pipeline file should have a subset that represents it in 

41 # higher-level pipelines. 

42 self.synonyms = {"ApPipe.yaml": "apPipe", 

43 "ApPipeWithIsrTaskLSST.yaml": "apPipe", 

44 "ApPipeWithFakes.yaml": "apPipe", 

45 "SingleFrame.yaml": "singleFrame", 

46 "SingleFrameWithIsrTaskLSST.yaml": "singleFrame", 

47 "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr", 

48 "RunIsrForCrosstalkSources.yaml": "runOverscan", 

49 } 

50 

51 def test_graph_build(self): 

52 """Test that each pipeline definition file can be 

53 used to build a graph. 

54 """ 

55 files = ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$") 

56 for file in files: 

57 if "QuickTemplate" in file.path: 

58 # Our QuickTemplate definition cannot be tested here because it 

59 # depends on drp_tasks, which we cannot make a dependency here. 

60 continue 

61 if "PromptTemplate" in file.path: 

62 # Our PromptTemplate definition cannot be tested here because it 

63 # depends on drp_tasks, which we cannot make a dependency here. 

64 continue 

65 with self.subTest(file=str(file)): 

66 pipeline = lsst.pipe.base.Pipeline.from_uri(file) 

67 pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml") 

68 # If this fails, it will produce a useful error message. 

69 pipeline.to_graph() 

70 

71 def test_datasets(self): 

72 files = [ 

73 f for f in ResourcePath.findFileResources( 

74 [self.path.join("_ingredients", forceDirectory=True)], file_filter=r".*\.yaml$" 

75 ) 

76 # Validation currently broken for injection pipelines. 

77 # TODO: DM-54077 

78 if "injection/" not in f.path 

79 ] 

80 for file in files: 

81 if "QuickTemplate" in file.path: 

82 # Our QuickTemplate definition cannot be tested here because it 

83 # depends on drp_tasks, which we cannot make a dependency here. 

84 continue 

85 with self.subTest(file=str(file)): 

86 expected_inputs = { 

87 # ISR 

88 "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc", 

89 "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer", 

90 "opticsTransmission", "filterTransmission", "atmosphereTransmission", 

91 "illumMaskedImage", "deferredChargeCalib", 

92 # ISR-LSST 

93 "bfk", "cti", "dnlLUT", "gain_correction", 

94 # Everything else 

95 "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110", 

96 "template_coadd", "pretrainedModelPackage", "dia_source_apdb" 

97 } 

98 if "WithFakes" in file.path: 

99 expected_inputs.add("injection_catalog") 

100 tester = PipelineStepTester( 

101 filename=file, 

102 step_suffixes=[""], # Test full pipeline 

103 initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False), 

104 ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False), 

105 ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False), 

106 ], 

107 expected_inputs=expected_inputs, 

108 # Pipeline outputs highly in flux, don't test 

109 expected_outputs=set(), 

110 pipeline_patches={"parameters:apdb_config": "some/file/path.yaml", 

111 }, 

112 ) 

113 # Tester modifies Butler registry, so need a fresh repo every time 

114 with tempfile.TemporaryDirectory() as tempRepo: 

115 butler = butlerTests.makeTestRepo(tempRepo) 

116 tester.run(butler, self) 

117 

118 def test_whole_subset(self): 

119 """Test that each pipeline's synonymous subset includes all tasks, 

120 including those imported from other files. 

121 """ 

122 files = [ 

123 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$") 

124 # Validation currently broken for injection pipelines. 

125 # TODO: DM-54077 

126 if "injection/" not in f.path 

127 ] 

128 for file in files: 

129 if "QuickTemplate" in file.path: 

130 # Our QuickTemplate definition cannot be tested here because it 

131 # depends on drp_tasks, which we cannot make a dependency here. 

132 continue 

133 elif "ApdbDeduplication" in file.path: 

134 # The task to export catalogs from the APDB and re-run 

135 # association is not intended to be part of Prompt Processing 

136 # or batch AP pipeline runs. 

137 continue 

138 elif "PromptTemplate" in file.path: 

139 # Our PromptTemplate definition cannot be tested here because it 

140 # depends on drp_tasks, which we cannot make a dependency here. 

141 continue 

142 with self.subTest(file=str(file)): 

143 pipeline = lsst.pipe.base.Pipeline.from_uri(file) 

144 subset = self.synonyms.get(file.basename(), "<unknown_synonym>") 

145 self.assertEqual(pipeline.subsets.get(subset, "<missing>"), set(pipeline.task_labels), 

146 msg=f"These tasks are missing from subset '{subset}'") 

147 

148 def test_ap_pipe_subsets(self): 

149 """Test the unique subsets of ApPipe. 

150 """ 

151 files = [ 

152 f for f in ResourcePath.findFileResources([self.path], file_filter=r"^ApPipe.*\.yaml$") 

153 # Validation currently broken for injection pipelines. 

154 # TODO: DM-54077 

155 if "injection/" not in f.path 

156 ] 

157 required_subsets = {"preload", "prompt", "afterburner"} 

158 # getRegionTimeFromVisit is part of no subset besides apPipe. This is a 

159 # very deliberate exception; see RFC-997. 

160 no_subset_wanted = {"getRegionTimeFromVisit"} 

161 

162 for file in files: 

163 with self.subTest(file=str(file)): 

164 pipeline = lsst.pipe.base.Pipeline.from_uri(file) 

165 # Do all steps exist? 

166 self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets, 

167 msg="An AP pipeline is missing subsets " 

168 f"{required_subsets - pipeline.subsets.keys()}.") 

169 # Is each task part of exactly one step? 

170 for set1, set2 in itertools.product(required_subsets, required_subsets): 

171 if set1 == set2: 

172 continue 

173 tasks1 = pipeline.subsets[set1] 

174 tasks2 = pipeline.subsets[set2] 

175 self.assertTrue(tasks1.isdisjoint(tasks2), 

176 msg=f"Subsets '{set1}' and '{set2}' share tasks " 

177 f"{tasks1.intersection(tasks2)}.") 

178 subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets]) 

179 self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted, 

180 msg=f"These tasks are not in any of the subsets {required_subsets}.") 

181 

182 def test_inherited_subsets(self): 

183 """Test that instrument-specific pipelines have all the subsets of their 

184 generic counterparts. 

185 

186 Note that this does not check inheritance *within* `_ingredients`! 

187 """ 

188 files = [ 

189 f for f in ResourcePath.findFileResources([self.path], file_filter=r".*\.yaml$") 

190 if "_ingredients" not in f.path 

191 ] 

192 for file in files: 

193 if "QuickTemplate" in file.path: 

194 # Our QuickTemplate definition cannot be tested here because it 

195 # depends on drp_tasks, which we cannot make a dependency here. 

196 continue 

197 with self.subTest(file=str(file)): 

198 generic = self.path.join("_ingredients/", forceDirectory=True).join(file.basename()) 

199 if not generic.exists(): 

200 continue 

201 special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys() 

202 generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys() 

203 self.assertGreaterEqual(special_subsets, generic_subsets, 

204 msg="The instrument-specific pipeline is missing subsets " 

205 f"{generic_subsets - special_subsets}.") 

206 

207 

208class MemoryTester(lsst.utils.tests.MemoryTestCase): 

209 pass 

210 

211 

212def setup_module(module): 

213 lsst.utils.tests.init() 

214 

215 

216if __name__ == "__main__": 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true

217 lsst.utils.tests.init() 

218 unittest.main()