Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _layout.py: 21%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ("ColumnSelector", "Layout", "LayoutRow") 

30 

31import dataclasses 

32from collections.abc import Iterable, Iterator, Mapping, Set 

33from typing import TextIO 

34 

35import networkx 

36import networkx.algorithms.components 

37import networkx.algorithms.dag 

38import networkx.algorithms.shortest_paths 

39import networkx.algorithms.traversal 

40 

41 

42class Layout[K]: 

43 """A class that positions nodes and edges in text-art graph visualizations. 

44 

45 Parameters 

46 ---------- 

47 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph` 

48 NetworkX export of a `.PipelineGraph` being visualized. 

49 column_selector : `ColumnSelector`, optional 

50 Parameterized helper for selecting which column each node should be 

51 added to. 

52 """ 

53 

54 def __init__( 

55 self, 

56 xgraph: networkx.DiGraph | networkx.MultiDiGraph, 

57 column_selector: ColumnSelector | None = None, 

58 ): 

59 if column_selector is None: 

60 column_selector = ColumnSelector() 

61 self._xgraph = xgraph 

62 self._column_selector = column_selector 

63 # Mapping from the column (i.e. 'x') of an already-positioned node to 

64 # the node keys its outgoing edges terminate at. These and all other 

65 # column/x variables are multiples of 2 when they refer to 

66 # already-existing columns, allowing potential insertion of new columns 

67 # between them to be represented by odd integers. Positions are also 

68 # inverted from the order they are usually displayed; it's best to 

69 # think of them as the distance (to the left) from the column where 

70 # text appears on the right. This is natural because prefer for nodes 

71 # to be close to that text when possible (or maybe it's historical, and 

72 # it's just a lot of work to re-invert the algorithm now that it's 

73 # written). 

74 self._active_columns: dict[int, set[K]] = {} 

75 # Mapping from node key to its column. 

76 self._locations: dict[K, int] = {} 

77 # Minimum and maximum column (may go negative; will be shifted as 

78 # needed before actual display). 

79 self._x_min = 0 

80 self._x_max = 0 

81 # Run the algorithm! 

82 self._add_graph(self._xgraph) 

83 del self._active_columns 

84 

85 def _add_graph(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph) -> None: 

86 """Highest-level routine for the layout algorithm. 

87 

88 Parameters 

89 ---------- 

90 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph` 

91 Graph or subgraph to add to the layout. 

92 """ 

93 # Start by identifying unconnected subgraphs ("components"); we'll 

94 # display these in series, as the first of our many attempts to 

95 # minimize tangles of edges. 

96 component_xgraphs_and_orders = [] 

97 single_nodes = [] 

98 for component_nodes in networkx.components.weakly_connected_components(xgraph): 

99 if len(component_nodes) == 1: 

100 single_nodes.append(component_nodes.pop()) 

101 else: 

102 component_xgraph = xgraph.subgraph(component_nodes) 

103 component_order = list( 

104 networkx.algorithms.dag.lexicographical_topological_sort(component_xgraph, key=str) 

105 ) 

106 component_xgraphs_and_orders.append((component_xgraph, component_order)) 

107 # Add all single-node components in lexicographical order. 

108 single_nodes.sort(key=str) 

109 for node in single_nodes: 

110 self._add_single_node(node) 

111 # Sort component graphs by their size and then str of their first node. 

112 component_xgraphs_and_orders.sort(key=lambda t: (len(t[1]), str(t[1][0]))) 

113 # Add subgraphs in that order. 

114 for component_xgraph, component_order in component_xgraphs_and_orders: 

115 self._add_connected_graph(component_xgraph, component_order) 

116 

117 def _add_single_node(self, node: K) -> None: 

118 """Add a single node to the layout.""" 

119 assert node not in self._locations 

120 if not self._locations: 

121 # Special-case the first node in a component (disconnectd 

122 # subgraph). 

123 self._locations[node] = 0 

124 self._active_columns[0] = set(self._xgraph.successors(node)) 

125 return 

126 # The candidate_x list holds columns where we could insert this node. 

127 # We start with new columns on the outside and a new column between 

128 # each pair of existing columns. These inner nodes are usually not 

129 # very good candidates, but it's simplest to always include them and 

130 # let the penalty system in ColumnSelector take care of it. 

131 candidate_x = [self._x_max + 2, self._x_min - 2] 

132 # The columns holding edges that will connect to this node. 

133 connecting_x = [] 

134 # Iterate over active columns to populate the above. 

135 for active_column_x, active_column_endpoints in self._active_columns.items(): 

136 if node in active_column_endpoints: 

137 connecting_x.append(active_column_x) 

138 # Delete this node from the active columns it is in, and delete any 

139 # entries that now have empty sets. 

140 for x in connecting_x: 

141 destinations = self._active_columns[x] 

142 destinations.remove(node) 

143 if not destinations: 

144 del self._active_columns[x] 

145 # Add all empty columns between the current min and max as candidates. 

