Coverage for tests / test_pipeline.py: 19%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:32 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Simple unit test for Pipeline.""" 

29 

30import os 

31import pickle 

32import textwrap 

33import unittest 

34 

35import lsst.utils.tests 

36from lsst.pipe.base import LabelSpecifier, Pipeline, TaskDef 

37from lsst.pipe.base.pipelineIR import LabeledSubset 

38from lsst.pipe.base.tests.simpleQGraph import AddTask, makeSimplePipeline 

39 

40# Find where the test pipelines exist and store it in an environment variable. 

41os.environ["TESTDIR"] = os.path.dirname(__file__) 

42 

43 

44class PipelineTestCase(unittest.TestCase): 

45 """A test case for TaskDef and Pipeline.""" 

46 

47 def testTaskDef(self): 

48 """Tests for TaskDef structure.""" 

49 task1 = TaskDef(taskClass=AddTask, config=AddTask.ConfigClass()) 

50 self.assertIn("Add", task1.taskName) 

51 self.assertIsInstance(task1.config, AddTask.ConfigClass) 

52 self.assertIsNotNone(task1.taskClass) 

53 self.assertEqual(task1.label, "add_task") 

54 task1a = pickle.loads(pickle.dumps(task1)) 

55 self.assertEqual(task1, task1a) 

56 

57 def testEmpty(self): 

58 """Creating empty pipeline.""" 

59 pipeline = Pipeline("test") 

60 self.assertEqual(len(pipeline), 0) 

61 

62 def testInitial(self): 

63 """Testing constructor with initial data.""" 

64 pipeline = makeSimplePipeline(2) 

65 self.assertEqual(len(pipeline), 2) 

66 pipeline_graph = pipeline.to_graph() 

67 pipeline_graph.sort() 

68 task_nodes = list(pipeline_graph.tasks.values()) 

69 self.assertEqual(task_nodes[0].task_class, AddTask) 

70 self.assertEqual(task_nodes[1].task_class, AddTask) 

71 self.assertEqual(task_nodes[0].label, "task0") 

72 self.assertEqual(task_nodes[1].label, "task1") 

73 self.assertEqual(pipeline.task_labels, {"task0", "task1"}) 

74 

75 def testModifySubset(self): 

76 pipeline = makeSimplePipeline(2) 

77 

78 # Test adding labels. 

79 with self.assertRaises(ValueError): 

80 pipeline.addLabelToSubset("test", "new_label") 

81 pipeline._pipelineIR.labeled_subsets["test"] = LabeledSubset("test", set(), None) 

82 with self.assertRaises(ValueError): 

83 pipeline.addLabelToSubset("test", "missing_label") 

84 pipeline.addLabelToSubset("test", "task0") 

85 self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, {"task0"}) 

86 

87 # Test removing labels. 

88 with self.assertRaises(ValueError): 

89 pipeline.addLabelToSubset("missing_subset", "task0") 

90 with self.assertRaises(ValueError): 

91 pipeline.addLabelToSubset("test", "missing_label") 

92 pipeline.removeLabelFromSubset("test", "task0") 

93 self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, set()) 

94 

95 # Test creating new labeled subsets. 

96 with self.assertRaises(ValueError): 

97 # missing task label 

98 pipeline.addLabeledSubset("newSubset", "test description", {"missing_task_label"}) 

99 with self.assertRaises(ValueError): 

100 # duplicate labeled subset 

101 pipeline.addLabeledSubset("test", "test description", {"missing_task_label"}) 

102 

103 taskLabels = {"task0", "task1"} 

104 pipeline.addLabeledSubset("newSubset", "test description", taskLabels) 

105 

106 # verify using the subset property interface 

107 self.assertEqual(pipeline.subsets["newSubset"], taskLabels) 

108 

109 # Test removing labeled subsets 

110 with self.assertRaises(ValueError): 

111 pipeline.removeLabeledSubset("missing_subset") 

112 

113 pipeline.removeLabeledSubset("newSubset") 

114 self.assertNotIn("newSubset", pipeline.subsets.keys()) 

115 

116 pipeline.addLabeledSubset("testSubset", "Test subset description", taskLabels) 

117 taskSubset = {"task0"} 

118 pipelineDrop = pipeline.subsetFromLabels(LabelSpecifier(taskSubset), pipeline.PipelineSubsetCtrl.DROP) 

119 pipelineEdit = pipeline.subsetFromLabels(LabelSpecifier(taskSubset), pipeline.PipelineSubsetCtrl.EDIT) 

120 

121 # Test subsetting from labels 

122 self.assertNotIn(taskLabels - taskSubset, set(pipelineDrop.task_labels)) 

123 self.assertNotIn("testSubset", pipelineDrop.subsets.keys()) 

124 self.assertNotIn(taskLabels - taskSubset, set(pipelineEdit.task_labels)) 

125 self.assertIn("testSubset", pipelineEdit.subsets.keys()) 

126 self.assertEqual(pipelineEdit.subsets["testSubset"], taskSubset) 

127 

128 def testMergingPipelines(self): 

129 pipeline1 = makeSimplePipeline(2) 

130 pipeline2 = makeSimplePipeline(4) 

131 pipeline2.removeTask("task0") 

132 pipeline2.removeTask("task1") 

133 

134 pipeline1.mergePipeline(pipeline2) 

135 self.assertEqual(pipeline1._pipelineIR.tasks.keys(), {"task0", "task1", "task2", "task3"}) 

136 

137 def testFindingSubset(self): 

138 pipeline = makeSimplePipeline(2) 

139 pipeline._pipelineIR.labeled_subsets["test1"] = LabeledSubset("test1", set(), None) 

140 pipeline._pipelineIR.labeled_subsets["test2"] = LabeledSubset("test2", set(), None) 

141 pipeline._pipelineIR.labeled_subsets["test3"] = LabeledSubset("test3", set(), None) 

142 

143 pipeline.addLabelToSubset("test1", "task0") 

144 pipeline.addLabelToSubset("test3", "task0") 

145 

146 with self.assertRaises(ValueError): 

147 pipeline.findSubsetsWithLabel("missing_label") 

148 

149 self.assertEqual(pipeline.findSubsetsWithLabel("task0"), {"test1", "test3"}) 

150 

151 def testParameters(self): 

152 """Test that parameters can be set and used to format.""" 

153 pipeline_str = textwrap.dedent( 

154 """ 

