Coverage for tests / test_dot_tools.py: 24%
72 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Simple unit test for Pipeline visualization."""
30import io
31import re
32import unittest
34import lsst.pipe.base.connectionTypes as cT
35import lsst.utils.tests
36from lsst.pipe.base import Pipeline, PipelineTask, PipelineTaskConfig, PipelineTaskConnections
37from lsst.pipe.base.dot_tools import pipeline2dot
class ExamplePipelineTaskConnections(PipelineTaskConnections, dimensions=()):
    """Test connections with two inputs and two outputs.

    The second input and the second output are optional: when the
    corresponding connection name in the config is falsy (for example an
    empty string), ``input2`` / ``output2`` is removed from this instance.

    Parameters
    ----------
    config : `PipelineTaskConfig`
        The config to use for this connections class.
    """

    input1 = cT.Input(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Input for this task",
    )
    input2 = cT.Input(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Input for this task",
    )
    output1 = cT.Output(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Output for this task",
    )
    output2 = cT.Output(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Output for this task",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        # Prune the optional connections whose configured name is falsy.
        # Each pair is (connection attribute name, collection holding it).
        for connection, collection in (("input2", "inputs"), ("output2", "outputs")):
            if not getattr(config.connections, connection):
                getattr(self, collection).remove(connection)
class ExamplePipelineTaskConfig(PipelineTaskConfig, pipelineConnections=ExamplePipelineTaskConnections):
    """Config for the example task, bound to `ExamplePipelineTaskConnections`.

    Declares no fields of its own.
    """
def _makeConfig(inputName, outputName, pipeline, label):
    """Add connection-name config overrides for one task of a pipeline.

    Parameters
    ----------
    inputName : `str` or `tuple` [`str`]
        Input connection name(s). A tuple may hold at most two items; if it
        holds only one, the second input connection is overridden with an
        empty string. A plain string overrides the first input only.
    outputName : `str` or `tuple` [`str`]
        Output connection name(s), interpreted like ``inputName``.
    pipeline : object
        Object providing ``addConfigOverride(label, field, value)``; the
        overrides are registered on it.
    label : `str`
        Label of the task the overrides apply to.
    """

    def _override(first_field, second_field, names):
        # A non-tuple value names the first connection only; the second
        # connection is left untouched in that case.
        if not isinstance(names, tuple):
            pipeline.addConfigOverride(label, first_field, names)
            return
        pipeline.addConfigOverride(label, first_field, names[0])
        second = names[1] if len(names) > 1 else ""
        pipeline.addConfigOverride(label, second_field, second)

    _override("connections.input1", "connections.input2", inputName)
    _override("connections.output1", "connections.output2", outputName)
class ExamplePipelineTask(PipelineTask):
    """Minimal task class used to populate test pipelines."""

    # Config class associated with this task.
    ConfigClass = ExamplePipelineTaskConfig
def _makePipeline(tasks):
    """Build a test pipeline and return its task definitions.

    Parameters
    ----------
    tasks : `list` [`tuple`]
        Each tuple in the list has 3 or 4 items:

        - input DatasetType name(s), string or tuple of strings
        - output DatasetType name(s), string or tuple of strings
        - task label, string
        - optional task class object; when omitted or `None`,
          `ExamplePipelineTask` is used

    Returns
    -------
    task_defs : `list`
        The items produced by ``pipe.to_graph()._iter_task_defs()`` for the
        assembled pipeline (not the `Pipeline` object itself).
    """
    pipe = Pipeline("test pipeline")
    for task in tasks:
        inputs = task[0]
        outputs = task[1]
        label = task[2]
        klass = task[3] if len(task) > 3 else None
        if klass is None:
            # The documented contract allows an explicit None for the task
            # class; treat it the same as an omitted fourth item instead of
            # handing None to Pipeline.addTask.
            klass = ExamplePipelineTask
        pipe.addTask(klass, label)
        _makeConfig(inputs, outputs, pipe, label)
    return list(pipe.to_graph()._iter_task_defs())
class DotToolsTestCase(unittest.TestCase):
    """Checks on the DOT text written by `pipeline2dot`."""

    def test_pipeline2dot(self):
        """Render a six-task pipeline and sanity-check the DOT output."""
        task_specs = [
            ("A", ("B", "C"), "task0"),
            ("C", "E", "task1"),
            ("B", "D", "task2"),
            (("D", "E"), "F", "task3"),
            ("D.C", "G", "task4"),
            ("task3_metadata", "H", "task5"),
        ]
        pipeline = _makePipeline(task_specs)
        file = io.StringIO()
        pipeline2dot(pipeline, file)
        dot_text = file.getvalue()

        # Validating the complete output is impractical, so only a few basic
        # properties are checked; even the line count is not terribly stable.
        lines = dot_text.strip().split("\n")
        expected_counts = {
            "globals": 3,
            "datasets": 9,  # component variant doesn't count
            "tasks": 6,
            "edges": 15,
            "extra": 2,  # graph header and closing
        }
        self.assertEqual(len(lines), sum(expected_counts.values()))

        # Every node name must be wrapped in double quotes, both in node
        # statements and at either end of an edge statement.
        node_pattern = re.compile(r"^([^ ]+) \[.+\];$")
        edge_pattern = re.compile(r"^([^ ]+) *-> *([^ ]+);$")
        unquoted_keywords = ["graph", "node", "edge"]
        for line in lines:
            node_match = node_pattern.match(line)
            if node_match:
                # A node-statement line is never also examined as an edge.
                name = node_match.group(1)
                names = [] if name in unquoted_keywords else [name]
            else:
                edge_match = edge_pattern.match(line)
                names = [edge_match.group(1), edge_match.group(2)] if edge_match else []
            for name in names:
                self.assertEqual(name[0] + name[-1], '""')

        # make sure components are connected appropriately
        self.assertIn('"D:0" -> "task4:2"', dot_text)

        # make sure there is a connection created for metadata if someone
        # tries to read it in
        self.assertIn('"task3:2" -> "task3_metadata:0"', dot_text)
class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """File handle leak check, inherited unchanged from the base class."""
def setup_module(module):
    """Module-level setup for pytest: call ``lsst.utils.tests.init()``.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up. Not used by this function.
    """
    lsst.utils.tests.init()
# Direct execution as a script: perform the same initialization that
# setup_module does under pytest, then run the tests with unittest.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()