Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _show.py: 20%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:44 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("parse_display_args", "show") 

30 

31import sys 

32from collections.abc import Sequence 

33from shutil import get_terminal_size 

34from typing import Any, Literal, TextIO 

35 

36import networkx 

37 

38from .._nodes import NodeKey 

39from .._pipeline_graph import PipelineGraph 

40from .._tasks import TaskInitNode, TaskNode 

41from ._formatting import GetNodeText, get_node_symbol 

42from ._layout import ColumnSelector, Layout 

43from ._merge import ( 

44 MergedNodeKey, 

45 merge_graph_input_trees, 

46 merge_graph_intermediates, 

47 merge_graph_output_trees, 

48) 

49from ._options import NodeAttributeOptions 

50from ._printer import make_default_printer 

51from ._status_annotator import NodeStatusAnnotator, NodeStatusOptions 

52 

53DisplayNodeKey = NodeKey | MergedNodeKey 

54 

55 

def parse_display_args(
    pipeline_graph: PipelineGraph,
    *,
    dataset_types: bool = False,
    init: bool | None = False,
    dimensions: Literal["full"] | Literal["concise"] | Literal[False] | None = None,
    task_classes: Literal["full"] | Literal["concise"] | Literal[False] = False,
    storage_classes: bool | None = None,
    merge_input_trees: int = 4,
    merge_output_trees: int = 4,
    merge_intermediates: bool = True,
    include_automatic_connections: bool = False,
    status_annotator: NodeStatusAnnotator | None = None,
    status_options: NodeStatusOptions | None = None,
) -> tuple[networkx.DiGraph | networkx.MultiDiGraph, NodeAttributeOptions]:
    """Build the graph and node-attribute options used to display a
    `~.PipelineGraph`.

    This function does not print anything; it prepares the inputs that a
    renderer needs.

    Parameters
    ----------
    pipeline_graph : `PipelineGraph`
        Graph to display.
    dataset_types : `bool`, optional
        Whether to include dataset type nodes (default is `False`).
    init : `bool`, optional
        Whether to include task initialization nodes (i.e. the producers of
        init-input and init-output dataset types).  Default is `False`.  `None`
        will show both runtime and init nodes, while `True` will show only
        init nodes.
    dimensions : `str` or `False`, optional
        How to display the dimensions of tasks and dataset types.

        Possible values include:

        - ``"full"``: report fully-expanded dimensions.
        - ``"concise"``: report only dimensions that are not required or
          implied dependencies of any reported dimension.
        - `False`: do not report dimensions at all.
        - `None` (default): report concise dimensions only if
          `PipelineGraph.is_fully_resolved` is `True`.

        This also sets whether merge options consider dimensions when merging
        nodes.
    task_classes : `str` or `False`, optional
        How to display task class names (not task labels, which are always
        shown).

        Possible values include:

        - ``"full"``: report the fully-qualified task class name.
        - ``"concise"``: report the task class name with no module
          or package.
        - `False`: (default) do not report task classes at all.

        This also sets whether merge options consider task classes when merging
        nodes.
    storage_classes : `bool`, optional
        Whether to display storage classes in dataset type nodes.  This also
        sets whether merge options consider storage classes when merging nodes.
        It is ignored (forced to `False`) if ``dataset_types=False`` and
        ``init`` is not `None`.
    merge_input_trees : `int`, optional
        If positive, merge input trees of the graph whose nodes have the same
        outputs and other properties (dimensions, task classes, storage
        classes), traversing this many nodes deep into the graph from the
        beginning.  Default is ``4``.
    merge_output_trees : `int`, optional
        If positive, merge output trees of the graph whose nodes have the same
        outputs and other properties (dimensions, task classes, storage
        classes), traversing this many nodes deep into the graph from the end.
        Default is ``4``.
    merge_intermediates : `bool`, optional
        If `True` (default) merge interior parallel nodes with the same inputs,
        outputs, and other properties (dimensions, task classes, storage
        classes).
    include_automatic_connections : `bool`, optional
        Whether to include automatically-added connections like the config,
        log, and metadata dataset types for each task.  Default is `False`.
    status_annotator : `NodeStatusAnnotator`, optional
        Annotator to add status information to the graph.  Default is `None`.
    status_options : `NodeStatusOptions`, optional
        Options for displaying execution status.  Default is `None`.

    Returns
    -------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to render, after any status annotation, removal of unused
        automatic output dataset types, and node merging requested by the
        arguments.
    options : `NodeAttributeOptions`
        Node attribute display options, as returned by
        `NodeAttributeOptions.checked`.

    Raises
    ------
    ValueError
        Raised if ``init=None`` is combined with ``dataset_types=False`` or
        with a non-`None` ``status_annotator``.
    """
    if init is None:
        # Reject unsupported argument combinations before doing any work to
        # build the graph.
        if not dataset_types:
            raise ValueError("Cannot show init and runtime graphs unless dataset types are shown.")
        if status_annotator is not None:
            raise ValueError("Cannot show status with both init and runtime graphs.")
        xgraph = pipeline_graph.make_xgraph()
    elif dataset_types:
        xgraph = pipeline_graph.make_bipartite_xgraph(init)
        if status_annotator is not None:
            status_annotator(xgraph, dataset_types=True)
    else:
        xgraph = pipeline_graph.make_task_xgraph(init)
        # There are no dataset type nodes in a task-only graph, so there is
        # nothing to attach storage classes to.
        storage_classes = False
        if status_annotator is not None:
            status_annotator(xgraph, dataset_types=False)

    options = NodeAttributeOptions(
        dimensions=dimensions,
        storage_classes=storage_classes,
        task_classes=task_classes,
        status=status_options,
    )
    options = options.checked(pipeline_graph.is_fully_resolved, has_status=status_annotator is not None)

    if dataset_types and not include_automatic_connections:
        # Collect the runtime and/or init nodes that are part of this graph.
        taskish_nodes: list[TaskNode | TaskInitNode] = []
        for task_node in pipeline_graph.tasks.values():
            if init is None or init is False:
                taskish_nodes.append(task_node)
            if init is None or init is True:
                taskish_nodes.append(task_node.init)
        # Drop output dataset types that are not regular outputs of the node
        # (connection name not in ``t.outputs``) and that nothing in the graph
        # consumes.
        for t in taskish_nodes:
            xgraph.remove_nodes_from(
                edge.dataset_type_key
                for edge in t.iter_all_outputs()
                if edge.connection_name not in t.outputs and not xgraph.out_degree(edge.dataset_type_key)
            )

    if merge_input_trees:
        merge_graph_input_trees(xgraph, options, depth=merge_input_trees)
    if merge_output_trees:
        merge_graph_output_trees(xgraph, options, depth=merge_output_trees)
    if merge_intermediates:
        merge_graph_intermediates(xgraph, options)

    return xgraph, options

