Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _dot.py: 15%
131 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:49 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("show_dot",)
31import html
32import os
33import sys
34from collections.abc import Mapping
35from typing import Any, TextIO
37from .._nodes import NodeType
38from .._pipeline_graph import PipelineGraph
39from ._formatting import NodeKey, format_dimensions, format_task_class
40from ._options import NodeAttributeOptions
41from ._show import parse_display_args
# Per-node-type Graphviz attributes (outline style, border colour, fill
# colour), merged into every node statement of the matching type.
_ATTRIBS = {
    NodeType.TASK: {
        "style": "filled",
        "color": "black",
        "fillcolor": "#B1F2EF",
    },
    NodeType.DATASET_TYPE: {
        "style": "rounded,filled,bold",
        "color": "#00BABC",
        "fillcolor": "#F5F5F5",
    },
    NodeType.TASK_INIT: {
        "style": "filled",
        "color": "black",
        "fillcolor": "#F4DEFA",
    },
}
# Defaults emitted once at the top of the DOT output as ``graph [...]``,
# ``node [...]`` and ``edge [...]`` statements.
_DEFAULT_GRAPH = {"splines": "ortho", "nodesep": "0.5", "ranksep": "0.75"}
_DEFAULT_NODE = {
    "shape": "box",
    "fontname": "Monospace",
    "fontsize": "14",
    "margin": "0.2,0.1",
    "penwidth": "3",
}
# NOTE(review): Graphviz documents ``pad`` as a graph-level attribute; confirm
# that it has the intended effect when set as an edge default.
_DEFAULT_EDGE = {"color": "black", "arrowsize": "1.5", "penwidth": "1.5", "pad": "10mm"}

# Font size used for the emphasised name lines of a node label.
_LABEL_POINT_SIZE = "18"
# A dataset-type label is split after this many lines ...
_LABEL_MAX_LINES_SOFT = 10
# ... unless the whole label fits within this many lines, in which case it is
# kept on the node in full.
_LABEL_MAX_LINES_HARD = 15
# Maximum number of overflow elements placed in a single overflow node.
_OVERFLOW_MAX_LINES = 20
def show_dot(
    pipeline_graph: PipelineGraph,
    stream: TextIO = sys.stdout,
    label_edge_connections: bool = False,
    **kwargs: Any,
) -> None:
    """Emit the pipeline graph as Graphviz DOT text.

    Parameters
    ----------
    pipeline_graph : `PipelineGraph`
        Pipeline graph to render.
    stream : `io.TextIO`, optional
        Destination for the DOT text.  The default is the `sys.stdout`
        object that was bound when this function was defined.
    label_edge_connections : `bool`, optional
        If `True`, every edge that carries a connection name is labeled with
        it (plus the component, if any).  If `False`, only edges with a
        component are labeled, using the component alone.
    **kwargs
        Forwarded unchanged to `parse_display_args`.

    Raises
    ------
    AssertionError
        Raised if a node is neither a task, a task-init, nor a dataset type
        node.
    """
    xgraph, options = parse_display_args(pipeline_graph, **kwargs)

    print("digraph Pipeline {", file=stream)
    for element, defaults in (
        ("graph", _DEFAULT_GRAPH),
        ("node", _DEFAULT_NODE),
        ("edge", _DEFAULT_EDGE),
    ):
        _render_default(element, defaults, stream)

    # Nodes first.  Dataset type nodes may spill part of their label into
    # extra "overflow" nodes; the running reference number is threaded
    # through the calls and the overflow node IDs are collected.
    next_overflow_ref = 1
    sink_ids: list[str] = []
    for node_key, node_data in xgraph.nodes.items():
        kind = node_key.node_type
        if kind in (NodeType.TASK, NodeType.TASK_INIT):
            _render_task_node(node_key, node_data, options, stream)
        elif kind == NodeType.DATASET_TYPE:
            next_overflow_ref, new_ids = _render_dataset_type_node(
                node_key, node_data, options, stream, next_overflow_ref
            )
            sink_ids.extend(new_ids)
        else:
            raise AssertionError(f"Unexpected node type: {node_key.node_type}")

    # Put all overflow nodes into a single ``rank=sink`` subgraph.
    if sink_ids:
        quoted = "; ".join(f'"{sink_id}"' for sink_id in sink_ids)
        print(f"{{rank=sink; {quoted};}}", file=stream)

    # Then the edges.
    for from_node, to_node, edge_data in xgraph.edges(data=True):
        edge_attrs = {}
        if edge_data.get("is_prerequisite", False):
            edge_attrs["style"] = "dashed"
        connection_name = edge_data.get("connection_name", None)
        if connection_name is not None:
            component = edge_data.get("component", None)
            if component is None:
                if label_edge_connections:
                    edge_attrs["xlabel"] = connection_name
            elif label_edge_connections:
                edge_attrs["xlabel"] = f"{connection_name} (.{component})"
            else:
                edge_attrs["xlabel"] = f".{component}"
        _render_edge(from_node.node_id, to_node.node_id, stream, **edge_attrs)

    print("}", file=stream)
