Coverage for python / lsst / ctrl / bps / quantum_clustering_funcs.py: 5%

276 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:22 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Functions that convert QuantumGraph into ClusteredQuantumGraph.""" 

29 

30__all__ = ["check_clustering_config"] 

31 

32import logging 

33import re 

34import uuid 

35from collections import defaultdict 

36from typing import Any 

37from uuid import UUID 

38 

39from networkx import DiGraph, NetworkXNoCycle, find_cycle, topological_sort 

40 

41from lsst.pipe.base.quantum_graph import PredictedQuantumGraph, QuantumInfo 

42 

43from . import BpsConfig, ClusteredQuantumGraph, QuantaCluster 

44 

45_LOG = logging.getLogger(__name__) 

46 

47 

def single_quantum_clustering(
    config: BpsConfig, qgraph: PredictedQuantumGraph, name: str
) -> ClusteredQuantumGraph:
    """Build a ClusteredQuantumGraph in which every quantum is its own
    cluster.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph to break into clusters for ClusteredQuantumGraph.
    name : `str`
        Name to give to ClusteredQuantumGraph.

    Returns
    -------
    clustered_quantum : `lsst.ctrl.bps.ClusteredQuantumGraph`
        ClusteredQuantumGraph with single quantum per cluster created from
        given QuantumGraph.
    """
    cqgraph = ClusteredQuantumGraph(
        name=name,
        qgraph=qgraph,
        qgraph_filename=config[".bps_defined.runQgraphFile"],
    )

    # Name template per task label; looked up in the config only once per
    # label.
    templates_by_label = {}

    # Quantum id -> name of the cluster holding it, needed for the
    # dependency pass below.
    cluster_name_of = {}

    # Pass 1: one cluster per quantum.
    for quantum_id, quantum_info in cqgraph.qxgraph.nodes.items():
        task_label = quantum_info["task_label"]
        template = templates_by_label.get(task_label)
        if template is None:
            found, template_data_id = config.search(
                "templateDataId",
                opt={"curvals": {"curr_pipetask": task_label}, "replaceVars": False},
            )
            if not found:
                template = "{node_number}"
            else:
                _, use_node_number = config.search(
                    "useNodeIdInClusterName",
                    opt={
                        "curvals": {"curr_pipetask": task_label},
                        "replaceVars": False,
                        "default": True,
                    },
                )
                prefix = "{node_number}_" if use_node_number else ""
                template = prefix + "{label}_" + template_data_id
            templates_by_label[task_label] = template

        cluster = QuantaCluster.from_quantum_info(quantum_id, quantum_info, template)
        cluster_name_of[quantum_id] = cluster.name
        cqgraph.add_cluster(cluster)

    # Pass 2: every quantum-level edge becomes a cluster-level dependency.
    for parent_id, child_id in cqgraph.qxgraph.edges:
        cqgraph.add_dependency(cluster_name_of[parent_id], cluster_name_of[child_id])

    return cqgraph

120 

121 

def check_clustering_config(
    cluster_config: BpsConfig, task_graph: DiGraph
) -> tuple[list[str], dict[str, DiGraph]]:
    """Check cluster definitions in terms of pipetask lists.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        The cluster section from the BPS configuration.
    task_graph : `networkx.DiGraph`
        Directed graph of tasks.

    Returns
    -------
    cluster_labels : `list` [`str`]
        Dependency ordered list of cluster labels (includes
        single quantum clusters).
    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
        Mapping of cluster label to task subgraph.

    Raises
    ------
    RuntimeError
        Raised if task label appears in more than one cluster def or
        if there's a cycle in the cluster defs.
    """
    # Build a PipelineTask graph of just labels because PipelineGraph
    # methods revolve around NodeKey instead of labels.
    label_graph = DiGraph()
    for node_key in task_graph:
        label_graph.add_node(node_key.name)
        for parent in task_graph.predecessors(node_key):
            label_graph.add_edge(parent.name, node_key.name)

    # Build a "clustered" task graph to check for cycle.
    task_to_cluster = {}
    used_labels = set()
    clustered_task_graph = DiGraph()
    # Cluster label -> subgraph of label_graph holding that cluster's tasks.
    ordered_tasks: dict[str, DiGraph] = {}

    # Create clusters based on given configuration.
    for cluster_label in cluster_config:
        _LOG.debug("cluster = %s", cluster_label)
        cluster_tasks = [pt.strip() for pt in cluster_config[cluster_label]["pipetasks"].split(",")]
        cluster_tasks_in_qgraph = []
        for task_label in cluster_tasks:
            if task_label in used_labels:
                raise RuntimeError(
                    f"Task label {task_label} appears in more than one cluster definition. "
                    "Aborting submission."
                )
            # Only check cluster defs that affect the QuantumGraph
            if label_graph.has_node(task_label):
                cluster_tasks_in_qgraph.append(task_label)
                used_labels.add(task_label)
                task_to_cluster[task_label] = cluster_label

        # A cluster definition none of whose tasks are in the graph is
        # dropped entirely: it gets neither a subgraph nor a node.
        if cluster_tasks_in_qgraph:
            ordered_tasks[cluster_label] = label_graph.subgraph(cluster_tasks_in_qgraph)
            clustered_task_graph.add_node(cluster_label)

    # Create single task clusters for tasks not covered by clusters.
    for label in label_graph:
        if label not in used_labels:
            task_to_cluster[label] = label
            clustered_task_graph.add_node(label)
            ordered_tasks[label] = label_graph.subgraph([label])

    # Create dependencies between clusters; edges internal to a cluster
    # are skipped so they do not show up as self-loops.
    for edge in task_graph.edges:
        if task_to_cluster[edge[0].name] != task_to_cluster[edge[1].name]:
            clustered_task_graph.add_edge(task_to_cluster[edge[0].name], task_to_cluster[edge[1].name])

    _LOG.debug("clustered_task_graph.edges = %s", list(clustered_task_graph.edges))

    # Check if DAG: DiGraph enforces direction, so need to check for cycles
    try:
        cycle = find_cycle(clustered_task_graph)
    except NetworkXNoCycle:
        _LOG.debug("Did not find a cycle in the clustered_task_graph")
    else:
        _LOG.error(
            "Found cycle when making clusters: %s. Typically this means a PipelineTask needs to be added to"
            " a cluster or removed from a cluster.",
            cycle,
        )
        raise RuntimeError("Cluster pipetasks do not create a DAG")

    return list(topological_sort(clustered_task_graph)), ordered_tasks

214 

215 

def dimension_clustering(
    config: BpsConfig, qgraph: PredictedQuantumGraph, name: str
) -> ClusteredQuantumGraph:
    """Cluster a quantum graph according to the ``cluster`` config section.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph to break into clusters for ClusteredQuantumGraph.
    name : `str`
        Name to give to ClusteredQuantumGraph.

    Returns
    -------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        ClusteredQuantumGraph with clustering as defined in config.
    """
    cqgraph = ClusteredQuantumGraph(
        name=name,
        qgraph=qgraph,
        qgraph_filename=config[".bps_defined.runQgraphFile"],
    )

    # Filled in by the add_* helpers; used by them to wire up dependencies.
    quantum_to_cluster: dict[UUID, str] = {}

    cluster_section = config["cluster"]
    cluster_labels, ordered_tasks = check_clustering_config(
        cluster_section, qgraph.pipeline_graph.make_task_xgraph()
    )

    for cluster_label in cluster_labels:
        _LOG.debug("cluster = %s", cluster_label)

        # Labels without a cluster definition are tasks that fall back to
        # one cluster per quantum.
        if cluster_label not in cluster_section:
            add_clusters_per_quantum(config, cluster_label, qgraph, cqgraph, quantum_to_cluster)
            continue

        label_config = cluster_section[cluster_label]
        add_func = (
            add_dim_clusters_dependency if "findDependencyMethod" in label_config else add_dim_clusters
        )
        add_func(
            label_config,
            cluster_label,
            qgraph,
            ordered_tasks,
            cqgraph,
            quantum_to_cluster,
        )

    return cqgraph

268 

269 

def add_clusters_per_quantum(
    config: BpsConfig,
    label: str,
    qgraph: PredictedQuantumGraph,
    cqgraph: ClusteredQuantumGraph,
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Add one single-quantum cluster per quantum of the given task.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    label : `str`
        The taskDef label for which to add clusters.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph providing quanta for the clusters.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new 1-quantum
        clusters are added (modified in method).
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    _LOG.info("Creating 1-quantum clusters for task %s", label)

    # Work out the cluster name template for this task label.
    found, template_data_id = config.search(
        "templateDataId", opt={"curvals": {"curr_pipetask": label}, "replaceVars": False}
    )
    if not found:
        template = "{node_number}"
    else:
        _, use_node_number = config.search(
            "useNodeIdInClusterName",
            opt={
                "curvals": {"curr_pipetask": label},
                "replaceVars": False,
                "default": True,
            },
        )
        prefix = "{node_number}_" if use_node_number else ""
        template = prefix + "{label}_" + template_data_id

    for quantum_id in qgraph.quanta_by_task[label].values():
        quantum_info = cqgraph.qxgraph.nodes[quantum_id]
        cluster = QuantaCluster.from_quantum_info(quantum_id, quantum_info, template)
        cqgraph.add_cluster(cluster)
        # Record the mapping before wiring dependencies; the dependency
        # helper looks this quantum up in it.
        quantum_to_cluster[quantum_id] = cluster.name
        add_cluster_dependencies(cqgraph, cluster, quantum_to_cluster)

318 

319 

def _get_dim_config_settings(
    cluster_config: BpsConfig,
) -> tuple[list[str], list[tuple[str, str]], list[str]]:
    """Parse dimension-related cluster configuration.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.

    Returns
    -------
    cluster_dims : `list` [`str`]
        Names for dimensions which should be used for clustering.
    equal_dims : `list` [`tuple` [`str`, `str`]]
        Names for dimensions which should be considered equivalent.
    partition_dims : `list` [`str`]
        Names for dimensions which should be used for partitioning clusters.

    Raises
    ------
    ValueError
        Raised if an ``equalDimensions`` entry is not of the form
        ``name1:name2``.
    """
    cluster_dims: list[str] = []
    if "dimensions" in cluster_config:
        cluster_dims = [d.strip() for d in cluster_config["dimensions"].split(",")]
    _LOG.debug("cluster_dims = %s", cluster_dims)

    equal_dims: list[tuple[str, str]] = []
    if "equalDimensions" in cluster_config:
        for entry in cluster_config["equalDimensions"].split(","):
            # Tolerate blank entries (e.g., a trailing comma).
            if not entry.strip():
                continue
            # Strip each name individually so "a : b" matches dimensions
            # "a" and "b" rather than "a " and " b".
            names = [d.strip() for d in entry.split(":")]
            if len(names) != 2:
                raise ValueError(
                    f"Invalid equalDimensions entry '{entry.strip()}'; expected form 'dim1:dim2'"
                )
            equal_dims.append((names[0], names[1]))
    _LOG.debug("equal_dims = %s", equal_dims)

    partition_dims: list[str] = []
    if "partitionDimensions" in cluster_config:
        partition_dims = [d.strip() for d in cluster_config["partitionDimensions"].split(",")]
    _LOG.debug("partition_dims = %s", partition_dims)

    return cluster_dims, equal_dims, partition_dims

355 

356 

def partition_cluster_values(
    cluster_config: BpsConfig, cluster_label: str, partition_values: set[Any], num_cluster_values: int
) -> dict[Any, int]:
    """Partition given values into appropriately sized chunks.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    partition_values : `set` [`~typing.Any`]
        Values that would be internal to clusters to be used in partitioning.
    num_cluster_values : `int`
        Number of values used when doing main clustering.

    Returns
    -------
    partition_key_to_id : `dict` [`~typing.Any`, `int`]
        Mapping of a value to a partition id.  Partition ids start at 1.

    Raises
    ------
    KeyError
        When missing one of the partitioning config values or when specifying
        multiple sizing values.
    RuntimeError
        When number of cluster dimension values is larger than
        partitionMaxClusters.
    """
    _, partition_max_clusters = cluster_config.search("partitionMaxClusters")
    _, partition_max_size = cluster_config.search("partitionMaxSize")
    _LOG.debug("partition_max_clusters = %s", partition_max_clusters)
    _LOG.debug("partition_max_size = %s", partition_max_size)

    # Exactly one of the two sizing settings must be given.
    if not partition_max_clusters and not partition_max_size:
        raise KeyError(
            f"Defined partitionDimensions for cluster {cluster_label}, but missing one of the following: "
            "partitionMaxClusters or partitionMaxSize"
        )
    if partition_max_clusters and partition_max_size:
        raise KeyError(
            f"Defined more than one of the following for cluster {cluster_label}: "
            "partitionMaxClusters and partitionMaxSize"
        )

    partition_max_chunks = 0
    if partition_max_clusters:
        # User wants to define the total number of clusters for this label
        # which translates into jobs.
        if num_cluster_values > partition_max_clusters:
            raise RuntimeError(
                f"Cluster {cluster_label}: partitionMaxClusters ({partition_max_clusters}) "
                f"must be the same as or larger than the number of cluster dimension values "
                f"({num_cluster_values})"
            )

        # Every cluster dimension value gets the same number of partitions.
        partition_max_chunks = partition_max_clusters // num_cluster_values
        _LOG.debug("max_chunks = %s", partition_max_chunks)

    if partition_max_chunks:
        max_size, remainder = divmod(len(partition_values), partition_max_chunks)
    else:
        max_size = partition_max_size
        remainder = 0

    _LOG.debug("max_size = %s, remainder = %s", max_size, remainder)

    # Make partitions to be used across all clusters for this label.
    # Example: values = 8, 1, 2, 5, 7, 3, 6, 4, 9, 10
    #          max_size = 3, remainder = 1
    #          partition 1 gets 1, 2, 3, 4
    #          partition 2 gets 5, 6, 7
    #          partition 3 gets 8, 9, 10
    #
    # Since won't be traversing QuantumGraph in partition value order when
    # clustering, actually make a mapping of partition value to partition id
    # instead of just sublists of partition values.  Example, partition value
    # of 5 maps to partition 2

    partition_key_to_id = {}
    partition_id = 0  # id for current partition

    # Track how many partitions so far that had an extra added in order
    # to handle remainder
    cnt_extra = 0

    curr_max_size = max_size

    # How many values in current partition, set initial value so
    # enters if clause first time through loop to set all the values.
    cnt = curr_max_size + 1

    for value in sorted(partition_values):
        # If filled up a partition, need to (re)set for next partition
        if cnt >= curr_max_size:
            partition_id += 1
            cnt = 0
            curr_max_size = max_size
            # if still working on the partitions that need the extra value
            if cnt_extra < remainder:
                curr_max_size += 1
                cnt_extra += 1

        partition_key_to_id[value] = partition_id
        cnt += 1

    _LOG.debug("partition cnt = %s, total # clusters = %s", partition_id, partition_id * num_cluster_values)
    return partition_key_to_id

465 

466 

def add_dim_clusters(
    cluster_config: BpsConfig,
    cluster_label: str,
    qgraph: PredictedQuantumGraph,
    ordered_tasks: dict[str, DiGraph],
    cqgraph: ClusteredQuantumGraph,
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Add dimension-based clusters for one cluster label to a
    ClusteredQuantumGraph.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph providing quanta for the clusters.
    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
        Mapping of cluster label to task label subgraph.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new
        clusters are added (modified in method).
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    cluster_dims, equal_dims, partition_dims = _get_dim_config_settings(cluster_config)

    found, template = cluster_config.search("clusterTemplate", opt={"replaceVars": False})
    if not found:
        # Default name: the label followed by one "{dim}" placeholder per
        # clustering dimension (just the label when there are none).
        template = "_".join([cluster_label, *("{" + dim + "}" for dim in cluster_dims)])
    _LOG.debug("template = %s", template)

    # Collect, per cluster name, the info of every quantum that belongs in
    # it, along with every partition key encountered.
    partition_values: set[str] = set()
    quanta_info: dict[str, list[dict[str, Any]]] = {}
    for task_label in topological_sort(ordered_tasks[cluster_label]):
        for quantum_id in qgraph.quanta_by_task[task_label].values():
            quantum_info = cqgraph.qxgraph.nodes[quantum_id]
            cluster_name, info = get_cluster_name_from_info(
                quantum_id,
                quantum_info,
                cluster_dims,
                cluster_label,
                template,
                equal_dims,
                partition_dims,
            )
            if "partition_key" in info:
                partition_values.add(info["partition_key"])
            quanta_info.setdefault(cluster_name, []).append(info)

    make_and_add_clusters(
        cqgraph,
        cluster_config,
        cluster_label,
        partition_values,
        quanta_info,
        quantum_to_cluster,
    )

530 

531 

def add_cluster_dependencies(
    cqgraph: ClusteredQuantumGraph, cluster: QuantaCluster, quantum_to_cluster: dict[UUID, str]
) -> None:
    """Add the parent dependencies of one cluster to a
    ClusteredQuantumGraph.

    Parameters
    ----------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the dependencies
        are added (modified in method).
    cluster : `lsst.ctrl.bps.QuantaCluster`
        The cluster for which to add dependencies.
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added.

    Raises
    ------
    KeyError
        Raised if any of the cluster's quantum node ids are missing
        from quantum_to_cluster or if their parent quantum node ids
        are missing from quantum_to_cluster.
    """
    for node_id in cluster.qgraph_node_ids:
        cluster_node_info = cqgraph.get_quantum_info(node_id)
        for parent_id in cqgraph.qxgraph.predecessors(node_id):
            try:
                parent_cluster = quantum_to_cluster[parent_id]
                child_cluster = quantum_to_cluster[node_id]
                # Parent and child inside the same cluster is not a
                # cluster-level dependency.
                if parent_cluster != child_cluster:
                    cqgraph.add_dependency(parent_cluster, child_cluster)
            except KeyError as err:  # pragma: no cover
                # For debugging a problem internal to method
                missing_info = cqgraph.get_quantum_info(err.args[0])
                _LOG.error(
                    "Quanta missing when clustering: cluster node = %s, %s; missing = %s, %s",
                    cluster_node_info["task_label"],
                    cluster_node_info["data_id"],
                    missing_info["task_label"],
                    missing_info["data_id"],
                )
                _LOG.error(quantum_to_cluster)
                raise

574 

575 

def add_dim_clusters_dependency(
    cluster_config: BpsConfig,
    cluster_label: str,
    qgraph: PredictedQuantumGraph,
    ordered_tasks: dict[str, DiGraph],
    cqgraph: ClusteredQuantumGraph,
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Add clusters for a cluster label to a ClusteredQuantumGraph,
    walking quantum graph dependencies in addition to using dimension
    values.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph providing quanta for the clusters.
    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
        Mapping of cluster label to task label subgraph.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new
        clusters are added (modified in method).
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).

    Raises
    ------
    RuntimeError
        Raised if ``findDependencyMethod`` is neither ``source`` nor
        ``sink``.
    """
    cluster_dims, equal_dims, partition_dims = _get_dim_config_settings(cluster_config)

    found, template = cluster_config.search("clusterTemplate", opt={"replaceVars": False})
    if not found:
        # Default name: the label followed by one "{dim}" placeholder per
        # clustering dimension (just the label when there are none).
        template = "_".join([cluster_label, *("{" + dim + "}" for dim in cluster_dims)])
    _LOG.debug("template = %s", template)

    # Note: Can't use just source/sink labels in case some quanta for those
    # aren't in the QuantumGraph (e.g., a rescue QuantumGraph)
    cluster_task_graph = ordered_tasks[cluster_label]
    label_search_order = list(topological_sort(cluster_task_graph))
    method = cluster_config["findDependencyMethod"]
    if method == "source":
        # Start at the earliest tasks and walk downstream.
        find_possible_nodes = cqgraph.qxgraph.successors
    elif method == "sink":
        # Start at the latest tasks and walk upstream.
        find_possible_nodes = cqgraph.qxgraph.predecessors
        label_search_order.reverse()
    else:
        raise RuntimeError(f"Invalid findDependencyMethod ({method})")
    _LOG.info("label_search_order = %s", label_search_order)

    # Gathered while traversing: every partition key seen, and the quanta
    # info grouped by cluster name.
    partition_values: set[str] = set()
    quanta_info: dict[str, list[dict[str, Any]]] = {}

    def _collect(node_id: UUID, node_info: Any) -> None:
        # Work out which cluster the quantum belongs to and record it.
        cluster_name, info = get_cluster_name_from_info(
            node_id,
            node_info,
            cluster_dims,
            cluster_label,
            template,
            equal_dims,
            partition_dims,
        )
        if "partition_key" in info:
            partition_values.add(info["partition_key"])
        quanta_info.setdefault(cluster_name, []).append(info)

    # Since not only using source/sink labels, might look at a node more
    # than once.  Keep a set of node ids to make that quick to check.
    quanta_visited = set()
    for task_label in label_search_order:
        for quantum_id in qgraph.quanta_by_task[task_label].values():
            if quantum_id in quanta_visited:
                continue

            _collect(quantum_id, cqgraph.qxgraph.nodes[quantum_id])
            quanta_visited.add(quantum_id)

            # Use dependencies to find other quanta to add.
            # Note: in testing, this hand-rolled traversal was faster than
            # using networkx descendants and ancestors functions.
            # While traversing the QuantumGraph, nodes may appear
            # repeatedly among the neighbors.
            pending = [quantum_id]
            while pending:
                current_id = pending.pop()
                for neighbor_id in find_possible_nodes(current_id):
                    if neighbor_id in quanta_visited:
                        continue
                    # Marked visited whether or not it belongs to this
                    # cluster label, so it is never examined again.
                    quanta_visited.add(neighbor_id)

                    neighbor_info = cqgraph.qxgraph.nodes[neighbor_id]
                    if neighbor_info["task_label"] not in cluster_task_graph:
                        _LOG.debug(
                            "label (%s) not in ordered_tasks. Not adding possible quantum %s",
                            neighbor_info["task_label"],
                            neighbor_id,
                        )
                        continue

                    _collect(neighbor_id, neighbor_info)
                    pending.append(neighbor_id)

    make_and_add_clusters(
        cqgraph,
        cluster_config,
        cluster_label,
        partition_values,
        quanta_info,
        quantum_to_cluster,
    )

