Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _dot.py: 15%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("show_dot",) 

30 

31import html 

32import os 

33import sys 

34from collections.abc import Mapping 

35from typing import Any, TextIO 

36 

37from .._nodes import NodeType 

38from .._pipeline_graph import PipelineGraph 

39from ._formatting import NodeKey, format_dimensions, format_task_class 

40from ._options import NodeAttributeOptions 

41from ._show import parse_display_args 

42 

# Graphviz attributes applied to each node, keyed by the node's type.
_ATTRIBS = {
    NodeType.TASK: {"style": "filled", "color": "black", "fillcolor": "#B1F2EF"},
    NodeType.DATASET_TYPE: {
        "style": "rounded,filled,bold",
        "color": "#00BABC",
        "fillcolor": "#F5F5F5",
    },
    NodeType.TASK_INIT: {"style": "filled", "color": "black", "fillcolor": "#F4DEFA"},
}

# Default attribute sets written once at the top of the DOT output for the
# ``graph``, ``node`` and ``edge`` statements, respectively.
_DEFAULT_GRAPH = {"splines": "ortho", "nodesep": "0.5", "ranksep": "0.75"}
_DEFAULT_NODE = {
    "shape": "box",
    "fontname": "Monospace",
    "fontsize": "14",
    "margin": "0.2,0.1",
    "penwidth": "3",
}
# NOTE(review): Graphviz documents ``pad`` as a graph-level attribute;
# confirm that it has the intended effect when set as an edge default.
_DEFAULT_EDGE = {"color": "black", "arrowsize": "1.5", "penwidth": "1.5", "pad": "10mm"}

# Value of the POINT-SIZE attribute used for the bold label lines.
_LABEL_POINT_SIZE = "18"
# Line budget requested from ``_format_label`` for a dataset type node.
_LABEL_MAX_LINES_SOFT = 10
# If the total line count stays within this limit, overflow lines are folded
# back into the node instead of being moved to separate overflow nodes.
_LABEL_MAX_LINES_HARD = 15
# Maximum number of overflow lines placed in a single overflow node.
_OVERFLOW_MAX_LINES = 20

55 

56 

def show_dot(
    pipeline_graph: PipelineGraph,
    stream: TextIO = sys.stdout,
    label_edge_connections: bool = False,
    **kwargs: Any,
) -> None:
    """Print a pipeline graph as a Graphviz DOT ``digraph``.

    Parameters
    ----------
    pipeline_graph : `PipelineGraph`
        Pipeline graph to show.
    stream : `io.TextIO`, optional
        Stream the DOT text is written to.
    label_edge_connections : `bool`, optional
        If `True`, edges are labeled with their connection names.  Edges that
        carry a component are always labeled with that component.
    **kwargs
        Additional keyword arguments forwarded to `parse_display_args`.

    Raises
    ------
    AssertionError
        Raised if a node has a type other than task, task-init, or dataset
        type.
    """
    xgraph, options = parse_display_args(pipeline_graph, **kwargs)

    print("digraph Pipeline {", file=stream)
    for element, defaults in (
        ("graph", _DEFAULT_GRAPH),
        ("node", _DEFAULT_NODE),
        ("edge", _DEFAULT_EDGE),
    ):
        _render_default(element, defaults, stream)

    # Overflow nodes are numbered across the whole graph, so the counter is
    # threaded through every dataset type node that gets rendered.
    next_overflow_ref = 1
    sink_ids = []
    for key, data in xgraph.nodes.items():
        kind = key.node_type
        if kind == NodeType.TASK or kind == NodeType.TASK_INIT:
            _render_task_node(key, data, options, stream)
        elif kind == NodeType.DATASET_TYPE:
            next_overflow_ref, new_ids = _render_dataset_type_node(
                key, data, options, stream, next_overflow_ref
            )
            if new_ids:
                sink_ids += new_ids
        else:
            raise AssertionError(f"Unexpected node type: {key.node_type}")

    # Pin all overflow nodes to the same (sink) rank.
    if sink_ids:
        quoted_ids = "; ".join([f'"{sink_id}"' for sink_id in sink_ids])
        print(f"{{rank=sink; {quoted_ids};}}", file=stream)

    for source, target, attrs in xgraph.edges(data=True):
        edge_attrs = {}
        if attrs.get("is_prerequisite", False):
            edge_attrs["style"] = "dashed"
        connection_name = attrs.get("connection_name", None)
        if connection_name is not None:
            component = attrs.get("component", None)
            if component is None:
                if label_edge_connections:
                    edge_attrs["xlabel"] = connection_name
            elif label_edge_connections:
                edge_attrs["xlabel"] = f"{connection_name} (.{component})"
            else:
                edge_attrs["xlabel"] = f".{component}"
        _render_edge(source.node_id, target.node_id, stream, **edge_attrs)

    print("}", file=stream)

117 

118 