146 for x in range(self._x_min, self._x_max + 2, 2): 

147 if x not in self._active_columns: 

148 candidate_x.append(x) 

149 # Sort the list of connecting columns so we can easily get min and max. 

150 connecting_x.sort() 

151 best_x = min( 

152 candidate_x, 

153 key=lambda x: self._column_selector( 

154 connecting_x, x, self._active_columns, self._x_min, self._x_max 

155 ), 

156 ) 

157 if best_x % 2: 

158 # We're inserting a new column between two existing ones; shift 

159 # all existing column values above this one to make room while 

160 # using only even numbers. 

161 best_x = self._shift(best_x) 

162 self._x_min = min(self._x_min, best_x) 

163 self._x_max = max(self._x_max, best_x) 

164 

165 self._locations[node] = best_x 

166 successors = set(self._xgraph.successors(node)) 

167 if successors: 

168 self._active_columns[best_x] = successors 

169 

170 def _shift(self, x: int) -> int: 

171 """Shift all columns above the given one up by two, allowing a new 

172 column to be inserted while leaving all columns as even integers. 

173 """ 

174 for node, old_x in self._locations.items(): 

175 if old_x > x: 

176 self._locations[node] += 2 

177 self._active_columns = { 

178 old_x + 2 if old_x > x else old_x: destinations 

179 for old_x, destinations in self._active_columns.items() 

180 } 

181 self._x_max += 2 

182 return x + 1 

183 

184 def _add_connected_graph( 

185 self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, order: list[K] | None = None 

186 ) -> None: 

187 """Add a subgraph whose nodes are connected. 

188 

189 Parameters 

190 ---------- 

191 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph` 

192 Graph or subgraph to add to the layout. 

193 order : `list`, optional 

194 List providing a lexicographical/topological sort of ``xgraph``. 

195 Will be computed if not provided. 

196 """ 

197 if order is None: 

198 order = list(networkx.algorithms.dag.lexicographical_topological_sort(xgraph, key=str)) 

199 # Find the longest path between two nodes, which we'll call the 

200 # "backbone" of our layout; we'll step through this path and add 

201 # recurse via calls to `_add_graph` on the nodes that we think should 

202 # go between the backbone nodes. 

203 backbone: list[K] = networkx.algorithms.dag.dag_longest_path(xgraph, topo_order=order) 

204 # Add the first backbone node and any ancestors according to the full 

205 # graph (it can't have ancestors in this _subgraph_ because they'd have 

206 # been part of the longest path themselves, but the subgraph doesn't 

207 # have a complete picture). 

208 current = backbone.pop(0) 

209 self._add_blockers_of(current) 

210 self._add_single_node(current) 

211 # Remember all recursive descendants of the current backbone node for 

212 # later use. 

213 descendants = frozenset(networkx.algorithms.dag.descendants(xgraph, current)) 

214 while backbone: 

215 current = backbone.pop(0) 

216 # Find descendants of the previous node that are: 

217 # - in this subgraph 

218 # - not descendants of the current node. 

219 followers_of_previous = set(descendants) 

220 descendants = frozenset(networkx.algorithms.dag.descendants(xgraph, current)) 

221 followers_of_previous.remove(current) 

222 followers_of_previous.difference_update(descendants) 

223 followers_of_previous.difference_update(self._locations.keys()) 

224 # Add those followers of the previous node. We like adding these 

225 # here because this terminates edges as soon as we can, freeing up 

226 # those columns for new new nodes. 

227 self._add_graph(xgraph.subgraph(followers_of_previous)) 

228 # Add the current backbone node and all remaining blockers (which 

229 # may not even be in this subgraph). 

230 self._add_blockers_of(current) 

231 self._add_single_node(current) 

232 # Any remaining subgraph nodes were not directly connected to the 

233 # backbone nodes. 

234 remaining = xgraph.copy() 

235 remaining.remove_nodes_from(self._locations.keys()) 

236 self._add_graph(remaining) 

237 

238 def _add_blockers_of(self, node: K) -> None: 

239 """Add all nodes that are ancestors of the given node according to the 

240 full graph. 

241 """ 

242 blockers = set(networkx.algorithms.dag.ancestors(self._xgraph, node)) 

243 blockers.difference_update(self._locations.keys()) 

244 self._add_graph(self._xgraph.subgraph(blockers)) 

245 

246 @property 

247 def width(self) -> int: 

248 """The number of actual (not multiple-of-two) columns in the layout.""" 

249 return (self._x_max - self._x_min) // 2 

250 

251 @property 

252 def nodes(self) -> Iterable[K]: 

253 """The graph nodes in the order they appear in the layout.""" 

254 return self._locations.keys() 

255 

256 def print(self, stream: TextIO) -> None: 

257 """Print the nodes (but not their edges) as symbols in the right 

258 locations. 

259 

260 This is intended for use as a debugging diagnostic, not part of a real 

261 visualization system. 

262 

263 Parameters 

264 ---------- 

265 stream : `io.TextIO` 

266 Output stream to use for printing. 

267 """ 

