Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _merge.py: 26%
112 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = (
30 "MergedNodeKey",
31 "merge_graph_input_trees",
32 "merge_graph_intermediates",
33 "merge_graph_output_trees",
34)
36import dataclasses
37import hashlib
38from collections import defaultdict
39from collections.abc import Iterable
40from functools import cached_property
41from typing import Any
43import networkx
44import networkx.algorithms.dag
45import networkx.algorithms.tree
47from lsst.daf.butler import DimensionGroup
49from .._nodes import NodeKey, NodeType
50from ._options import NodeAttributeOptions
class MergedNodeKey(frozenset[NodeKey]):
    """A key for NetworkX graph nodes that represent multiple similar tasks
    or dataset types that have been merged to simplify graph visualization.
    """

    def __str__(self) -> str:
        # Fixed (reverse-sorted) member order keeps the label deterministic.
        return ", ".join(sorted((str(member) for member in self), reverse=True))

    @property
    def node_type(self) -> NodeType:
        """Enum value for whether this is a task, task initialization, or
        dataset type node.
        """
        # All members are merged only when alike, so any member's type works.
        return next(iter(self)).node_type

    @cached_property
    def node_id(self) -> str:
        """A unique string representation of the merged node key."""
        # Hash the sorted member strings so the ID is order-independent.
        digest = hashlib.blake2b(
            "".join(str(member) for member in sorted(self)).encode(),
            digest_size=4,
        ).hexdigest()
        return f"{digest}:{self.node_type.value}"
def merge_graph_input_trees(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph, options: NodeAttributeOptions, depth: int
) -> None:
    """Merge trees of overall-input dataset type nodes and/or
    beginning-of-pipeline task nodes that have similar properties and the same
    structure.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    options : `NodeAttributeOptions`
        Properties of nodes that should be considered when determining whether
        they are similar enough to be merged.  Only the truthiness of
        attributes is considered (e.g. ``options.dimensions == 'full'`` and
        ``options.dimensions == 'concise'`` are both interpreted to mean "only
        merge trees with the same dimensions").  This is typically the same set
        of options that controls whether to display these attributes in the
        graph visualization.
    depth : `int`
        How many nodes to traverse from the beginning of the graph before
        terminating the merging algorithm.
    """
    # Identify mergeable trees per depth, then rewrite the graph in place.
    _apply_tree_merges(xgraph, _make_tree_merge_groups(xgraph, options, depth))
def merge_graph_output_trees(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph, options: NodeAttributeOptions, depth: int
) -> None:
    """Merge trees of overall-output dataset type nodes and/or
    end-of-pipeline task nodes that have similar properties and the same
    structure.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    options : `NodeAttributeOptions`
        Properties of nodes that should be considered when determining whether
        they are similar enough to be merged.  Only the truthiness of
        attributes is considered (e.g. ``options.dimensions == 'full'`` and
        ``options.dimensions == 'concise'`` are both interpreted to mean "only
        merge trees with the same dimensions").  This is typically the same set
        of options that controls whether to display these attributes in the
        graph visualization.
    depth : `int`
        How many nodes to traverse backwards from the end of the graph before
        terminating the merging algorithm.
    """
    # Reuse the input-tree algorithm on a reversed view of the graph; edge
    # direction is the only difference between the two cases.  The merges are
    # then applied to the original (un-reversed) graph.
    groups = _make_tree_merge_groups(xgraph.reverse(copy=False), options, depth)
    _apply_tree_merges(xgraph, groups)
def merge_graph_intermediates(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph, options: NodeAttributeOptions
) -> None:
    """Merge parallel interior nodes of a graph with similar properties.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    options : `NodeAttributeOptions`
        Properties of nodes that should be considered when determining whether
        they are similar enough to be merged.  Only the truthiness of
        attributes is considered (e.g. ``options.dimensions == 'full'`` and
        ``options.dimensions == 'concise'`` are both interpreted to mean "only
        merge trees with the same dimensions").  This is typically the same set
        of options that controls whether to display these attributes in the
        graph visualization.

    Notes
    -----
    "Parallel" nodes here are nodes that have the exact same predecessor and
    successors.
    """
    # Group nodes by (attributes, predecessors, successors); nodes that share
    # all three are parallel and mergeable.
    candidates: dict[_MergeKey, set[NodeKey]] = defaultdict(set)
    for node, state in xgraph.nodes.items():
        key = _MergeKey.from_node_state(
            state,
            xgraph.predecessors(node),
            xgraph.successors(node),
            options,
        )
        # Only interior nodes (at least one predecessor and one successor)
        # are eligible for this merge.
        if key.parents and key.children:
            candidates[key].add(node)
    replaced: dict[NodeKey, MergedNodeKey] = {}
    for key, nodes in candidates.items():
        if len(nodes) < 2:
            continue
        merged = MergedNodeKey(frozenset(nodes))
        xgraph.add_node(
            merged,
            storage_class_name=key.storage_class_name,
            task_class_name=key.task_class_name,
            dimensions=key.dimensions,
        )
        # Reconnect to neighbors, routing through any node already merged
        # earlier in this loop.
        xgraph.add_edges_from((replaced.get(parent, parent), merged) for parent in key.parents)
        xgraph.add_edges_from((merged, replaced.get(child, child)) for child in key.children)
        replaced.update((node, merged) for node in nodes)
        xgraph.remove_nodes_from(nodes)