def _render_default(type: str, attribs: dict[str, str], stream: TextIO) -> None:
    """Write a DOT statement setting default attributes for one element kind.

    Parameters
    ----------
    type : `str`
        Keyword that starts the statement (called with ``graph``, ``node``
        and ``edge`` in this module).
    attribs : `dict` [`str`, `str`]
        Attribute names and values; each value is written in double quotes.
    stream : `io.TextIO`
        The stream to write the statement to.
    """
    pairs = []
    for name, value in attribs.items():
        pairs.append(f'{name}="{value}"')
    joined = ", ".join(pairs)
    print(f"{type} [{joined}];", file=stream)

123 

124 

def _render_task_node(
    node_key: NodeKey,
    node_data: Mapping[str, Any],
    options: NodeAttributeOptions,
    stream: TextIO,
) -> None:
    """Write the Graphviz node for a task or task-init node.

    Parameters
    ----------
    node_key : `NodeKey`
        The key for the node.
    node_data : `~collections.abc.Mapping` [`str`, `typing.Any`]
        The data associated with the node; ``task_class_name`` and
        ``dimensions`` are read when the corresponding options are enabled.
    options : `NodeAttributeOptions`
        Options for rendering the node.
    stream : `io.TextIO`
        The stream to write the node to.
    """
    # Only the primary label lines are used for tasks; overflow lines and the
    # common prefix returned by ``_format_label`` are ignored.
    lines = _format_label(str(node_key))[0]
    kind = node_key.node_type

    # Fully resolved task class name.
    if options.task_classes and (kind in (NodeType.TASK, NodeType.TASK_INIT)):
        class_name = format_task_class(options, node_data["task_class_name"])
        lines.append(html.escape(class_name))

    # Dimensions are not shown for task-init nodes.
    if options.dimensions and kind != NodeType.TASK_INIT:
        dimensions = format_dimensions(options, node_data["dimensions"])
        lines.append(f"<I>dimensions:</I>&nbsp;{html.escape(dimensions)}")

    _render_node(node_key.node_id, kind, lines, stream)

157 

158 

