Coverage for tests / test_pipeline.py: 19%
116 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Simple unit test for Pipeline."""
30import os
31import pickle
32import textwrap
33import unittest
35import lsst.utils.tests
36from lsst.pipe.base import LabelSpecifier, Pipeline, TaskDef
37from lsst.pipe.base.pipelineIR import LabeledSubset
38from lsst.pipe.base.tests.simpleQGraph import AddTask, makeSimplePipeline
# Find where the test pipelines exist and store it in an environment variable.
# The path is made absolute so that "$TESTDIR/..." URIs still resolve when
# ``__file__`` is relative (an empty dirname would otherwise expand to a path
# rooted at "/") or when the working directory changes later on.
os.environ["TESTDIR"] = os.path.abspath(os.path.dirname(__file__))
class PipelineTestCase(unittest.TestCase):
    """A test case for TaskDef and Pipeline."""

    def testTaskDef(self):
        """Tests for TaskDef structure."""
        task1 = TaskDef(taskClass=AddTask, config=AddTask.ConfigClass())
        self.assertIn("Add", task1.taskName)
        self.assertIsInstance(task1.config, AddTask.ConfigClass)
        self.assertIsNotNone(task1.taskClass)
        self.assertEqual(task1.label, "add_task")
        # A TaskDef must compare equal to itself after a pickle round trip.
        task1a = pickle.loads(pickle.dumps(task1))
        self.assertEqual(task1, task1a)

    def testEmpty(self):
        """Creating empty pipeline."""
        pipeline = Pipeline("test")
        self.assertEqual(len(pipeline), 0)

    def testInitial(self):
        """Testing constructor with initial data."""
        pipeline = makeSimplePipeline(2)
        self.assertEqual(len(pipeline), 2)
        pipeline_graph = pipeline.to_graph()
        # Sort the graph so the task order checked below is deterministic.
        pipeline_graph.sort()
        task_nodes = list(pipeline_graph.tasks.values())
        self.assertEqual(task_nodes[0].task_class, AddTask)
        self.assertEqual(task_nodes[1].task_class, AddTask)
        self.assertEqual(task_nodes[0].label, "task0")
        self.assertEqual(task_nodes[1].label, "task1")
        self.assertEqual(pipeline.task_labels, {"task0", "task1"})

    def testModifySubset(self):
        """Test adding/removing labels and labeled subsets, and subsetting a
        pipeline from task labels in both DROP and EDIT modes.
        """
        pipeline = makeSimplePipeline(2)

        # Test adding labels.
        with self.assertRaises(ValueError):
            # The subset does not exist yet.
            pipeline.addLabelToSubset("test", "new_label")
        pipeline._pipelineIR.labeled_subsets["test"] = LabeledSubset("test", set(), None)
        with self.assertRaises(ValueError):
            # The subset exists, but the task label does not.
            pipeline.addLabelToSubset("test", "missing_label")
        pipeline.addLabelToSubset("test", "task0")
        self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, {"task0"})

        # Test removing labels.
        with self.assertRaises(ValueError):
            # Cannot remove from a subset that does not exist.
            pipeline.removeLabelFromSubset("missing_subset", "task0")
        with self.assertRaises(ValueError):
            # Cannot remove a label that is not in the subset.
            pipeline.removeLabelFromSubset("test", "missing_label")
        pipeline.removeLabelFromSubset("test", "task0")
        self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, set())

        # Test creating new labeled subsets.
        with self.assertRaises(ValueError):
            # missing task label
            pipeline.addLabeledSubset("newSubset", "test description", {"missing_task_label"})
        with self.assertRaises(ValueError):
            # duplicate labeled subset; a valid task label is used so that
            # only the name collision can be responsible for the error.
            pipeline.addLabeledSubset("test", "test description", {"task0"})

        taskLabels = {"task0", "task1"}
        pipeline.addLabeledSubset("newSubset", "test description", taskLabels)

        # verify using the subset property interface
        self.assertEqual(pipeline.subsets["newSubset"], taskLabels)

        # Test removing labeled subsets
        with self.assertRaises(ValueError):
            pipeline.removeLabeledSubset("missing_subset")

        pipeline.removeLabeledSubset("newSubset")
        self.assertNotIn("newSubset", pipeline.subsets.keys())

        pipeline.addLabeledSubset("testSubset", "Test subset description", taskLabels)
        taskSubset = {"task0"}
        pipelineDrop = pipeline.subsetFromLabels(LabelSpecifier(taskSubset), pipeline.PipelineSubsetCtrl.DROP)
        pipelineEdit = pipeline.subsetFromLabels(LabelSpecifier(taskSubset), pipeline.PipelineSubsetCtrl.EDIT)

        # Test subsetting from labels.  The excluded labels are compared
        # element-wise; testing whether the *set object* is a member of the
        # label collection would be trivially false and could never fail.
        excludedLabels = taskLabels - taskSubset
        self.assertTrue(excludedLabels.isdisjoint(pipelineDrop.task_labels))
        self.assertNotIn("testSubset", pipelineDrop.subsets.keys())
        self.assertTrue(excludedLabels.isdisjoint(pipelineEdit.task_labels))
        self.assertIn("testSubset", pipelineEdit.subsets.keys())
        self.assertEqual(pipelineEdit.subsets["testSubset"], taskSubset)

    def testMergingPipelines(self):
        """Test that merging two pipelines with disjoint task labels yields
        the union of their tasks.
        """
        pipeline1 = makeSimplePipeline(2)
        pipeline2 = makeSimplePipeline(4)
        # Drop the labels shared with pipeline1 so the two are disjoint.
        pipeline2.removeTask("task0")
        pipeline2.removeTask("task1")

        pipeline1.mergePipeline(pipeline2)
        self.assertEqual(pipeline1._pipelineIR.tasks.keys(), {"task0", "task1", "task2", "task3"})

    def testFindingSubset(self):
        """Test looking up the labeled subsets that contain a task label."""
        pipeline = makeSimplePipeline(2)
        pipeline._pipelineIR.labeled_subsets["test1"] = LabeledSubset("test1", set(), None)
        pipeline._pipelineIR.labeled_subsets["test2"] = LabeledSubset("test2", set(), None)
        pipeline._pipelineIR.labeled_subsets["test3"] = LabeledSubset("test3", set(), None)

        # "test2" is deliberately left empty so it must not be reported.
        pipeline.addLabelToSubset("test1", "task0")
        pipeline.addLabelToSubset("test3", "task0")

        with self.assertRaises(ValueError):
            pipeline.findSubsetsWithLabel("missing_label")

        self.assertEqual(pipeline.findSubsetsWithLabel("task0"), {"test1", "test3"})

    def testParameters(self):
        """Test that parameters can be set and used to format."""
        pipeline_str = textwrap.dedent(
            """
            description: Test Pipeline
            parameters:
              testValue: 5
            tasks:
              add:
                class: test_pipeline.AddTask
                config:
                  addend: parameters.testValue
            """
        )
        # verify that parameters are used in expanding a pipeline
        pipeline = Pipeline.fromString(pipeline_str)
        pipeline_graph = pipeline.to_graph()
        self.assertEqual(pipeline_graph.tasks["add"].config.addend, 5)

        # verify that a parameter can be overridden on the "command line"
        pipeline.addConfigOverride("parameters", "testValue", 14)
        pipeline_graph = pipeline.to_graph()
        self.assertEqual(pipeline_graph.tasks["add"].config.addend, 14)

        # verify that parameters does not support files or python overrides
        with self.assertRaises(ValueError):
            pipeline.addConfigFile("parameters", "fakeFile")
        with self.assertRaises(ValueError):
            pipeline.addConfigPython("parameters", "fakePythonString")

    def testSerialization(self):
        """Test that a pipeline round-trips through its string form."""
        pipeline = makeSimplePipeline(2)
        dump = str(pipeline)
        load = Pipeline.fromString(dump)
        self.assertEqual(pipeline, load)

    def testStepDefinition(self):
        """Test that step definitions propagate from Pipeline YAML definitions
        to PipelineGraph.
        """
        pipeline_str = textwrap.dedent(
            """
            description: Test Pipeline
            tasks:
              add: test_pipeline.AddTask
            subsets:
              step1: ["add"]
            steps:
              - label: "step1"
                dimensions: []
            """
        )
        # verify that the step declared in YAML shows up in the graph
        pipeline = Pipeline.fromString(pipeline_str)
        pipeline_graph = pipeline.to_graph()
        self.assertEqual(list(pipeline_graph.steps), ["step1"])

    def test_excluded_steps(self) -> None:
        """Test that a step that is modified on import to contain no tasks is
        initially preserved, but dropped later in further subsetting with
        mode=DROP (e.g. with URI parameters).
        """
        # This pipeline has two steps, but the second is empty due to an
        # import exclusion of the only task in it.
        pg1 = Pipeline.from_uri("$TESTDIR/pipelines/step-exclusion.yaml").to_graph()
        self.assertEqual(list(pg1.steps), ["step1", "step2"])
        self.assertEqual(pg1.task_subsets["step1"], {"a", "b"})
        self.assertEqual(pg1.task_subsets["step2"], set())
        # If we load just one task from the pipeline (which subsets in DROP
        # mode), we want the already-empty step to be dropped as well, since
        # otherwise we'll get a complaint that there are steps and task 'a' is
        # not in any of them.
        pg2 = Pipeline.from_uri("$TESTDIR/pipelines/step-exclusion.yaml#a").to_graph()
        self.assertEqual(list(pg2.steps), [])
class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Run the file leak tests inherited from the base class."""
def setup_module(module):
    """Pytest module-level setup hook; initializes ``lsst.utils.tests``.

    Parameters
    ----------
    module
        The module object handed in by the caller; not used here.
    """
    lsst.utils.tests.init()
if __name__ == "__main__":
    # Direct execution: initialize the LSST test helpers, then hand control
    # to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()