186@dataclasses.dataclass(frozen=True)
187class _MergeKey:
188 """A helper class for merge algorithms that is used as a dictionary key
189 when grouping nodes that may be merged by their attributes.
190 """
192 parents: frozenset[Any]
193 """Nodes of the original graph that are successors or predecessors of
194 the nodes being considered for merging.
195 """
197 dimensions: DimensionGroup | None
198 """Dimensions of the nodes being considered for merging, or `None` if
199 dimensions are not included in the similarity criteria.
200 """
202 storage_class_name: str | None
203 """Storage class of the nodes being considered for merging, or `None` if
204 storage classes are not included in the similarity criteria or this is a
205 task or task initialization node group.
206 """
208 task_class_name: str | None
209 """Name of the task class for the nodes being considered for merging, or
210 `None` if task classes are not included in the similarity criteria or
211 this is a dataset type node group.
212 """
214 children: frozenset[Any]
215 """Nodes that are predecessors or successors (the opposite of ``parents``
216 of the nodes being considered for merging.
218 In the `merge_graph_intermediates` algorithm, these are regular unmerged
219 nodes. In the `merge_graph_input_trees` or `merge_graph_output_trees`
220 algorithms, these are more `_MergeKey` instances, representing
221 already-processed trees.
222 """
224 @classmethod
225 def from_node_state[P, C](
226 cls,
227 state: dict[str, Any],
228 parents: Iterable[P],
229 children: Iterable[C],
230 options: NodeAttributeOptions,
231 ) -> _MergeKey:
232 """Construct from a NetworkX node attribute state dictionary.
234 Parameters
235 ----------
236 state : `dict`
237 Dictionary used to hold NetworkX node attributes.
238 parents : `~collections.abc.Iterable` [ `NodeKey` ]
239 Predecessor or successor nodes (depending on the orientation of
240 the algorithm).
241 children : ~collections.abc.Iterable`
242 Successor or predecessor nodes (depending on the orientation of
243 the algorithm).
244 options : `NodeAttributeOptions`
245 Options for which node attributes to include in the new key.
246 """
247 return cls(
248 parents=frozenset(parents),
249 dimensions=state.get("dimensions"),
250 storage_class_name=(state.get("storage_class_name") if options.storage_classes else None),
251 task_class_name=(state.get("task_class_name") if options.task_classes else None),
252 children=frozenset(children),
253 )
def _make_tree_merge_groups(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph,
    options: NodeAttributeOptions,
    depth: int,
) -> list[dict[_MergeKey, set[NodeKey]]]:
    """First-stage implementation of `merge_graph_input_trees` and
    (when run on the reversed graph) `merge_graph_output_trees`.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to analyze; not modified.
    options : `NodeAttributeOptions`
        Options whose truthiness controls which node attributes participate
        in the similarity criteria.
    depth : `int`
        Maximum tree height to consider; ``0`` disables merging entirely.

    Returns
    -------
    groups : `list` [ `dict` [ `_MergeKey`, `set` [ `NodeKey` ] ] ]
        One mapping per tree height (index ``d`` holds trees of height ``d``),
        grouping tree-root nodes by the `_MergeKey` of their tree.
    """
    # Our goal is to obtain mappings that group trees of nodes by the
    # attributes in a _MergeKey.  The nested dictionaries are the root of a
    # tree and the nodes under that root, recursively (but not including the
    # root).  We nest these mappings inside a list, with each mapping
    # corresponding to a different depth for the trees it represents.  We
    # start with a special empty dict for "0-depth trees", since that makes
    # result[depth] valid and hence off-by-one errors less likely.
    result: list[dict[_MergeKey, set[NodeKey]]] = [{}]
    if depth == 0:
        # Nothing to do; caller asked for no merging.
        return result
    # We start with the nodes that have no predecessors in the graph.
    # Ignore for now the fact that the 'current_candidates' data structure
    # we process is actually a dict that associates each of those nodes
    # with an empty dict.  All of these initial nodes are valid trees,
    # since they're just single nodes.  (The shared ``{}`` is safe: the
    # per-node child dicts are only ever read, never mutated.)
    first_generation = next(networkx.algorithms.dag.topological_generations(xgraph))
    current_candidates: dict[NodeKey, dict[NodeKey, _MergeKey]] = dict.fromkeys(first_generation, {})
    # Set up an outer loop over tree depth; we'll construct a new set of
    # candidates at each iteration.
    while current_candidates:
        # As we go, we'll remember nodes that have just one predecessor, as
        # those predecessors might be the roots of slightly taller trees.
        # We store the successors and their merge keys under them.
        next_candidates: dict[NodeKey, dict[NodeKey, _MergeKey]] = defaultdict(dict)
        # We also want to track the nodes the level up that are not trees
        # because some node has both them and some other node as a
        # predecessor.
        nontrees: set[NodeKey] = set()
        # Make a dictionary for the results at this depth, then start the
        # inner iteration over candidates and (after the first iteration)
        # their children.
        result_for_depth: dict[_MergeKey, set[NodeKey]] = defaultdict(set)
        for node, children in current_candidates.items():
            # Make a _MergeKey for this node and add it to the results for
            # this depth.  Two nodes with the same _MergeKey are roots of
            # isomorphic trees that have the same predecessor(s), and can be
            # merged (with isomorphism defined as both structure and
            # whatever comparisons are in 'options').  Note that in this
            # orientation the key's "parents" are the graph successors.
            merge_key = _MergeKey.from_node_state(
                xgraph.nodes[node], xgraph.successors(node), children.values(), options
            )
            result_for_depth[merge_key].add(node)
            if len(result) <= depth:
                # See if this node's successor might be the root of a
                # larger tree.
                if len(merge_key.parents) == 1:
                    (parent,) = merge_key.parents
                    # Strip the parents so keys of sibling subtrees under the
                    # same future root compare equal.
                    next_candidates[parent][node] = dataclasses.replace(merge_key, parents=frozenset())
                else:
                    # Multiple parents: none of them can root a tree here.
                    nontrees.update(merge_key.parents)
        # Append the results for this depth.
        result.append(result_for_depth)
        # Trim out candidates that aren't trees after all.
        for nontree_node in nontrees & next_candidates.keys():
            del next_candidates[nontree_node]
        current_candidates = next_candidates
    return result
