Coverage for python / lsst / pipe / base / pipeline_graph / visualization / _merge.py: 26%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27from __future__ import annotations 

28 

29__all__ = ( 

30 "MergedNodeKey", 

31 "merge_graph_input_trees", 

32 "merge_graph_intermediates", 

33 "merge_graph_output_trees", 

34) 

35 

36import dataclasses 

37import hashlib 

38from collections import defaultdict 

39from collections.abc import Iterable 

40from functools import cached_property 

41from typing import Any 

42 

43import networkx 

44import networkx.algorithms.dag 

45import networkx.algorithms.tree 

46 

47from lsst.daf.butler import DimensionGroup 

48 

49from .._nodes import NodeKey, NodeType 

50from ._options import NodeAttributeOptions 

51 

52 

class MergedNodeKey(frozenset[NodeKey]):
    """A key for NetworkX graph nodes that represent multiple similar tasks
    or dataset types that have been merged to simplify graph visualization.
    """

    def __str__(self) -> str:
        # Descending sort gives a stable, readable label for the merged node.
        return ", ".join(sorted((str(key) for key in self), reverse=True))

    @property
    def node_type(self) -> NodeType:
        """Enum value for whether this is a task, task initialization, or
        dataset type node.
        """
        # Merged nodes are homogeneous, so any member is representative.
        return next(iter(self)).node_type

    @cached_property
    def node_id(self) -> str:
        """A unique string representation of the merged node key."""
        # Hash the sorted members so the ID is independent of set iteration
        # order; a 4-byte digest keeps the ID short for use in output files.
        digest = hashlib.blake2b(digest_size=4)
        for key in sorted(self):
            digest.update(str(key).encode())
        return f"{digest.digest().hex()}:{self.node_type.value}"

77 

78 

def merge_graph_input_trees(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph, options: NodeAttributeOptions, depth: int
) -> None:
    """Merge trees of overall-input dataset type nodes and/or
    beginning-of-pipeline task nodes that have similar properties and the same
    structure.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    options : `NodeAttributeOptions`
        Properties of nodes that should be considered when determining whether
        they are similar enough to be merged.  Only the truthiness of
        attributes is considered (e.g. ``options.dimensions == 'full'`` and
        ``options.dimensions == 'concise'`` are both interpreted to mean "only
        merge trees with the same dimensions").  This is typically the same set
        of options that controls whether to display these attributes in the
        graph visualization.
    depth : `int`
        How many nodes to traverse from the beginning of the graph before
        terminating the merging algorithm.
    """
    # Identify mergeable trees, then rewrite the graph accordingly.
    _apply_tree_merges(xgraph, _make_tree_merge_groups(xgraph, options, depth))

104 

105 

def merge_graph_output_trees(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph, options: NodeAttributeOptions, depth: int
) -> None:
    """Merge trees of overall-output dataset type nodes and/or
    end-of-pipeline task nodes that have similar properties and the same
    structure.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    options : `NodeAttributeOptions`
        Properties of nodes that should be considered when determining whether
        they are similar enough to be merged.  Only the truthiness of
        attributes is considered (e.g. ``options.dimensions == 'full'`` and
        ``options.dimensions == 'concise'`` are both interpreted to mean "only
        merge trees with the same dimensions").  This is typically the same set
        of options that controls whether to display these attributes in the
        graph visualization.
    depth : `int`
        How many nodes to traverse from the end of the graph before
        terminating the merging algorithm.
    """
    # Run the input-tree algorithm on a reversed view of the graph
    # (copy=False shares node/edge attributes with the original), so the
    # traversal starts from the graph's outputs.  The merges are then applied
    # to the original, un-reversed graph.
    groups = _make_tree_merge_groups(xgraph.reverse(copy=False), options, depth)
    _apply_tree_merges(xgraph, groups)

131 

132 

def merge_graph_intermediates(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph, options: NodeAttributeOptions
) -> None:
    """Merge parallel interior nodes of a graph with similar properties.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    options : `NodeAttributeOptions`
        Properties of nodes that should be considered when determining whether
        they are similar enough to be merged.  Only the truthiness of
        attributes is considered (e.g. ``options.dimensions == 'full'`` and
        ``options.dimensions == 'concise'`` are both interpreted to mean "only
        merge trees with the same dimensions").  This is typically the same set
        of options that controls whether to display these attributes in the
        graph visualization.

    Notes
    -----
    "Parallel" nodes here are nodes that have the exact same predecessor and
    successors.
    """
    # Group nodes by (parents, attributes, children); nodes sharing a key are
    # parallel and similar enough to merge.
    candidates: dict[_MergeKey, set[NodeKey]] = defaultdict(set)
    for node, state in xgraph.nodes.items():
        key = _MergeKey.from_node_state(
            state,
            xgraph.predecessors(node),
            xgraph.successors(node),
            options,
        )
        # Only interior nodes are eligible: overall inputs and outputs are
        # handled by the tree-merge algorithms.
        if key.parents and key.children:
            candidates[key].add(node)
    # Map each merged-away node to its replacement so later edge rewiring can
    # follow nodes that have already been merged.
    replaced: dict[NodeKey, MergedNodeKey] = {}
    for key, members in candidates.items():
        if len(members) < 2:
            # A group of one is not a merge; leave the node alone.
            continue
        merged = MergedNodeKey(frozenset(members))
        xgraph.add_node(
            merged,
            storage_class_name=key.storage_class_name,
            task_class_name=key.task_class_name,
            dimensions=key.dimensions,
        )
        for parent in key.parents:
            xgraph.add_edge(replaced.get(parent, parent), merged)
        for child in key.children:
            xgraph.add_edge(merged, replaced.get(child, child))
        replaced.update((member, merged) for member in members)
        xgraph.remove_nodes_from(members)