268 for row in self: 

269 print(f"{' ' * row.x}{' ' * (self.width - row.x)} {row.node}", file=stream) 

270 

271 def _external_location(self, x: int) -> int: 

272 """Return the actual (not multiple-of-two, stating from zero) location, 

273 given the internal multiple-of-two. 

274 """ 

275 return (self._x_max - x) // 2 

276 

277 def __iter__(self) -> Iterator[LayoutRow]: 

278 active_edges: dict[K, set[K]] = {} 

279 for node, node_x in self._locations.items(): 

280 row = LayoutRow(node, self._external_location(node_x)) 

281 for origin, destinations in active_edges.items(): 

282 if node in destinations: 

283 row.connecting.append((self._external_location(self._locations[origin]), origin)) 

284 destinations.remove(node) 

285 if destinations: 

286 row.continuing.append( 

287 (self._external_location(self._locations[origin]), origin, frozenset(destinations)) 

288 ) 

289 row.connecting.sort(key=str) 

290 row.continuing.sort(key=str) 

291 yield row 

292 active_edges[node] = set(self._xgraph.successors(node)) 

293 

294 

295@dataclasses.dataclass 

296class LayoutRow[K]: 

297 """Information about a single text-art row in a graph.""" 

298 

299 node: K 

300 """Key for the node in the exported NetworkX graph.""" 

301 

302 x: int 

303 """Column of the node's symbol and its outgoing edges.""" 

304 

305 connecting: list[tuple[int, K]] = dataclasses.field(default_factory=list) 

306 """The columns and node keys of edges that terminate at this row. 

307 """ 

308 

309 continuing: list[tuple[int, K, frozenset[K]]] = dataclasses.field(default_factory=list) 

310 """The columns and node keys of edges that continue through this row. 

311 """ 

312 

313 

314@dataclasses.dataclass 

315class ColumnSelector: 

316 """Helper class that weighs the columns a new node could be added to in a 

317 text DAG visualization. 

318 """ 

319 

320 crossing_penalty: int = 1 

321 """Penalty for each ongoing edge the new node's outgoing edge would have to 

322 "hop" if it were put at the candidate column. 

323 """ 

324 

325 interior_penalty: int = 1 

326 """Penalty for adding a new column to the layout between existing columns 

327 (in addition to `insertion_penaly`. 

328 """ 

329 

330 insertion_penalty: int = 2 

331 """Penalty for adding a new column to the layout instead of reusing an 

332 empty one. 

333 

334 This penalty is applied even when there is no empty column; it just cancels 

335 out in that case because it's applied to all candidate columns. 

336 """ 

337 

338 def __call__[K]( 

339 self, 

340 connecting_x: list[int], 

341 node_x: int, 

342 active_columns: Mapping[int, Set[K]], 

343 x_min: int, 

344 x_max: int, 

345 ) -> int: 

346 """Compute the penalty score for adding a node in the given column. 

347 

348 Parameters 

349 ---------- 

350 connecting_x : `list` [ `int` ] 

351 The columns of incoming edges for this node. All values are even. 

352 node_x : `int` 

353 The column being considered for the new node. Will be odd if it 

354 proposes an insertion between existing columns, or outside the 

355 bounds of ``x_min`` and ``x_max`` if it proposes an insertion 

356 on a side. 

357 active_columns : `~collections.abc.Mapping` [ `int`, \ 

358 `~collections.abc.Set` ] 

359 The columns of nodes already in the visualization (in previous 

360 lines) and the nodes at which their edges terminate. All keys are 

361 even. 

362 x_min : `int` 

363 Current minimum column position (inclusive). Always even. 

364 x_max : `int` 

365 Current maximum column position (exclusive). Always even. 

366 

367 Returns 

368 ------- 

369 penalty : `int` 

370 Penalty score for this location. Nodes should be placed at the 

371 column with the lowest penalty. 

372 """ 

373 # Start with a penalty for inserting a new column between two existing 

374 # columns or on either side, if that's what this is (i.e. x is odd). 

375 penalty = (node_x % 2) * (self.interior_penalty + self.insertion_penalty) 

376 if node_x < x_min: 

377 penalty += self.insertion_penalty 

378 elif node_x > x_max: 

379 penalty += self.insertion_penalty 

380 # If there are no active edges connecting to this node, we're done. 

381 if not connecting_x: 

382 return penalty 

383 # Find the bounds of the horizontal lines that connect 

384 horizontal_min_x = min(connecting_x[0], node_x) 

385 horizontal_max_x = max(connecting_x[-1], node_x) 

386 # Add the (scaled) number of unrelated continuing (vertical) edges that 

387 # the (horizontal) input edges for this node would have to "hop". 

388 penalty += sum( 

389 self.crossing_penalty 

390 for x in range(horizontal_min_x, horizontal_max_x + 2) 

391 if x in active_columns and x not in connecting_x 

392 ) 

393 return penalty