Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _show.py: 20%
64 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:19 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("parse_display_args", "show")
31import sys
32from collections.abc import Sequence
33from shutil import get_terminal_size
34from typing import Any, Literal, TextIO
36import networkx
38from .._nodes import NodeKey
39from .._pipeline_graph import PipelineGraph
40from .._tasks import TaskInitNode, TaskNode
41from ._formatting import GetNodeText, get_node_symbol
42from ._layout import ColumnSelector, Layout
43from ._merge import (
44 MergedNodeKey,
45 merge_graph_input_trees,
46 merge_graph_intermediates,
47 merge_graph_output_trees,
48)
49from ._options import NodeAttributeOptions
50from ._printer import make_default_printer
51from ._status_annotator import NodeStatusAnnotator, NodeStatusOptions
53DisplayNodeKey = NodeKey | MergedNodeKey
56def parse_display_args(
57 pipeline_graph: PipelineGraph,
58 *,
59 dataset_types: bool = False,
60 init: bool | None = False,
61 dimensions: Literal["full"] | Literal["concise"] | Literal[False] | None = None,
62 task_classes: Literal["full"] | Literal["concise"] | Literal[False] = False,
63 storage_classes: bool | None = None,
64 merge_input_trees: int = 4,
65 merge_output_trees: int = 4,
66 merge_intermediates: bool = True,
67 include_automatic_connections: bool = False,
68 status_annotator: NodeStatusAnnotator | None = None,
69 status_options: NodeStatusOptions | None = None,
70) -> tuple[networkx.DiGraph | networkx.MultiDiGraph, NodeAttributeOptions]:
71 """Print a text-based ~.PipelineGraph` visualization.
73 Parameters
74 ----------
75 pipeline_graph : `PipelineGraph`
76 Graph to display.
77 dataset_types : `bool`, optional
78 Whether to include dataset type nodes (default is `False`).
79 init : `bool`, optional
80 Whether to include task initialization nodes (i.e. the producers of
81 init-input and init-output dataset types). Default is `False`. `None`
82 will show both runtime and init nodes, while `True` will show only
83 init nodes.
84 dimensions : `str` or `False`, optional
85 How to display the dimensions of tasks and dataset types.
87 Possible values include:
89 - ``"full"``: report fully-expanded dimensions.
90 - ``"concise"``: report only dimensions that are not required or
91 implied dependencies of any reported dimension.
92 - `False`: do not report dimensions at all.
93 - `None` (default): report concise dimensions only if
94 `PipelineGraph.is_fully_resolved` is `True`.
96 This also sets whether merge options consider dimensions when merging
97 nodes.
98 task_classes : `str` or `False`, optional
99 How to display task class names (not task labels, which are always
100 shown).
102 Possible values include:
104 - ``"full"``: report the fully-qualified task class name.
105 - ``"concise"``: report the task class name with no module
106 or package.
107 - `False`: (default) do not report task classes at all.
109 This also sets whether merge options consider task classes when merging
110 nodes. It is ignored if ``tasks=False``.
111 storage_classes : `bool`, optional
112 Whether to display storage classes in dataset type nodes. This also
113 sets whether merge options consider storage classes when merging nodes.
114 It is ignored if ``dataset_types=False``.
115 merge_input_trees : `int`, optional
116 If positive, merge input trees of the graph whose nodes have the same
117 outputs and other properties (dimensions, task classes, storage
118 classes), traversing this many nodes deep into the graph from the
119 beginning. Default is ``4``.
120 merge_output_trees : `int`, optional
121 If positive, merge output trees of the graph whose nodes have the same
122 outputs and other properties (dimensions, task classes, storage
123 classes), traversing this many nodes deep into the graph from the end.
124 Default is ``4``.
125 merge_intermediates : `bool`, optional
126 If `True` (default) merge interior parallel nodes with the same inputs,
127 outputs, and other properties (dimensions, task classes, storage
128 classes).
129 include_automatic_connections : `bool`, optional
130 Whether to include automatically-added connections like the config,
131 log, and metadata dataset types for each task. Default is `False`.
132 status_annotator : `NodeStatusAnnotator`, optional
133 Annotator to add status information to the graph. Default is `None`.
134 status_options : `NodeStatusOptions`, optional
135 Options for displaying execution status. Default is `None`.
136 """
137 if init is None:
138 if not dataset_types:
139 raise ValueError("Cannot show init and runtime graphs unless dataset types are shown.")
140 xgraph = pipeline_graph.make_xgraph()
141 if status_annotator is not None:
142 raise ValueError("Cannot show status with both init and runtime graphs.")
143 elif dataset_types:
144 xgraph = pipeline_graph.make_bipartite_xgraph(init)
145 if status_annotator is not None:
146 status_annotator(xgraph, dataset_types=True)
147 else:
148 xgraph = pipeline_graph.make_task_xgraph(init)
149 storage_classes = False
150 if status_annotator is not None:
151 status_annotator(xgraph, dataset_types=False)
153 options = NodeAttributeOptions(
154 dimensions=dimensions,
155 storage_classes=storage_classes,
156 task_classes=task_classes,
157 status=status_options,
158 )
159 options = options.checked(pipeline_graph.is_fully_resolved, has_status=status_annotator is not None)
161 if dataset_types and not include_automatic_connections:
162 taskish_nodes: list[TaskNode | TaskInitNode] = []
163 for task_node in pipeline_graph.tasks.values():
164 if init is None or init is False:
165 taskish_nodes.append(task_node)
166 if init is None or init is True:
167 taskish_nodes.append(task_node.init)
168 for t in taskish_nodes:
169 xgraph.remove_nodes_from(
170 edge.dataset_type_key
171 for edge in t.iter_all_outputs()
172 if edge.connection_name not in t.outputs and not xgraph.out_degree(edge.dataset_type_key)
173 )
175 if merge_input_trees:
176 merge_graph_input_trees(xgraph, options, depth=merge_input_trees)
177 if merge_output_trees:
178 merge_graph_output_trees(xgraph, options, depth=merge_output_trees)
179 if merge_intermediates:
180 merge_graph_intermediates(xgraph, options)
182 return xgraph, options
185def show(
186 pipeline_graph: PipelineGraph,
187 stream: TextIO = sys.stdout,
188 *,
189 color: bool | Sequence[str] | None = None,
190 width: int = -1,
191 column_interior_penalty: int = 1,
192 column_crossing_penalty: int = 1,
193 column_insertion_penalty: int = 2,
194 **kwargs: Any,
195) -> None:
196 """Print a text-based ~.PipelineGraph` visualization.
198 Parameters
199 ----------
200 pipeline_graph : `PipelineGraph`
201 Graph to display.
202 stream : `TextIO`, optional
203 Output stream. Defaults to STDOUT.
204 color : `bool` or `~collections.abc.Sequence` [ `str` ]
205 Whether to use tto add color to the graph. Default is to add colors
206 only if the `colorama` package can be imported. `False` disables colors
207 unconditionally, while `True` or a sequence of colors (see
208 `make_colorama_printer`) will result in `ImportError` being propagated
209 up if `colorama` is unavailable.
210 width : `int`, optional
211 Number of columns the full graph should occupy, including text
212 descriptions on the right. If ``0``, there is no limit (and hence no
213 text-wrapping). If negative (default) use the current terminal width.
214 column_interior_penalty : `int`
215 Penalty applied to a prospective column for a node when that column is
216 between two existing columns.
217 column_crossing_penalty : `int`
218 Penalty applied to a prospective column for a node for each ongoing
219 (vertical) edge that node's incoming edges would have to "hop".
220 column_insertion_penalty : `int`
221 Penalty applied to a prospective column for a node when considering a
222 new columns on the sides or between two existing columns.
223 **kwargs
224 Forwarded to parse_display_args.
225 """
226 xgraph, options = parse_display_args(pipeline_graph, **kwargs)
228 column_selector = ColumnSelector(
229 interior_penalty=column_interior_penalty,
230 crossing_penalty=column_crossing_penalty,
231 insertion_penalty=column_insertion_penalty,
232 )
233 layout = Layout[DisplayNodeKey](xgraph, column_selector)
235 if width < 0:
236 width, _ = get_terminal_size()
238 # Number of columns used for padding after a symbol.
239 # Must match the padding added by the `Printer` class.
240 symbol_padding = 2
242 printer = make_default_printer(layout.width, color, stream)
243 printer.get_symbol = get_node_symbol
245 get_text = GetNodeText(xgraph, options, (width - printer.width - symbol_padding) if width else 0)
246 printer.get_text = get_text
248 printer.print(stream, layout)
249 for line in get_text.format_deferrals(width):
250 print(line, file=stream)