184 

185 

186@dataclasses.dataclass(frozen=True) 

187class _MergeKey: 

188 """A helper class for merge algorithms that is used as a dictionary key 

189 when grouping nodes that may be merged by their attributes. 

190 """ 

191 

192 parents: frozenset[Any] 

193 """Nodes of the original graph that are successors or predecessors of 

194 the nodes being considered for merging. 

195 """ 

196 

197 dimensions: DimensionGroup | None 

198 """Dimensions of the nodes being considered for merging, or `None` if 

199 dimensions are not included in the similarity criteria. 

200 """ 

201 

202 storage_class_name: str | None 

203 """Storage class of the nodes being considered for merging, or `None` if 

204 storage classes are not included in the similarity criteria or this is a 

205 task or task initialization node group. 

206 """ 

207 

208 task_class_name: str | None 

209 """Name of the task class for the nodes being considered for merging, or 

210 `None` if task classes are not included in the similarity criteria or 

211 this is a dataset type node group. 

212 """ 

213 

214 children: frozenset[Any] 

215 """Nodes that are predecessors or successors (the opposite of ``parents`` 

216 of the nodes being considered for merging. 

217 

218 In the `merge_graph_intermediates` algorithm, these are regular unmerged 

219 nodes. In the `merge_graph_input_trees` or `merge_graph_output_trees` 

220 algorithms, these are more `_MergeKey` instances, representing 

221 already-processed trees. 

222 """ 

223 

224 @classmethod 

225 def from_node_state[P, C]( 

226 cls, 

227 state: dict[str, Any], 

228 parents: Iterable[P], 

229 children: Iterable[C], 

230 options: NodeAttributeOptions, 

231 ) -> _MergeKey: 

232 """Construct from a NetworkX node attribute state dictionary. 

233 

234 Parameters 

235 ---------- 

236 state : `dict` 

237 Dictionary used to hold NetworkX node attributes. 

238 parents : `~collections.abc.Iterable` [ `NodeKey` ] 

239 Predecessor or successor nodes (depending on the orientation of 

240 the algorithm). 

241 children : ~collections.abc.Iterable` 

242 Successor or predecessor nodes (depending on the orientation of 

243 the algorithm). 

244 options : `NodeAttributeOptions` 

245 Options for which node attributes to include in the new key. 

246 """ 

247 return cls( 

248 parents=frozenset(parents), 

249 dimensions=state.get("dimensions"), 

250 storage_class_name=(state.get("storage_class_name") if options.storage_classes else None), 

251 task_class_name=(state.get("task_class_name") if options.task_classes else None), 

252 children=frozenset(children), 

253 ) 

254 

255 

