Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _layout.py: 21%
153 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ("ColumnSelector", "Layout", "LayoutRow")
31import dataclasses
32from collections.abc import Iterable, Iterator, Mapping, Set
33from typing import TextIO
35import networkx
36import networkx.algorithms.components
37import networkx.algorithms.dag
38import networkx.algorithms.shortest_paths
39import networkx.algorithms.traversal
42class Layout[K]:
43 """A class that positions nodes and edges in text-art graph visualizations.
45 Parameters
46 ----------
47 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
48 NetworkX export of a `.PipelineGraph` being visualized.
49 column_selector : `ColumnSelector`, optional
50 Parameterized helper for selecting which column each node should be
51 added to.
52 """
54 def __init__(
55 self,
56 xgraph: networkx.DiGraph | networkx.MultiDiGraph,
57 column_selector: ColumnSelector | None = None,
58 ):
59 if column_selector is None:
60 column_selector = ColumnSelector()
61 self._xgraph = xgraph
62 self._column_selector = column_selector
63 # Mapping from the column (i.e. 'x') of an already-positioned node to
64 # the node keys its outgoing edges terminate at. These and all other
65 # column/x variables are multiples of 2 when they refer to
66 # already-existing columns, allowing potential insertion of new columns
67 # between them to be represented by odd integers. Positions are also
68 # inverted from the order they are usually displayed; it's best to
69 # think of them as the distance (to the left) from the column where
70 # text appears on the right. This is natural because prefer for nodes
71 # to be close to that text when possible (or maybe it's historical, and
72 # it's just a lot of work to re-invert the algorithm now that it's
73 # written).
74 self._active_columns: dict[int, set[K]] = {}
75 # Mapping from node key to its column.
76 self._locations: dict[K, int] = {}
77 # Minimum and maximum column (may go negative; will be shifted as
78 # needed before actual display).
79 self._x_min = 0
80 self._x_max = 0
81 # Run the algorithm!
82 self._add_graph(self._xgraph)
83 del self._active_columns
85 def _add_graph(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph) -> None:
86 """Highest-level routine for the layout algorithm.
88 Parameters
89 ----------
90 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
91 Graph or subgraph to add to the layout.
92 """
93 # Start by identifying unconnected subgraphs ("components"); we'll
94 # display these in series, as the first of our many attempts to
95 # minimize tangles of edges.
96 component_xgraphs_and_orders = []
97 single_nodes = []
98 for component_nodes in networkx.components.weakly_connected_components(xgraph):
99 if len(component_nodes) == 1:
100 single_nodes.append(component_nodes.pop())
101 else:
102 component_xgraph = xgraph.subgraph(component_nodes)
103 component_order = list(
104 networkx.algorithms.dag.lexicographical_topological_sort(component_xgraph, key=str)
105 )
106 component_xgraphs_and_orders.append((component_xgraph, component_order))
107 # Add all single-node components in lexicographical order.
108 single_nodes.sort(key=str)
109 for node in single_nodes:
110 self._add_single_node(node)
111 # Sort component graphs by their size and then str of their first node.
112 component_xgraphs_and_orders.sort(key=lambda t: (len(t[1]), str(t[1][0])))
113 # Add subgraphs in that order.
114 for component_xgraph, component_order in component_xgraphs_and_orders:
115 self._add_connected_graph(component_xgraph, component_order)
117 def _add_single_node(self, node: K) -> None:
118 """Add a single node to the layout."""
119 assert node not in self._locations
120 if not self._locations:
121 # Special-case the first node in a component (disconnectd
122 # subgraph).
123 self._locations[node] = 0
124 self._active_columns[0] = set(self._xgraph.successors(node))
125 return
126 # The candidate_x list holds columns where we could insert this node.
127 # We start with new columns on the outside and a new column between
128 # each pair of existing columns. These inner nodes are usually not
129 # very good candidates, but it's simplest to always include them and
130 # let the penalty system in ColumnSelector take care of it.
131 candidate_x = [self._x_max + 2, self._x_min - 2]
132 # The columns holding edges that will connect to this node.
133 connecting_x = []
134 # Iterate over active columns to populate the above.
135 for active_column_x, active_column_endpoints in self._active_columns.items():
136 if node in active_column_endpoints:
137 connecting_x.append(active_column_x)
138 # Delete this node from the active columns it is in, and delete any
139 # entries that now have empty sets.
140 for x in connecting_x:
141 destinations = self._active_columns[x]
142 destinations.remove(node)
143 if not destinations:
144 del self._active_columns[x]
145 # Add all empty columns between the current min and max as candidates.
146 for x in range(self._x_min, self._x_max + 2, 2):
147 if x not in self._active_columns:
148 candidate_x.append(x)
149 # Sort the list of connecting columns so we can easily get min and max.
150 connecting_x.sort()
151 best_x = min(
152 candidate_x,
153 key=lambda x: self._column_selector(
154 connecting_x, x, self._active_columns, self._x_min, self._x_max
155 ),
156 )
157 if best_x % 2:
158 # We're inserting a new column between two existing ones; shift
159 # all existing column values above this one to make room while
160 # using only even numbers.
161 best_x = self._shift(best_x)
162 self._x_min = min(self._x_min, best_x)
163 self._x_max = max(self._x_max, best_x)
165 self._locations[node] = best_x
166 successors = set(self._xgraph.successors(node))
167 if successors:
168 self._active_columns[best_x] = successors
170 def _shift(self, x: int) -> int:
171 """Shift all columns above the given one up by two, allowing a new
172 column to be inserted while leaving all columns as even integers.
173 """
174 for node, old_x in self._locations.items():
175 if old_x > x:
176 self._locations[node] += 2
177 self._active_columns = {
178 old_x + 2 if old_x > x else old_x: destinations
179 for old_x, destinations in self._active_columns.items()
180 }
181 self._x_max += 2
182 return x + 1
184 def _add_connected_graph(
185 self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, order: list[K] | None = None
186 ) -> None:
187 """Add a subgraph whose nodes are connected.
189 Parameters
190 ----------
191 xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
192 Graph or subgraph to add to the layout.
193 order : `list`, optional
194 List providing a lexicographical/topological sort of ``xgraph``.
195 Will be computed if not provided.
196 """
197 if order is None:
198 order = list(networkx.algorithms.dag.lexicographical_topological_sort(xgraph, key=str))
199 # Find the longest path between two nodes, which we'll call the
200 # "backbone" of our layout; we'll step through this path and add
201 # recurse via calls to `_add_graph` on the nodes that we think should
202 # go between the backbone nodes.
203 backbone: list[K] = networkx.algorithms.dag.dag_longest_path(xgraph, topo_order=order)
204 # Add the first backbone node and any ancestors according to the full
205 # graph (it can't have ancestors in this _subgraph_ because they'd have
206 # been part of the longest path themselves, but the subgraph doesn't
207 # have a complete picture).
208 current = backbone.pop(0)
209 self._add_blockers_of(current)
210 self._add_single_node(current)
211 # Remember all recursive descendants of the current backbone node for
212 # later use.
213 descendants = frozenset(networkx.algorithms.dag.descendants(xgraph, current))
214 while backbone:
215 current = backbone.pop(0)
216 # Find descendants of the previous node that are:
217 # - in this subgraph
218 # - not descendants of the current node.
219 followers_of_previous = set(descendants)
220 descendants = frozenset(networkx.algorithms.dag.descendants(xgraph, current))
221 followers_of_previous.remove(current)
222 followers_of_previous.difference_update(descendants)
223 followers_of_previous.difference_update(self._locations.keys())
224 # Add those followers of the previous node. We like adding these
225 # here because this terminates edges as soon as we can, freeing up
226 # those columns for new new nodes.
227 self._add_graph(xgraph.subgraph(followers_of_previous))
228 # Add the current backbone node and all remaining blockers (which
229 # may not even be in this subgraph).
230 self._add_blockers_of(current)
231 self._add_single_node(current)
232 # Any remaining subgraph nodes were not directly connected to the
233 # backbone nodes.
234 remaining = xgraph.copy()
235 remaining.remove_nodes_from(self._locations.keys())
236 self._add_graph(remaining)
238 def _add_blockers_of(self, node: K) -> None:
239 """Add all nodes that are ancestors of the given node according to the
240 full graph.
241 """
242 blockers = set(networkx.algorithms.dag.ancestors(self._xgraph, node))
243 blockers.difference_update(self._locations.keys())
244 self._add_graph(self._xgraph.subgraph(blockers))
246 @property
247 def width(self) -> int:
248 """The number of actual (not multiple-of-two) columns in the layout."""
249 return (self._x_max - self._x_min) // 2
251 @property
252 def nodes(self) -> Iterable[K]:
253 """The graph nodes in the order they appear in the layout."""
254 return self._locations.keys()
256 def print(self, stream: TextIO) -> None:
257 """Print the nodes (but not their edges) as symbols in the right
258 locations.
260 This is intended for use as a debugging diagnostic, not part of a real
261 visualization system.
263 Parameters
264 ----------
265 stream : `io.TextIO`
266 Output stream to use for printing.
267 """
268 for row in self:
269 print(f"{' ' * row.x}●{' ' * (self.width - row.x)} {row.node}", file=stream)
271 def _external_location(self, x: int) -> int:
272 """Return the actual (not multiple-of-two, stating from zero) location,
273 given the internal multiple-of-two.
274 """
275 return (self._x_max - x) // 2
277 def __iter__(self) -> Iterator[LayoutRow]:
278 active_edges: dict[K, set[K]] = {}
279 for node, node_x in self._locations.items():
280 row = LayoutRow(node, self._external_location(node_x))
281 for origin, destinations in active_edges.items():
282 if node in destinations:
283 row.connecting.append((self._external_location(self._locations[origin]), origin))
284 destinations.remove(node)
285 if destinations:
286 row.continuing.append(
287 (self._external_location(self._locations[origin]), origin, frozenset(destinations))
288 )
289 row.connecting.sort(key=str)
290 row.continuing.sort(key=str)
291 yield row
292 active_edges[node] = set(self._xgraph.successors(node))
295@dataclasses.dataclass
296class LayoutRow[K]:
297 """Information about a single text-art row in a graph."""
299 node: K
300 """Key for the node in the exported NetworkX graph."""
302 x: int
303 """Column of the node's symbol and its outgoing edges."""
305 connecting: list[tuple[int, K]] = dataclasses.field(default_factory=list)
306 """The columns and node keys of edges that terminate at this row.
307 """
309 continuing: list[tuple[int, K, frozenset[K]]] = dataclasses.field(default_factory=list)
310 """The columns and node keys of edges that continue through this row.
311 """
314@dataclasses.dataclass
315class ColumnSelector:
316 """Helper class that weighs the columns a new node could be added to in a
317 text DAG visualization.
318 """
320 crossing_penalty: int = 1
321 """Penalty for each ongoing edge the new node's outgoing edge would have to
322 "hop" if it were put at the candidate column.
323 """
325 interior_penalty: int = 1
326 """Penalty for adding a new column to the layout between existing columns
327 (in addition to `insertion_penaly`.
328 """
330 insertion_penalty: int = 2
331 """Penalty for adding a new column to the layout instead of reusing an
332 empty one.
334 This penalty is applied even when there is no empty column; it just cancels
335 out in that case because it's applied to all candidate columns.
336 """
338 def __call__[K](
339 self,
340 connecting_x: list[int],
341 node_x: int,
342 active_columns: Mapping[int, Set[K]],
343 x_min: int,
344 x_max: int,
345 ) -> int:
346 """Compute the penalty score for adding a node in the given column.
348 Parameters
349 ----------
350 connecting_x : `list` [ `int` ]
351 The columns of incoming edges for this node. All values are even.
352 node_x : `int`
353 The column being considered for the new node. Will be odd if it
354 proposes an insertion between existing columns, or outside the
355 bounds of ``x_min`` and ``x_max`` if it proposes an insertion
356 on a side.
357 active_columns : `~collections.abc.Mapping` [ `int`, \
358 `~collections.abc.Set` ]
359 The columns of nodes already in the visualization (in previous
360 lines) and the nodes at which their edges terminate. All keys are
361 even.
362 x_min : `int`
363 Current minimum column position (inclusive). Always even.
364 x_max : `int`
365 Current maximum column position (exclusive). Always even.
367 Returns
368 -------
369 penalty : `int`
370 Penalty score for this location. Nodes should be placed at the
371 column with the lowest penalty.
372 """
373 # Start with a penalty for inserting a new column between two existing
374 # columns or on either side, if that's what this is (i.e. x is odd).
375 penalty = (node_x % 2) * (self.interior_penalty + self.insertion_penalty)
376 if node_x < x_min:
377 penalty += self.insertion_penalty
378 elif node_x > x_max:
379 penalty += self.insertion_penalty
380 # If there are no active edges connecting to this node, we're done.
381 if not connecting_x:
382 return penalty
383 # Find the bounds of the horizontal lines that connect
384 horizontal_min_x = min(connecting_x[0], node_x)
385 horizontal_max_x = max(connecting_x[-1], node_x)
386 # Add the (scaled) number of unrelated continuing (vertical) edges that
387 # the (horizontal) input edges for this node would have to "hop".
388 penalty += sum(
389 self.crossing_penalty
390 for x in range(horizontal_min_x, horizontal_max_x + 2)
391 if x in active_columns and x not in connecting_x
392 )
393 return penalty