701 

702 

def make_and_add_clusters(
    cqgraph: ClusteredQuantumGraph,
    cluster_config: BpsConfig,
    cluster_label: str,
    partition_values: set[Any],
    quanta_info: dict[str, list[dict[str, Any]]],
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Make the clusters, then add them and their dependencies to the
    given ClusteredQuantumGraph.

    Parameters
    ----------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new
        clusters are added (modified in method).
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    partition_values : `set` [`~typing.Any`]
        Values that would be internal to clusters to be used in partitioning.
    quanta_info : `dict` [`str`, `list` [`dict` [`str`, `~typing.Any`]]]
        Mapping of cluster names to pre-gathered quanta information.
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    # The partition mapping is only computed when partition keys were
    # actually gathered.
    if partition_values:
        partition_key_to_id = partition_cluster_values(
            cluster_config, cluster_label, partition_values, len(quanta_info)
        )

    # Make and add clusters.
    new_clusters: dict[str, QuantaCluster] = {}
    for base_name, info_list in quanta_info.items():
        for info in info_list:
            # Quanta carrying a partition key go into a sub-cluster named
            # with a zero-padded partition id suffix.
            if "partition_key" in info:
                partition_id = partition_key_to_id[info["partition_key"]]
                full_cluster_name = base_name + "_" + format(partition_id, "03")
            else:
                full_cluster_name = base_name

            cluster = new_clusters.get(full_cluster_name)
            if cluster is None:
                cluster = QuantaCluster(full_cluster_name, cluster_label, info["tags"])
                cqgraph.add_cluster(cluster)
                new_clusters[full_cluster_name] = cluster

            node_id = info["node_number"]
            cluster.add_quantum(node_id, info["node_label"])

            # Save mapping for use when creating dependencies.
            quantum_to_cluster[node_id] = full_cluster_name

    # Dependencies are added only after every quantum of this label has
    # been mapped to its cluster.
    for cluster in new_clusters.values():
        add_cluster_dependencies(cqgraph, cluster, quantum_to_cluster)

758 

759 

def get_cluster_name_from_info(
    quantum_id: uuid.UUID,
    quantum_info: QuantumInfo,
    cluster_dims: list[str],
    cluster_label: str,
    template: str,
    equal_dims: list[tuple[str, str]],
    partition_dims: list[str],
) -> tuple[str, dict[str, Any]]:
    """Get the cluster name in which to add the given node.

    Parameters
    ----------
    quantum_id : `uuid.UUID`
        Unique ID for the quantum.
    quantum_info : `lsst.pipe.base.quantum_graph.QuantumInfo`
        Info dictionary from which to create the cluster.
    cluster_dims : `list` [`str`]
        Dimension names to be used when clustering.
    cluster_label : `str`
        Cluster label.
    template : `str`
        Template for the cluster name.
    equal_dims : `list` [`tuple` [`str`, `str`]]
        Pairs of dimension names considered equal for clustering.
    partition_dims : `list` [`str`]
        Dimension names to be used when partitioning.

    Returns
    -------
    cluster_name : `str`
        Name of the cluster in which to add the given node.
    info : `dict` [`str`, `~typing.Any`]
        Information needed if creating a new node.  Contains
        ``node_number``, ``node_label``, ``label``, ``tags`` (the cluster
        dimension values), the partition dimension values and, when
        partition dimensions are given, ``partition_key``.

    Raises
    ------
    RuntimeError
        Raised if the quantum's data ID lacks a clustering or partitioning
        dimension that no equal-dimension pair can supply.
    """
    # Gather info for cluster name template into a dictionary.
    info: dict[str, Any] = {
        "node_number": quantum_id,
        "node_label": quantum_info["task_label"],
        "label": cluster_label,
    }

    # Save values for all dimensions used in clustering and partitioning.
    all_dims = cluster_dims + partition_dims

    missing_info = set()
    data_id_info = dict(quantum_info["data_id"].mapping)
    for dim_name in all_dims:
        _LOG.debug("dim_name = %s", dim_name)
        if dim_name in data_id_info:
            info[dim_name] = data_id_info[dim_name]
        else:
            missing_info.add(dim_name)

    # Fill in still-missing dimensions from their declared equivalents.
    # Only substitute for dimensions that are actually missing: a quantum
    # whose data ID already has the requested dimension keeps its own
    # value, and there is then nothing to remove from missing_info.
    for dim1, dim2 in equal_dims:
        if dim1 in missing_info and dim2 in data_id_info:
            info[dim1] = data_id_info[dim2]
            missing_info.discard(dim1)
        elif dim2 in missing_info and dim1 in data_id_info:
            info[dim2] = data_id_info[dim1]
            missing_info.discard(dim2)

    if missing_info:
        # Sorted so the message is deterministic.
        raise RuntimeError(
            f"Quantum {quantum_id} ({data_id_info}) missing dimensions: {','.join(sorted(missing_info))}; "
            f"required for cluster {cluster_label}"
        )

    _LOG.debug("info for template = %s", info)

    # Use dictionary plus template format string to create name.
    # To avoid key errors from generic patterns, use defaultdict so that
    # unknown placeholders expand to the empty string.
    cluster_name = template.format_map(defaultdict(lambda: "", info))
    # Empty placeholders can leave runs of underscores; collapse them.
    cluster_name = re.sub("_+", "_", cluster_name)

    # Some dimensions contain slash which must be replaced.
    cluster_name = re.sub("/", "_", cluster_name)
    _LOG.debug("cluster_name = %s", cluster_name)

    # Move the clustering dimension values out of the top level and into
    # the tags mapping.
    info["tags"] = {}
    for dim in cluster_dims:
        info["tags"][dim] = info[dim]
        del info[dim]

    if partition_dims:
        info["partition_key"] = "==".join([str(info[dim]) for dim in partition_dims])

    return cluster_name, info