183 

184 

def show(
    pipeline_graph: PipelineGraph,
    stream: TextIO | None = None,
    *,
    color: bool | Sequence[str] | None = None,
    width: int = -1,
    column_interior_penalty: int = 1,
    column_crossing_penalty: int = 1,
    column_insertion_penalty: int = 2,
    **kwargs: Any,
) -> None:
    """Print a text-based `~.PipelineGraph` visualization.

    Parameters
    ----------
    pipeline_graph : `PipelineGraph`
        Graph to display.
    stream : `TextIO`, optional
        Output stream.  Defaults to the value of `sys.stdout` at the time of
        the call.
    color : `bool` or `~collections.abc.Sequence` [ `str` ], optional
        Whether to add color to the graph.  Default (`None`) is to add colors
        only if the `colorama` package can be imported.  `False` disables
        colors unconditionally, while `True` or a sequence of colors (see
        `make_colorama_printer`) will result in `ImportError` being propagated
        up if `colorama` is unavailable.
    width : `int`, optional
        Number of columns the full graph should occupy, including text
        descriptions on the right.  If ``0``, there is no limit (and hence no
        text-wrapping).  If negative (default) use the current terminal width.
    column_interior_penalty : `int`
        Penalty applied to a prospective column for a node when that column is
        between two existing columns.
    column_crossing_penalty : `int`
        Penalty applied to a prospective column for a node for each ongoing
        (vertical) edge that node's incoming edges would have to "hop".
    column_insertion_penalty : `int`
        Penalty applied to a prospective column for a node when considering a
        new column on the sides or between two existing columns.
    **kwargs
        Forwarded to `parse_display_args`.
    """
    if stream is None:
        # Look up sys.stdout at call time rather than binding it as a default
        # at definition time, so that later redirection of sys.stdout is
        # honored.
        stream = sys.stdout

    xgraph, options = parse_display_args(pipeline_graph, **kwargs)

    column_selector = ColumnSelector(
        interior_penalty=column_interior_penalty,
        crossing_penalty=column_crossing_penalty,
        insertion_penalty=column_insertion_penalty,
    )
    layout = Layout[DisplayNodeKey](xgraph, column_selector)

    if width < 0:
        width, _ = get_terminal_size()

    # Number of columns used for padding after a symbol.
    # Must match the padding added by the `Printer` class.
    symbol_padding = 2

    printer = make_default_printer(layout.width, color, stream)
    printer.get_symbol = get_node_symbol

    # A width of zero means "no limit"; otherwise the text gets whatever is
    # left after the graph columns and the symbol padding.
    get_text = GetNodeText(xgraph, options, (width - printer.width - symbol_padding) if width else 0)
    printer.get_text = get_text

    printer.print(stream, layout)
    for line in get_text.format_deferrals(width):
        print(line, file=stream)