Coverage for tests/test_dot_tools.py: 24%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Simple unit test for Pipeline visualization.""" 

29 

30import io 

31import re 

32import unittest 

33 

34import lsst.pipe.base.connectionTypes as cT 

35import lsst.utils.tests 

36from lsst.pipe.base import Pipeline, PipelineTask, PipelineTaskConfig, PipelineTaskConnections 

37from lsst.pipe.base.dot_tools import pipeline2dot 

38 

39 

class ExamplePipelineTaskConnections(PipelineTaskConnections, dimensions=()):
    """Connections for the example task used by these tests.

    Two inputs and two outputs are declared, each with an empty default
    dataset type name. ``input2`` and ``output2`` are removed from the
    instance when their configured name is empty.

    Parameters
    ----------
    config : `PipelineTaskConfig`
        The config to use for this connections class.
    """

    input1 = cT.Input(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Input for this task",
    )
    input2 = cT.Input(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Input for this task",
    )
    output1 = cT.Output(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Output for this task",
    )
    output2 = cT.Output(
        name="",
        dimensions=["visit", "detector"],
        storageClass="example",
        doc="Output for this task",
    )

    def __init__(self, *, config=None):
        super().__init__(config=config)
        # An empty configured name means the second connection is unused,
        # so drop it from this instance.
        connections = config.connections
        if not connections.input2:
            self.inputs.remove("input2")
        if not connections.output2:
            self.outputs.remove("output2")

68 

69 

class ExamplePipelineTaskConfig(PipelineTaskConfig, pipelineConnections=ExamplePipelineTaskConnections):
    """Config for the example task; it adds no fields of its own and only
    binds `ExamplePipelineTaskConnections` as the connections class.
    """

72 

73 

def _makeConfig(inputName, outputName, pipeline, label):
    """Add connection-name config overrides for one task of a pipeline.

    Nothing is returned; the overrides are registered on ``pipeline`` via
    its ``addConfigOverride`` method.

    Parameters
    ----------
    inputName : `str` or `tuple` [`str`] or `list` [`str`]
        Name for ``connections.input1``, or a sequence of at most two names
        for ``connections.input1`` and ``connections.input2``.
    outputName : `str` or `tuple` [`str`] or `list` [`str`]
        Name for ``connections.output1``, or a sequence of at most two names
        for ``connections.output1`` and ``connections.output2``.
    pipeline : `Pipeline`
        Pipeline that receives the overrides.
    label : `str`
        Label of the task the overrides apply to.

    Raises
    ------
    IndexError
        Raised if an empty sequence is given for either name argument.

    Notes
    -----
    When a sequence holds a single name, the second connection is
    explicitly overridden with an empty string. When a plain string is
    given, the second connection is not overridden at all.
    """
    connection_fields = (
        ("connections.input1", "connections.input2", inputName),
        ("connections.output1", "connections.output2", outputName),
    )
    for first_field, second_field, names in connection_fields:
        # Lists are split like tuples; previously a list was passed whole as
        # the value of the first connection.
        if isinstance(names, (tuple, list)):
            pipeline.addConfigOverride(label, first_field, names[0])
            pipeline.addConfigOverride(label, second_field, names[1] if len(names) > 1 else "")
        else:
            pipeline.addConfigOverride(label, first_field, names)

93 

94 

class ExamplePipelineTask(PipelineTask):
    """Minimal task for these tests; it defines nothing beyond its config
    class.
    """

    # Config class for this task; it binds the example connections.
    ConfigClass = ExamplePipelineTaskConfig

99 

100 

def _makePipeline(tasks):
    """Build a test pipeline and return its task definitions.

    Parameters
    ----------
    tasks : `list` [`tuple`]
        Each tuple in the list has 3 or 4 items:

        - input DatasetType name(s), string or tuple of strings
        - output DatasetType name(s), string or tuple of strings
        - task label, string
        - optional task class object; when omitted or `None`,
          `ExamplePipelineTask` is used

    Returns
    -------
    task_defs : `list`
        The items yielded by ``_iter_task_defs()`` on the graph built from
        the pipeline (not the `Pipeline` instance itself).
    """
    pipe = Pipeline("test pipeline")
    for task in tasks:
        inputs, outputs, label = task[0], task[1], task[2]
        # The task class is documented as optional and allowed to be None;
        # treat an explicit None like an omitted item rather than passing
        # None on to addTask.
        klass = task[3] if len(task) > 3 else None
        if klass is None:
            klass = ExamplePipelineTask
        pipe.addTask(klass, label)
        _makeConfig(inputs, outputs, pipe, label)
    return list(pipe.to_graph()._iter_task_defs())

126 

127 

class DotToolsTestCase(unittest.TestCase):
    """Checks on the text written by the dot_tools module."""

    def test_pipeline2dot(self):
        """Run `pipeline2dot` on a six-task pipeline and inspect the output."""
        task_specs = [
            ("A", ("B", "C"), "task0"),
            ("C", "E", "task1"),
            ("B", "D", "task2"),
            (("D", "E"), "F", "task3"),
            ("D.C", "G", "task4"),
            ("task3_metadata", "H", "task5"),
        ]
        pipeline = _makePipeline(task_specs)
        stream = io.StringIO()
        pipeline2dot(pipeline, stream)
        dot_text = stream.getvalue()

        # Validating the complete output is hard, so only a few basic
        # properties are checked, and even those are not terribly stable.
        lines = dot_text.strip().split("\n")
        expected_line_count = sum(
            (
                3,  # globals
                9,  # datasets; component variant doesn't count
                6,  # tasks
                15,  # edges
                2,  # graph header and closing
            )
        )
        self.assertEqual(len(lines), expected_line_count)

        # Every node name, whether declared or used in an edge, must be quoted.
        node_pattern = re.compile(r"^([^ ]+) \[.+\];$")
        edge_pattern = re.compile(r"^([^ ]+) *-> *([^ ]+);$")
        for line in lines:
            node_match = node_pattern.match(line)
            if node_match is not None:
                declared = node_match.group(1)
                # "graph", "node" and "edge" lines are exempt from the check.
                names = [] if declared in ["graph", "node", "edge"] else [declared]
            else:
                edge_match = edge_pattern.match(line)
                names = [] if edge_match is None else [edge_match.group(1), edge_match.group(2)]
            for name in names:
                self.assertEqual(name[0] + name[-1], '""')

        # Components must be connected to their parent dataset node.
        self.assertIn('"D:0" -> "task4:2"', dot_text)

        # A connection must exist for task metadata when another task
        # reads it in.
        self.assertIn('"task3:2" -> "task3_metadata:0"', dot_text)

179 

180 

class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Generic check for leaked file handles, inherited unchanged from
    `lsst.utils.tests.MemoryTestCase`.
    """

183 

184 

def setup_module(module):
    """Initialise the LSST test utilities when this module runs under pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up; it is not used by this function.
    """
    lsst.utils.tests.init()

194 

195 

if __name__ == "__main__":
    # Direct execution: perform the same initialisation as setup_module
    # before handing over to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()
198 unittest.main()