def _render_dataset_type_node(
    node_key: NodeKey,
    node_data: Mapping[str, Any],
    options: NodeAttributeOptions,
    stream: TextIO,
    overflow_ref: int = 1,
) -> tuple[int, list[str]]:
    """Render a Graphviz node for a dataset type.

    Label lines that do not fit in the node are moved to separate overflow
    nodes, each attached to the main node by an invisible edge.

    Parameters
    ----------
    node_key : `NodeKey`
        The key for the node.
    node_data : `~collections.abc.Mapping` [`str`, `typing.Any`]
        The data associated with the node.
    options : `NodeAttributeOptions`
        Options for rendering the node.
    stream : `io.TextIO`
        The stream to write the node to.
    overflow_ref : `int`, optional
        The reference number to display if this node needs overflow nodes.

    Returns
    -------
    overflow_ref : `int`
        The reference number for the next overflow node; this is the value
        passed in, advanced by one if this node overflowed.
    overflow_ids : `list` [`str`]
        The IDs of the overflow nodes rendered for this node (empty if there
        are none).
    """
    labels, label_extras, common_prefix = _format_label(str(node_key), _LABEL_MAX_LINES_SOFT)
    # Lines beyond the soft limit are folded back into the node as long as the
    # total stays within the hard limit.
    if len(labels) + len(label_extras) <= _LABEL_MAX_LINES_HARD:
        labels += label_extras
        label_extras = []
    if common_prefix:
        labels.insert(0, common_prefix)

    # Add a reference to a free-floating overflow node
    label_extras_grouped: dict[str, list[str]] = {}
    if label_extras:
        overflow_to_text = f"and {len(label_extras)} more, continued in [{overflow_ref}]"
        labels.append(f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">{overflow_to_text}</FONT></B>')
        # Split the remaining lines into chunks of at most
        # _OVERFLOW_MAX_LINES; the chunk offset keeps each overflow ID unique.
        for i in range(0, len(label_extras), _OVERFLOW_MAX_LINES):
            overflow_id = f"{node_key.node_id}_{overflow_ref}_{i}"
            overflow_label_extras = label_extras[i : i + _OVERFLOW_MAX_LINES]
            if common_prefix:
                overflow_label_extras.insert(0, common_prefix)
            # The reference marker goes first, above the common prefix.
            overflow_label_extras.insert(
                0, f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">[{overflow_ref}]</FONT></B>'
            )
            label_extras_grouped[overflow_id] = overflow_label_extras
        # All chunks of this node share one reference number.
        overflow_ref += 1

    # Append dimensions to the node
    if options.dimensions:
        labels.append(
            "<I>dimensions:</I>&nbsp;" + html.escape(format_dimensions(options, node_data["dimensions"]))
        )

    # Append storage class to the node
    if options.storage_classes:
        labels.append("<I>storage class:</I>&nbsp;" + html.escape(node_data["storage_class_name"]))

    _render_node(node_key.node_id, node_key.node_type, labels, stream)

    # Render the overflow nodes and invisible edges, if any
    if label_extras_grouped:
        for overflow_id, overflow_labels in label_extras_grouped.items():
            _render_node(overflow_id, node_key.node_type, overflow_labels, stream)
            _render_edge(node_key.node_id, overflow_id, stream, **{"style": "invis"})

    overflow_ids = list(label_extras_grouped.keys())
    return overflow_ref, overflow_ids

230 

231 

def _render_node(
    node_id: str,
    node_type: NodeType,
    labels: list[str],
    stream: TextIO,
) -> None:
    """Write one Graphviz node statement with an HTML-like table label.

    Parameters
    ----------
    node_id : `str`
        The unique name of the node.
    node_type : `NodeType`
        The type of the node; selects the style attributes from ``_ATTRIBS``.
    labels : `list` [`str`]
        The label elements to display on the node, one table row each.  They
        are inserted as-is, so any escaping is up to the caller.
    stream : `io.TextIO`
        The stream to write the node to.
    """
    rows = "".join([f'<TR><TD ALIGN="LEFT">{row}</TD></TR>' for row in labels])
    table_open = '<<TABLE BORDER="0" CELLPADDING="5">'
    table_close = "</TABLE>>"

    rendered = []
    for name, value in {**_ATTRIBS[node_type], "label": rows}.items():
        if name == "label":
            # HTML-like labels are delimited by <...> rather than quotes.
            rendered.append(f"{name}={table_open}{value}{table_close}")
        else:
            rendered.append(f'{name}="{value}"')
    attrib = ", ".join(rendered)
    print(f'"{node_id}" [{attrib}];', file=stream)

262 

263 

def _render_edge(from_node_id: str, to_node_id: str, stream: TextIO, **kwargs: Any) -> None:
    """Write one Graphviz edge statement.

    Parameters
    ----------
    from_node_id : `str`
        The unique ID of the node the edge is coming from.
    to_node_id : `str`
        The unique ID of the node the edge is going to.
    stream : `io.TextIO`
        The stream to write the edge to.
    **kwargs : `typing.Any`
        Edge attributes; each is written as ``name="value"``.  The attribute
        list is omitted entirely when no attributes are given.
    """
    statement = f'"{from_node_id}" -> "{to_node_id}"'
    if kwargs:
        attrib = ", ".join(f'{name}="{value}"' for name, value in kwargs.items())
        statement = f"{statement} [{attrib}]"
    print(f"{statement};", file=stream)

283 

284 

def _format_label(
    label: str,
    max_lines: int = 10,
    min_common_prefix_len: int = 1000,
) -> tuple[list[str], list[str], str]:
    """Split a label into lines and wrap each line in bold HTML-like markup.

    Parameters
    ----------
    label : `str`
        The label text to parse.
    max_lines : `int`, optional
        The maximum number of lines to display.
    min_common_prefix_len : `int`, optional
        The minimum length of a common prefix to consider.

    Returns
    -------
    labels : `list` [`str`]
        Parsed and formatted label text elements.
    label_extras : `list` [`str`]
        Parsed and formatted overflow text elements, if any.
    common_prefix : `str`
        The formatted common prefix of the label text (with a trailing
        colon), or an empty string if there is none.
    """

    def _emphasize(text: str) -> str:
        # Bold text at the label point size.
        return f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">{text}</FONT></B>'

    plain_labels, plain_extras, prefix = _parse_label(label, max_lines, min_common_prefix_len)

    # Elements listed under a common prefix are indented beneath it.
    indent = "&nbsp;&nbsp;" if prefix else ""
    labels = [_emphasize(f"{indent}{element}") for element in plain_labels]
    label_extras = [_emphasize(f"{indent}{element}") for element in plain_extras]

    if prefix:
        prefix = _emphasize(f"{prefix}:")
    return labels, label_extras, prefix

323 

324 

def _parse_label(
    label: str,
    max_lines: int,
    min_common_prefix_len: int,
) -> tuple[list[str], list[str], str]:
    """Split label text into elements, factoring out a shared prefix.

    Parameters
    ----------
    label : `str`
        The label text to parse; elements are separated by ``", "``.
    max_lines : `int`
        The maximum number of lines to return.  A common prefix, when
        present, takes up one of these lines.
    min_common_prefix_len : `int`
        The length a common prefix must exceed to be considered.

    Returns
    -------
    labels : `list` [`str`]
        Parsed label text elements, with any common prefix removed.
    label_extras : `list` [`str`]
        Overflow text elements that did not fit in ``max_lines``, if any.
    common_prefix : `str`
        The common prefix of the label text, or an empty string if none is
        used.
    """
    elements = label.split(", ")
    prefix = ""

    # A common prefix is only looked for when there are more than 3 elements.
    if len(elements) > 3:
        candidate = os.path.commonprefix(elements)
        if len(candidate) > min_common_prefix_len:
            # Only use common prefixes that end in an underscore. This
            # prevents prefixes that may equal an entire element. For
            # example, the label "srcMatchFull, srcMatch" would return
            # "srcMatch" as a common prefix, and the labels list would
            # contain an empty label.
            cut = candidate.rfind("_")
            if cut > 0:
                prefix = candidate[: cut + 1]
                elements = [element[len(prefix) :] for element in elements]

    # The prefix, if any, occupies one of the available lines.
    budget = max_lines - (1 if prefix else 0)
    if len(elements) <= budget:
        return elements, [], prefix
    return elements[:budget], elements[budget:], prefix