def _make_tree_merge_groups(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph,
    options: NodeAttributeOptions,
    depth: int,
) -> list[dict[_MergeKey, set[NodeKey]]]:
    """First-stage implementation of `merge_graph_input_trees` and
    (when run on the reversed graph) `merge_graph_output_trees`.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to analyze; not modified.
    options : `NodeAttributeOptions`
        Options for which node attributes to include in the merge keys.
    depth : `int`
        Maximum height of the trees to consider for merging.

    Returns
    -------
    groups : `list` [ `dict` [ `_MergeKey`, `set` [ `NodeKey` ] ] ]
        One mapping per tree depth (index 0 is an always-empty placeholder);
        each maps a merge key to the set of root nodes of isomorphic trees of
        that depth.
    """
    # Our goal is to obtain mappings that group trees of nodes by the
    # attributes in a _MergeKey.  The nested dictionaries are the root of a
    # tree and the nodes under that root, recursively (but not including the
    # root).  We nest these mappings inside a list, with each mapping
    # corresponding to a different depth for the trees it represents.  We start
    # with a special empty dict for "0-depth trees", since that makes
    # result[depth] valid and hence off-by-one errors less likely.
    result: list[dict[_MergeKey, set[NodeKey]]] = [{}]
    if depth == 0:
        return result
    # We start with the nodes that have no predecessors in the graph.
    # Ignore for now the fact that the 'current_candidates' data structure
    # we process is actually a dict that associates each of those nodes
    # with an empty dict.  All of these initial nodes are valid trees,
    # since they're just single nodes.
    first_generation = next(networkx.algorithms.dag.topological_generations(xgraph))
    # NOTE: dict.fromkeys shares one empty dict among all keys; that is safe
    # because candidate child-dicts are only read (via .values()), never
    # mutated in place.
    current_candidates: dict[NodeKey, dict[NodeKey, _MergeKey]] = dict.fromkeys(first_generation, {})
    # Set up an outer loop over tree depth; we'll construct a new set of
    # candidates at each iteration.
    while current_candidates:
        # As we go, we'll remember nodes that have just one predecessor, as
        # those predecessors might be the roots of slightly taller trees.
        # We store the successors and their merge keys under them.
        next_candidates: dict[NodeKey, dict[NodeKey, _MergeKey]] = defaultdict(dict)
        # We also want to track the nodes the level up that are not trees
        # because some node has both them and some other node as a
        # predecessor.
        nontrees: set[NodeKey] = set()
        # Make a dictionary for the results at this depth, then start the
        # inner iteration over candidates and (after the first iteration)
        # their children.
        result_for_depth: dict[_MergeKey, set[NodeKey]] = defaultdict(set)
        for node, children in current_candidates.items():
            # Make a _MergeKey for this node and add it to the results for
            # this depth.  Two nodes with the same _MergeKey are roots of
            # isomorphic trees that have the same predecessor(s), and can be
            # merged (with isomorphism defined as both structure and
            # whatever comparisons are in 'options').
            merge_key = _MergeKey.from_node_state(
                xgraph.nodes[node], xgraph.successors(node), children.values(), options
            )
            result_for_depth[merge_key].add(node)
            # Only grow taller trees while we are still below 'depth'
            # (len(result) counts the depths recorded so far, including the
            # depth-0 placeholder).
            if len(result) <= depth:
                # See if this node's successor might be the root of a
                # larger tree.
                if len(merge_key.parents) == 1:
                    (parent,) = merge_key.parents
                    # Strip 'parents' from the stored key: as a child entry
                    # the key describes the subtree only, not its context.
                    next_candidates[parent][node] = dataclasses.replace(merge_key, parents=frozenset())
                else:
                    # A node with multiple predecessors means none of those
                    # predecessors can be a tree root.
                    nontrees.update(merge_key.parents)
        # Append the results for this depth.
        result.append(result_for_depth)
        # Trim out candidates that aren't trees after all.
        for nontree_node in nontrees & next_candidates.keys():
            del next_candidates[nontree_node]
        current_candidates = next_candidates
    return result

321 

322 

def _apply_tree_merges(
    xgraph: networkx.DiGraph | networkx.MultiDiGraph,
    groups: list[dict[_MergeKey, set[NodeKey]]],
) -> None:
    """Second-stage implementation of `merge_graph_input_trees` and
    `merge_graph_output_trees`.

    Parameters
    ----------
    xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
        Graph to be processed; modified in-place.
    groups : `list` [ `dict` [ `_MergeKey`, `set` [ `NodeKey` ] ] ]
        Output of `_make_tree_merge_groups`: per-depth mappings from merge
        key to the set of tree-root nodes that may be merged.
    """
    # Maps each merged-away node to the MergedNodeKey that replaced it, so
    # edges and parent references can be redirected as we go.
    replacements: dict[NodeKey, MergedNodeKey] = {}
    # Process tallest trees first so that a subtree already absorbed into a
    # taller merged tree is seen via 'replacements' at shallower depths.
    for group in reversed(groups):
        # Rebuild this depth's grouping with any already-merged parents
        # rewritten, so keys that only differed in pre-merge parents coincide.
        new_group: dict[_MergeKey, set[NodeKey]] = defaultdict(set)
        for merge_key, members in group.items():
            if merge_key.parents & replacements.keys():
                replaced_parents = frozenset(replacements.get(p, p) for p in merge_key.parents)
                new_group[dataclasses.replace(merge_key, parents=replaced_parents)].update(members)
            else:
                new_group[merge_key].update(members)
        for merge_key, members in new_group.items():
            # A group of one is not a merge; leave the node alone.
            if len(members) < 2:
                continue
            new_node_key = MergedNodeKey(frozenset(members))
            # Collect the merged node's edges, redirecting endpoints that have
            # themselves been replaced by merged nodes.
            new_edges: set[tuple[NodeKey | MergedNodeKey, NodeKey | MergedNodeKey]] = set()
            for member_key in members:
                replacements[member_key] = new_node_key
                new_edges.update(
                    (replacements.get(a, a), replacements.get(b, b)) for a, b in xgraph.in_edges(member_key)
                )
                new_edges.update(
                    (replacements.get(a, a), replacements.get(b, b)) for a, b in xgraph.out_edges(member_key)
                )
            xgraph.add_node(
                new_node_key,
                storage_class_name=merge_key.storage_class_name,
                task_class_name=merge_key.task_class_name,
                dimensions=merge_key.dimensions,
            )
            xgraph.add_edges_from(new_edges)
    # Original nodes are only removed at the very end, after all edges have
    # been rewired through 'replacements'.
    xgraph.remove_nodes_from(replacements.keys())