def _render_default(type: str, attribs: dict[str, str], stream: TextIO) -> None:
    """Write a DOT default-attribute statement.

    Parameters
    ----------
    type : `str`
        Leading keyword of the statement; this file passes ``"graph"``,
        ``"node"`` and ``"edge"``.
    attribs : `dict` [`str`, `str`]
        Attribute names and values; each value is written in double quotes.
    stream : `io.TextIO`
        The stream to write the statement to.
    """
    pairs = (f'{name}="{value}"' for name, value in attribs.items())
    joined = ", ".join(pairs)
    print(f"{type} [{joined}];", file=stream)
def _render_task_node(
    node_key: NodeKey,
    node_data: Mapping[str, Any],
    options: NodeAttributeOptions,
    stream: TextIO,
) -> None:
    """Write the Graphviz node statement for a task or task-init node.

    Parameters
    ----------
    node_key : `NodeKey`
        The key for the node; its string form supplies the label text.
    node_data : `~collections.abc.Mapping` [`str`, `typing.Any`]
        The data associated with the node.  ``"task_class_name"`` and
        ``"dimensions"`` are read when the corresponding option is enabled.
    options : `NodeAttributeOptions`
        Options for rendering the node.
    stream : `io.TextIO`
        The stream to write the node to.
    """
    # Only the primary label lines are used; overflow lines and any common
    # prefix returned by the formatter are discarded here.
    labels = _format_label(str(node_key))[0]
    node_type = node_key.node_type

    # Task class line, when requested.
    if options.task_classes and node_type in (NodeType.TASK, NodeType.TASK_INIT):
        class_text = format_task_class(options, node_data["task_class_name"])
        labels.append(html.escape(class_text))

    # Dimensions line, when requested; never added for task-init nodes.
    if options.dimensions and node_type != NodeType.TASK_INIT:
        dimensions_text = format_dimensions(options, node_data["dimensions"])
        labels.append(f"<I>dimensions:</I> {html.escape(dimensions_text)}")

    _render_node(node_key.node_id, node_type, labels, stream)