155 description: Test Pipeline 

156 parameters: 

157 testValue: 5 

158 tasks: 

159 add: 

160 class: test_pipeline.AddTask 

161 config: 

162 addend: parameters.testValue 

163 """ 

164 ) 

165 # verify that parameters are used in expanding a pipeline 

166 pipeline = Pipeline.fromString(pipeline_str) 

167 pipeline_graph = pipeline.to_graph() 

168 self.assertEqual(pipeline_graph.tasks["add"].config.addend, 5) 

169 

170 # verify that a parameter can be overridden on the "command line" 

171 pipeline.addConfigOverride("parameters", "testValue", 14) 

172 pipeline_graph = pipeline.to_graph() 

173 self.assertEqual(pipeline_graph.tasks["add"].config.addend, 14) 

174 

175 # verify that parameters does not support files or python overrides 

176 with self.assertRaises(ValueError): 

177 pipeline.addConfigFile("parameters", "fakeFile") 

178 with self.assertRaises(ValueError): 

179 pipeline.addConfigPython("parameters", "fakePythonString") 

180 

181 def testSerialization(self): 

182 pipeline = makeSimplePipeline(2) 

183 dump = str(pipeline) 

184 load = Pipeline.fromString(dump) 

185 self.assertEqual(pipeline, load) 

186 

187 def testStepDefinition(self): 

188 """Test that step definitions propagate from Pipeline YAML definitions 

189 to PipelineGraph. 

190 """ 

191 pipeline_str = textwrap.dedent( 

192 """ 

193 description: Test Pipeline 

194 tasks: 

195 add: test_pipeline.AddTask 

196 subsets: 

197 step1: ["add"] 

198 steps: 

199 - label: "step1" 

200 dimensions: [] 

201 """ 

202 ) 

203 # verify that parameters are used in expanding a pipeline 

204 pipeline = Pipeline.fromString(pipeline_str) 

205 pipeline_graph = pipeline.to_graph() 

206 self.assertEqual(list(pipeline_graph.steps), ["step1"]) 

207 

208 def test_excluded_steps(self) -> None: 

209 """Test that a step that is modified on import to contain no tasks is 

210 initially preserved, but dropped later in further subsetting with 

211 mode=DROP (e.g. with URI parameters). 

212 """ 

213 # This pipeline has two steps, but the second empty due to an import 

214 # exclusion of the only task in it. 

215 pg1 = Pipeline.from_uri("$TESTDIR/pipelines/step-exclusion.yaml").to_graph() 

216 self.assertEqual(list(pg1.steps), ["step1", "step2"]) 

217 self.assertEqual(pg1.task_subsets["step1"], {"a", "b"}) 

218 self.assertEqual(pg1.task_subsets["step2"], set()) 

219 # If we load just one task from the pipeline (which subsets in DROP 

220 # mode), we want the already-empty step to be dropped as well, since 

221 # otherwise we'll get a complaint that there are steps and task 'a' is 

222 # not in any of them. 

223 pg2 = Pipeline.from_uri("$TESTDIR/pipelines/step-exclusion.yaml#a").to_graph() 

224 self.assertEqual(list(pg2.steps), []) 

225 

226 

227class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

228 """Run file leak tests.""" 

229 

230 

231def setup_module(module): 

232 """Configure pytest.""" 

233 lsst.utils.tests.init() 

234 

235 

236if __name__ == "__main__": 

237 lsst.utils.tests.init() 

238 unittest.main()