def _apply_tree_merges(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph,
    groups: list[dict[_MergeKey, set[NodeKey]]],
) -> None:
    """Second-stage implementation of `merge_graph_input_trees` and
    `merge_graph_output_trees`.
    """
    replacements: dict[NodeKey, MergedNodeKey] = {}
    # Walk from the tallest trees down so shorter trees can see (and regroup
    # under) parents that were already merged.
    for depth_group in reversed(groups):
        # Rewrite each key's parents with any merged stand-ins; keys that
        # become equal after rewriting are combined into one group.
        rekeyed: dict[_MergeKey, set[NodeKey]] = defaultdict(set)
        for merge_key, members in depth_group.items():
            if not replacements.keys().isdisjoint(merge_key.parents):
                merge_key = dataclasses.replace(
                    merge_key,
                    parents=frozenset(replacements.get(p, p) for p in merge_key.parents),
                )
            rekeyed[merge_key].update(members)
        for merge_key, members in rekeyed.items():
            if len(members) < 2:
                # A group of one is not a merge.
                continue
            merged = MergedNodeKey(frozenset(members))
            # Collect the merged node's edges, mapping both endpoints through
            # the replacements made so far (including this group's own).
            edges: set[tuple[NodeKey | MergedNodeKey, NodeKey | MergedNodeKey]] = set()
            for member in members:
                replacements[member] = merged
                for a, b in xgraph.in_edges(member):
                    edges.add((replacements.get(a, a), replacements.get(b, b)))
                for a, b in xgraph.out_edges(member):
                    edges.add((replacements.get(a, a), replacements.get(b, b)))
            xgraph.add_node(
                merged,
                storage_class_name=merge_key.storage_class_name,
                task_class_name=merge_key.task_class_name,
                dimensions=merge_key.dimensions,
            )
            xgraph.add_edges_from(edges)
    # Drop every original node that now has a merged stand-in.
    xgraph.remove_nodes_from(replacements.keys())