def _render_dataset_type_node(
    node_key: NodeKey,
    node_data: Mapping[str, Any],
    options: NodeAttributeOptions,
    stream: TextIO,
    overflow_ref: int = 1,
) -> tuple[int, list[str]]:
    """Write the Graphviz node statement(s) for a dataset type node.

    A label that does not fit on the node is continued in one or more
    separate overflow nodes, each attached to the main node by an invisible
    edge.

    Parameters
    ----------
    node_key : `NodeKey`
        The key for the node; its string form supplies the label text.
    node_data : `~collections.abc.Mapping` [`str`, `typing.Any`]
        The data associated with the node.  ``"dimensions"`` and
        ``"storage_class_name"`` are read when the corresponding option is
        enabled.
    options : `NodeAttributeOptions`
        Options for rendering the node.
    stream : `io.TextIO`
        The stream to write the node to.
    overflow_ref : `int`, optional
        Reference number to print on this node and on its overflow nodes,
        should the label overflow.

    Returns
    -------
    overflow_ref : `int`
        The reference number to use for the next node that overflows; one
        more than the input if this node overflowed, otherwise unchanged.
    overflow_ids : `list` [`str`]
        IDs of the overflow nodes written for this node; empty if none.
    """
    labels, extras, common_prefix = _format_label(str(node_key), _LABEL_MAX_LINES_SOFT)

    # The soft limit decides where the label is split; if everything still
    # fits under the hard limit, keep it all on the main node.
    if len(labels) + len(extras) <= _LABEL_MAX_LINES_HARD:
        labels.extend(extras)
        extras = []
    if common_prefix:
        labels.insert(0, common_prefix)

    # Split whatever is left into overflow nodes of bounded size, all
    # sharing this node's reference number.
    overflow_nodes: dict[str, list[str]] = {}
    if extras:
        notice = f"and {len(extras)} more, continued in [{overflow_ref}]"
        labels.append(f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">{notice}</FONT></B>')
        ref_header = f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">[{overflow_ref}]</FONT></B>'
        head = [ref_header, common_prefix] if common_prefix else [ref_header]
        for start in range(0, len(extras), _OVERFLOW_MAX_LINES):
            chunk = extras[start : start + _OVERFLOW_MAX_LINES]
            overflow_nodes[f"{node_key.node_id}_{overflow_ref}_{start}"] = head + chunk
        overflow_ref += 1

    # Optional trailing lines on the main node.
    if options.dimensions:
        dimensions_text = format_dimensions(options, node_data["dimensions"])
        labels.append("<I>dimensions:</I> " + html.escape(dimensions_text))
    if options.storage_classes:
        labels.append("<I>storage class:</I> " + html.escape(node_data["storage_class_name"]))

    _render_node(node_key.node_id, node_key.node_type, labels, stream)

    # Overflow nodes, each tied to the main node by an invisible edge.
    for overflow_id, overflow_labels in overflow_nodes.items():
        _render_node(overflow_id, node_key.node_type, overflow_labels, stream)
        _render_edge(node_key.node_id, overflow_id, stream, style="invis")

    return overflow_ref, list(overflow_nodes)
def _render_node(
    node_id: str,
    node_type: NodeType,
    labels: list[str],
    stream: TextIO,
) -> None:
    """Write a single Graphviz node statement with an HTML-like table label.

    Parameters
    ----------
    node_id : `str`
        The unique name of the node.
    node_type : `NodeType`
        The type of the node; selects the entry of ``_ATTRIBS`` to apply.
    labels : `list` [`str`]
        The label elements to display on the node, one table row each.  They
        are inserted verbatim, so any escaping is up to the caller.
    stream : `io.TextIO`
        The stream to write the node to.
    """
    rows = "".join(f'<TR><TD ALIGN="LEFT">{text}</TD></TR>' for text in labels)
    table = f'<<TABLE BORDER="0" CELLPADDING="5">{rows}</TABLE>>'

    pieces = []
    for key, val in {**_ATTRIBS[node_type], "label": rows}.items():
        if key == "label":
            # The HTML-like label is delimited by <...>, not double quotes.
            pieces.append(f"{key}={table}")
        else:
            pieces.append(f'{key}="{val}"')

    print(f'"{node_id}" [{", ".join(pieces)}];', file=stream)
def _render_edge(from_node_id: str, to_node_id: str, stream: TextIO, **kwargs: Any) -> None:
    """Write a single Graphviz edge statement.

    Node IDs and attribute values are written as double-quoted DOT strings.
    Any double quote embedded in them is escaped so that it cannot terminate
    the string early and corrupt the output.

    Parameters
    ----------
    from_node_id : `str`
        The unique ID of the node the edge is coming from.
    to_node_id : `str`
        The unique ID of the node the edge is going to.
    stream : `io.TextIO`
        The stream to write the edge to.
    **kwargs
        Edge attributes, written as ``key="value"`` pairs inside brackets.
        If there are none, the brackets are omitted.
    """

    def quote(value: Any) -> str:
        # ``\"`` is the only escape sequence DOT recognizes inside a
        # double-quoted string, so nothing else needs to be touched.
        return '"' + str(value).replace('"', '\\"') + '"'

    statement = f"{quote(from_node_id)} -> {quote(to_node_id)}"
    if kwargs:
        attrib = ", ".join(f"{key}={quote(val)}" for key, val in kwargs.items())
        statement += f" [{attrib}]"
    print(f"{statement};", file=stream)
def _format_label(
    label: str,
    max_lines: int = 10,
    min_common_prefix_len: int = 1000,
) -> tuple[list[str], list[str], str]:
    """Split label text into elements and wrap each in HTML-like markup.

    Parameters
    ----------
    label : `str`
        The label text to parse.
    max_lines : `int`, optional
        The maximum number of lines to display before elements are moved to
        the overflow list.
    min_common_prefix_len : `int`, optional
        A common prefix is factored out only if it is longer than this.

    Returns
    -------
    labels : `list` [`str`]
        Parsed and formatted label text elements.
    label_extras : `list` [`str`]
        Parsed and formatted overflow text elements, if any.
    common_prefix : `str`
        The formatted common prefix (with a trailing colon), or an empty
        string if no prefix was factored out.
    """
    plain, plain_extras, common_prefix = _parse_label(label, max_lines, min_common_prefix_len)

    # Elements are indented only when they sit beneath a prefix line.
    indent = " " if common_prefix else ""
    if common_prefix:
        common_prefix = f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">{common_prefix}:</FONT></B>'

    def emphasize(text: str) -> str:
        return f'<B><FONT POINT-SIZE="{_LABEL_POINT_SIZE}">{indent}{text}</FONT></B>'

    return (
        [emphasize(text) for text in plain],
        [emphasize(text) for text in plain_extras],
        common_prefix,
    )
def _parse_label(
    label: str,
    max_lines: int,
    min_common_prefix_len: int,
) -> tuple[list[str], list[str], str]:
    """Split label text into elements, an overflow list and a common prefix.

    Parameters
    ----------
    label : `str`
        The label text to parse; elements are separated by ``", "``.
    max_lines : `int`
        The maximum number of lines to return.  A common prefix, when
        present, counts as one of those lines.
    min_common_prefix_len : `int`
        A common prefix is factored out only if it is longer than this.

    Returns
    -------
    labels : `list` [`str`]
        Parsed label text elements, with the common prefix removed.
    label_extras : `list` [`str`]
        Overflow text elements, if any.
    common_prefix : `str`
        The common prefix of the label text, or an empty string.
    """
    elements = label.split(", ")

    prefix = ""
    if len(elements) > 3:
        candidate = os.path.commonprefix(elements)
        if len(candidate) > min_common_prefix_len:
            # Only keep a prefix that ends in an underscore.  Otherwise the
            # prefix could equal a whole element (e.g. "srcMatch" for
            # "srcMatchFull, srcMatch") and leave an empty label behind.
            cut = candidate.rfind("_")
            if cut > 0:
                prefix = candidate[: cut + 1]
                elements = [element[len(prefix) :] for element in elements]

    # The prefix line, if any, uses up one of the available lines.
    budget = max_lines - (1 if prefix else 0)
    if len(elements) > budget:
        return elements[:budget], elements[budget:], prefix
    return elements, [], prefix