Coverage for tests/test_pipelines.py: 21%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:56 +0000

1# This file is part of ap_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22import glob 

23import itertools 

24import os.path 

25import tempfile 

26import unittest 

27 

28# need to import pyproj to prevent file handle leakage 

29import pyproj # noqa: F401 

30 

31import lsst.daf.butler.tests as butlerTests 

32import lsst.pipe.base 

33from lsst.pipe.base.tests.pipelineStepTester import PipelineStepTester # Can't use fully-qualified name 

34import lsst.utils 

35import lsst.utils.tests 

36 

37 

class PipelineDefintionsTestSuite(lsst.utils.tests.TestCase):
    """Tests of the self-consistency of our pipeline definitions.
    """

    def setUp(self):
        """Record the pipelines directory and the file-to-subset mapping.
        """
        self.path = os.path.join(lsst.utils.getPackageDir("ap_pipe"), "pipelines")
        # Each pipeline file should have a subset that represents it in
        # higher-level pipelines.
        self.synonyms = {"ApPipe.yaml": "apPipe",
                         "ApPipeWithIsrTaskLSST.yaml": "apPipe",
                         "ApPipeWithFakes.yaml": "apPipe",
                         "SingleFrame.yaml": "singleFrame",
                         "SingleFrameWithIsrTaskLSST.yaml": "singleFrame",
                         "RunIsrWithoutInterChipCrosstalk.yaml": "runIsr",
                         "RunIsrForCrosstalkSources.yaml": "runOverscan",
                         }

    def _find_pipelines(self, *pattern, skip_quick_template=True):
        """Return the pipeline files under ``self.path`` matching a glob.

        Parameters
        ----------
        *pattern : `str`
            Path components, relative to ``self.path``, forming the glob
            pattern. The glob is not recursive, so a ``"**"`` component
            matches exactly one directory level.
        skip_quick_template : `bool`, optional
            If `True` (the default), drop every path containing
            ``"QuickTemplate"``.

        Returns
        -------
        files : `list` [`str`]
            The matching paths, sorted so that subtests run and are
            reported in a stable order.
        """
        files = sorted(glob.glob(os.path.join(self.path, *pattern)))
        if skip_quick_template:
            # Our QuickTemplate definition cannot be tested here because it
            # depends on drp_tasks, which we cannot make a dependency here.
            files = [f for f in files if "QuickTemplate" not in f]
        return files

    def test_graph_build(self):
        """Test that each pipeline definition file can be
        used to build a graph.
        """
        for file in self._find_pipelines("**", "*.yaml"):
            with self.subTest(file):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                pipeline.addConfigOverride("parameters", "apdb_config", "some/file/path.yaml")
                # If this fails, it will produce a useful error message.
                pipeline.to_graph()

    def test_datasets(self):
        """Test that each ``_ingredients`` pipeline, run as a whole, needs
        only the expected input dataset types.
        """
        # Inputs common to every pipeline; copied per file because some
        # pipelines extend the set.
        base_inputs = {
            # ISR
            "raw", "camera", "crosstalk", "crosstalkSources", "bias", "dark", "flat", "ptc",
            "fringe", "straylightData", "bfKernel", "newBFKernel", "defects", "linearizer",
            "opticsTransmission", "filterTransmission", "atmosphereTransmission",
            "illumMaskedImage", "deferredChargeCalib",
            # ISR-LSST
            "bfk", "cti", "dnlLUT", "gain_correction",
            # Everything else
            "skyMap", "gaia_dr3_20230707", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
            "template_coadd", "pretrainedModelPackage", "dia_source_apdb"
        }
        for file in self._find_pipelines("_ingredients", "*.yaml"):
            with self.subTest(file):
                expected_inputs = set(base_inputs)
                if "WithFakes" in file:
                    expected_inputs.add("injection_catalog")
                tester = PipelineStepTester(
                    filename=file,
                    step_suffixes=[""],  # Test full pipeline
                    initial_dataset_types=[("ps1_pv3_3pi_20170110", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr2_20200414", {"htm7"}, "SimpleCatalog", False),
                                           ("gaia_dr3_20230707", {"htm7"}, "SimpleCatalog", False),
                                           ],
                    expected_inputs=expected_inputs,
                    # Pipeline outputs highly in flux, don't test
                    expected_outputs=set(),
                    pipeline_patches={"parameters:apdb_config": "some/file/path.yaml",
                                      },
                )
                # Tester modifies Butler registry, so need a fresh repo every time
                with tempfile.TemporaryDirectory() as tempRepo:
                    butler = butlerTests.makeTestRepo(tempRepo)
                    tester.run(butler, self)

    def test_whole_subset(self):
        """Test that each pipeline's synonymous subset includes all tasks,
        including those imported from other files.
        """
        for file in self._find_pipelines("**", "*.yaml"):
            if "ApdbDeduplication" in file:
                # The ApdbDeduplication pipeline (which exports catalogs from
                # the APDB and re-runs association) is exempt from this
                # check; it has no entry in self.synonyms.
                continue
            with self.subTest(file):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                name = os.path.basename(file)
                # Fail with an explanation instead of a bare KeyError when a
                # new pipeline file has not been registered in setUp.
                self.assertIn(name, self.synonyms,
                              msg=f"No synonymous subset is registered for '{name}'.")
                subset = self.synonyms[name]
                self.assertIn(subset, pipeline.subsets,
                              msg=f"Pipeline does not define subset '{subset}'.")
                self.assertEqual(pipeline.subsets[subset], set(pipeline.task_labels),
                                 msg=f"These tasks are missing from subset '{subset}'.")

    def test_ap_pipe_subsets(self):
        """Test the unique subsets of ApPipe.
        """
        files = self._find_pipelines("**", "ApPipe*.yaml", skip_quick_template=False)
        required_subsets = {"preload", "prompt", "afterburner"}
        # getRegionTimeFromVisit is part of no subset besides apPipe. This is a
        # very deliberate exception; see RFC-997.
        no_subset_wanted = {"getRegionTimeFromVisit"}

        for file in files:
            with self.subTest(file):
                pipeline = lsst.pipe.base.Pipeline.from_uri(file)
                # Do all steps exist?
                self.assertGreaterEqual(pipeline.subsets.keys(), required_subsets,
                                        msg="An AP pipeline is missing subsets "
                                        f"{required_subsets - pipeline.subsets.keys()}.")
                # Is each task part of exactly one step? Disjointness is
                # symmetric, so each unordered pair is checked only once.
                for set1, set2 in itertools.combinations(required_subsets, 2):
                    tasks1 = pipeline.subsets[set1]
                    tasks2 = pipeline.subsets[set2]
                    self.assertTrue(tasks1.isdisjoint(tasks2),
                                    msg=f"Subsets '{set1}' and '{set2}' share tasks "
                                    f"{tasks1.intersection(tasks2)}.")
                subsetted = set().union(*[pipeline.subsets[s] for s in required_subsets])
                self.assertEqual(subsetted, set(pipeline.task_labels) - no_subset_wanted,
                                 msg=f"These tasks are not in any of the subsets {required_subsets}.")

    def test_inherited_subsets(self):
        """Test that instrument-specific pipelines have all the subsets of their
        generic counterparts.

        Note that this does not check inheritance *within* `_ingredients`!
        """
        files = [f for f in self._find_pipelines("**", "*.yaml", skip_quick_template=False)
                 if "_ingredients" not in f]
        for file in files:
            generic = os.path.join(self.path, "_ingredients", os.path.basename(file))
            if not os.path.exists(generic):
                # No generic pipeline of the same name to compare against.
                continue
            with self.subTest(file):
                special_subsets = lsst.pipe.base.Pipeline.from_uri(file).subsets.keys()
                generic_subsets = lsst.pipe.base.Pipeline.from_uri(generic).subsets.keys()
                self.assertGreaterEqual(special_subsets, generic_subsets,
                                        msg="The instrument-specific pipeline is missing subsets "
                                        f"{generic_subsets - special_subsets}.")

177 

178 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Inherit every check from `lsst.utils.tests.MemoryTestCase` unchanged.
    """

181 

182 

def setup_module(module):
    """Initialize the LSST test utilities for this module.

    Presumably invoked through the test runner's module-level setup
    convention; verify against the runner in use.

    Parameters
    ----------
    module : `types.ModuleType`
        The module being set up. Not used here.
    """
    lsst.utils.tests.init()

185 

186 

# Direct execution as a script: initialize the LSST test utilities, then hand
# control to unittest's command-line runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()