Coverage for python / lsst / ctrl / bps / quantum_clustering_funcs.py: 5%
276 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:04 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Functions that convert QuantumGraph into ClusteredQuantumGraph."""
30__all__ = ["check_clustering_config"]
32import logging
33import re
34import uuid
35from collections import defaultdict
36from typing import Any
37from uuid import UUID
39from networkx import DiGraph, NetworkXNoCycle, find_cycle, topological_sort
41from lsst.pipe.base.quantum_graph import PredictedQuantumGraph, QuantumInfo
43from . import BpsConfig, ClusteredQuantumGraph, QuantaCluster
45_LOG = logging.getLogger(__name__)
def single_quantum_clustering(
    config: BpsConfig, qgraph: PredictedQuantumGraph, name: str
) -> ClusteredQuantumGraph:
    """Create clusters with only single quantum.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph to break into clusters for ClusteredQuantumGraph.
    name : `str`
        Name to give to ClusteredQuantumGraph.

    Returns
    -------
    clustered_quantum : `lsst.ctrl.bps.ClusteredQuantumGraph`
        ClusteredQuantumGraph with single quantum per cluster created from
        given QuantumGraph.
    """
    cqgraph = ClusteredQuantumGraph(
        name=name,
        qgraph=qgraph,
        qgraph_filename=config[".bps_defined.runQgraphFile"],
    )

    def _template_for(label: str) -> str:
        # Build the cluster-name template for one task label from config.
        found, data_id_template = config.search(
            "templateDataId",
            opt={"curvals": {"curr_pipetask": label}, "replaceVars": False},
        )
        if not found:
            return "{node_number}"
        name_template = "{label}_" + data_id_template
        _, prepend_node_id = config.search(
            "useNodeIdInClusterName",
            opt={
                "curvals": {"curr_pipetask": label},
                "replaceVars": False,
                "default": True,
            },
        )
        if prepend_node_id:
            name_template = "{node_number}_" + name_template
        return name_template

    # Cache the template per task label for speed, and remember which
    # cluster each quantum landed in for the dependency pass below.
    templates: dict[str, str] = {}
    id_to_cluster_name: dict[UUID, str] = {}

    # One cluster per quantum.
    for quantum_id, quantum_info in cqgraph.qxgraph.nodes.items():
        label = quantum_info["task_label"]
        if label not in templates:
            templates[label] = _template_for(label)
        cluster = QuantaCluster.from_quantum_info(quantum_id, quantum_info, templates[label])
        id_to_cluster_name[quantum_id] = cluster.name
        cqgraph.add_cluster(cluster)

    # Mirror quantum-graph edges as cluster dependencies.
    for quantum_id in cqgraph.qxgraph:
        for child_id in cqgraph.qxgraph.successors(quantum_id):
            cqgraph.add_dependency(id_to_cluster_name[quantum_id], id_to_cluster_name[child_id])

    return cqgraph
def check_clustering_config(
    cluster_config: BpsConfig, task_graph: DiGraph
) -> tuple[list[str], dict[str, DiGraph]]:
    """Check cluster definitions in terms of pipetask lists.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        The cluster section from the BPS configuration.
    task_graph : `networkx.DiGraph`
        Directed graph of tasks.

    Returns
    -------
    cluster_labels : `list` [`str`]
        Dependency ordered list of cluster labels (includes
        single quantum clusters).
    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
        Mapping of cluster label to task subgraph.

    Raises
    ------
    RuntimeError
        Raised if task label appears in more than one cluster def or
        if there's a cycle in the cluster defs.
    """
    # Build a PipelineTask graph of just labels because PipelineGraph
    # methods revolve around NodeKey instead of labels.
    label_graph = DiGraph()
    for node_key in task_graph:
        label_graph.add_node(node_key.name)
        for parent in task_graph.predecessors(node_key):
            label_graph.add_edge(parent.name, node_key.name)

    # Build a "clustered" task graph to check for cycle.
    task_to_cluster = {}
    used_labels = set()
    clustered_task_graph = DiGraph()
    # Mapping of cluster label to the subgraph of its task labels
    # (NOT an ordered list; callers topologically sort the subgraph).
    ordered_tasks: dict[str, DiGraph] = {}

    # Create clusters based on given configuration.
    for cluster_label in cluster_config:
        _LOG.debug("cluster = %s", cluster_label)
        cluster_tasks = [pt.strip() for pt in cluster_config[cluster_label]["pipetasks"].split(",")]
        cluster_tasks_in_qgraph = []
        for task_label in cluster_tasks:
            if task_label in used_labels:
                raise RuntimeError(
                    f"Task label {task_label} appears in more than one cluster definition. "
                    "Aborting submission."
                )
            # Only check cluster defs that affect the QuantumGraph
            if label_graph.has_node(task_label):
                cluster_tasks_in_qgraph.append(task_label)
                used_labels.add(task_label)
                task_to_cluster[task_label] = cluster_label

        if cluster_tasks_in_qgraph:
            # Save the subgraph so tasks can later be put in
            # dependency order.
            quantum_subgraph = label_graph.subgraph(cluster_tasks_in_qgraph)
            ordered_tasks[cluster_label] = quantum_subgraph

            clustered_task_graph.add_node(cluster_label)

    # Create single task clusters for tasks not covered by clusters.
    for label in label_graph:
        if label not in used_labels:
            task_to_cluster[label] = label
            clustered_task_graph.add_node(label)
            ordered_tasks[label] = label_graph.subgraph([label])

    # Create dependencies between clusters.
    for edge in task_graph.edges:
        if task_to_cluster[edge[0].name] != task_to_cluster[edge[1].name]:
            clustered_task_graph.add_edge(task_to_cluster[edge[0].name], task_to_cluster[edge[1].name])

    _LOG.debug("clustered_task_graph.edges = %s", list(clustered_task_graph.edges))

    # Check if DAG: DiGraph enforces direction, so need to check for cycles
    try:
        cycle = find_cycle(clustered_task_graph)
    except NetworkXNoCycle:
        _LOG.debug("Did not find a cycle in the clustered_task_graph")
    else:
        _LOG.error(
            "Found cycle when making clusters: %s. Typically this means a PipelineTask needs to be added to"
            " a cluster or removed from a cluster.",
            cycle,
        )
        raise RuntimeError("Cluster pipetasks do not create a DAG")

    return list(topological_sort(clustered_task_graph)), ordered_tasks
def dimension_clustering(
    config: BpsConfig, qgraph: PredictedQuantumGraph, name: str
) -> ClusteredQuantumGraph:
    """Follow config instructions to make clusters based upon dimensions.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph to break into clusters for ClusteredQuantumGraph.
    name : `str`
        Name to give to ClusteredQuantumGraph.

    Returns
    -------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        ClusteredQuantumGraph with clustering as defined in config.
    """
    cqgraph = ClusteredQuantumGraph(
        name=name,
        qgraph=qgraph,
        qgraph_filename=config[".bps_defined.runQgraphFile"],
    )

    # Mapping of quantum id to cluster name, filled in as clusters are
    # made so dependencies can be created afterwards.
    quantum_to_cluster: dict[UUID, str] = {}

    cluster_section = config["cluster"]
    cluster_labels, ordered_tasks = check_clustering_config(
        cluster_section, qgraph.pipeline_graph.make_task_xgraph()
    )
    for cluster_label in cluster_labels:
        _LOG.debug("cluster = %s", cluster_label)
        if cluster_label not in cluster_section:
            # Label has no cluster definition; fall back to one cluster
            # per quantum.
            add_clusters_per_quantum(config, cluster_label, qgraph, cqgraph, quantum_to_cluster)
            continue

        # Pick the clustering algorithm based on the cluster definition.
        if "findDependencyMethod" in cluster_section[cluster_label]:
            add_func = add_dim_clusters_dependency
        else:
            add_func = add_dim_clusters
        add_func(
            cluster_section[cluster_label],
            cluster_label,
            qgraph,
            ordered_tasks,
            cqgraph,
            quantum_to_cluster,
        )

    return cqgraph
def add_clusters_per_quantum(
    config: BpsConfig,
    label: str,
    qgraph: PredictedQuantumGraph,
    cqgraph: ClusteredQuantumGraph,
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Add 1-quantum clusters for a task to a ClusteredQuantumGraph.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    label : `str`
        The taskDef label for which to add clusters.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph providing quanta for the clusters.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new 1-quantum
        clusters are added (modified in method).
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    _LOG.info("Creating 1-quantum clusters for task %s", label)

    # Work out the cluster-name template for this task label.
    found, template_data_id = config.search(
        "templateDataId", opt={"curvals": {"curr_pipetask": label}, "replaceVars": False}
    )
    if not found:
        template = "{node_number}"
    else:
        template = "{label}_" + template_data_id
        _, use_node_number = config.search(
            "useNodeIdInClusterName",
            opt={
                "curvals": {"curr_pipetask": label},
                "replaceVars": False,
                "default": True,
            },
        )
        if use_node_number:
            template = "{node_number}_" + template

    # One cluster per quantum of this task; wire up dependencies as
    # each cluster is created.
    for quantum_id in qgraph.quanta_by_task[label].values():
        cluster = QuantaCluster.from_quantum_info(quantum_id, cqgraph.qxgraph.nodes[quantum_id], template)
        cqgraph.add_cluster(cluster)
        quantum_to_cluster[quantum_id] = cluster.name
        add_cluster_dependencies(cqgraph, cluster, quantum_to_cluster)
def _get_dim_config_settings(
    cluster_config: BpsConfig,
) -> tuple[list[str], list[tuple[str, str]], list[str]]:
    """Parse dimension-related cluster configuration.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.

    Returns
    -------
    cluster_dims : `list` [`str`]
        Names for dimensions which should be used for clustering.
    equal_dims : `list` [`tuple` [`str`, `str`]]
        Names for dimensions which should be considered equivalent.
    partition_dims : `list` [`str`]
        Names for dimensions which should be used for partitioning clusters.
    """
    cluster_dims = []
    if "dimensions" in cluster_config:
        cluster_dims = [d.strip() for d in cluster_config["dimensions"].split(",")]
    _LOG.debug("cluster_dims = %s", cluster_dims)

    # Store pairs as tuples to match the declared return type
    # (split() alone would yield lists).
    equal_dims: list[tuple[str, str]] = []
    if "equalDimensions" in cluster_config:
        equal_dims = [tuple(pt.strip().split(":")) for pt in cluster_config["equalDimensions"].split(",")]
    _LOG.debug("equal_dims = %s", equal_dims)

    partition_dims = []
    if "partitionDimensions" in cluster_config:
        partition_dims = [d.strip() for d in cluster_config["partitionDimensions"].split(",")]
    _LOG.debug("partition_dims = %s", partition_dims)

    return cluster_dims, equal_dims, partition_dims
def partition_cluster_values(
    cluster_config: BpsConfig, cluster_label: str, partition_values: set[Any], num_cluster_values: int
) -> dict[Any, int]:
    """Partition given values into appropriately sized chunks.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    partition_values : `set` [`~typing.Any`]
        Values that would be internal to clusters to be used in partitioning.
    num_cluster_values : `int`
        Number of values used when doing main clustering.

    Returns
    -------
    partition_key_to_id : `dict` [`~typing.Any`, `int`]
        Mapping of a value to a partition id.

    Raises
    ------
    KeyError
        When missing one of the partitioning config values or when specifying
        multiple sizing values.
    RuntimeError
        When number of cluster dimension values is larger than
        partitionMaxClusters.
    """
    _, partition_max_clusters = cluster_config.search("partitionMaxClusters")
    _, partition_max_size = cluster_config.search("partitionMaxSize")
    _LOG.debug("partition_max_clusters = %s", partition_max_clusters)
    _LOG.debug("partition_max_size = %s", partition_max_size)

    # Exactly one of the two sizing values must be provided.
    if not partition_max_clusters and not partition_max_size:
        raise KeyError(
            f"Defined partitionDimensions for cluster {cluster_label}, but missing one of the following: "
            "partitionMaxClusters or partitionMaxSize"
        )
    elif partition_max_clusters and partition_max_size:
        raise KeyError(
            f"Defined more than one of the following for cluster {cluster_label}: "
            "partitionMaxClusters and partitionMaxSize"
        )

    partition_max_chunks = 0
    if partition_max_clusters:
        # User wants to define the total number of clusters for this label
        # which translates into jobs.
        if num_cluster_values > partition_max_clusters:
            raise RuntimeError(
                f"Cluster {cluster_label}: partitionMaxClusters ({partition_max_clusters}) "
                f"must be the same or larger than the number of cluster dimension values "
                f"({num_cluster_values})"
            )

        partition_max_chunks = partition_max_clusters // num_cluster_values
        _LOG.debug("max_chunks = %s", partition_max_chunks)

    if partition_max_chunks:
        max_size, remainder = divmod(len(partition_values), partition_max_chunks)
    else:
        max_size = partition_max_size
        remainder = 0

    _LOG.debug("max_size = %s, remainder = %s", max_size, remainder)

    # Make partitions to be used across all clusters for this label.
    # Example: values = 8, 1, 2, 5, 7, 3, 6, 4, 9, 10
    # max_size = 3, remainder = 1
    # partition 1 gets 1, 2, 3, 4
    # partition 2 gets 5, 6, 7
    # partition 3 gets 8, 9, 10
    #
    # Since won't be traversing QuantumGraph in partition value order when
    # clustering, actually make a mapping of partition value to partition id
    # instead of just sublists of partition values. Example, partition value
    # of 5 maps to partition 2

    partition_key_to_id = {}
    partition_id = 0  # id for current partition

    # Track how many partitions so far that had an extra added in order
    # to handle remainder
    cnt_extra = 0

    curr_max_size = max_size

    # How many values in current partition, set initial value so
    # enters if clause first time through loop to set all the values.
    cnt = curr_max_size + 1

    for value in sorted(partition_values):
        # If filled up a partition, need to (re)set for next partition
        if cnt >= curr_max_size:
            partition_id += 1
            cnt = 0
            curr_max_size = max_size
            # if still working on the partitions that need the extra value
            if cnt_extra < remainder:
                curr_max_size += 1
                cnt_extra += 1

        partition_key_to_id[value] = partition_id
        cnt += 1

    _LOG.debug("partition cnt = %s, total # clusters = %s", partition_id, partition_id * num_cluster_values)
    return partition_key_to_id
def add_dim_clusters(
    cluster_config: BpsConfig,
    cluster_label: str,
    qgraph: PredictedQuantumGraph,
    ordered_tasks: dict[str, DiGraph],
    cqgraph: ClusteredQuantumGraph,
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Add clusters for a cluster label to a ClusteredQuantumGraph.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    qgraph : `lsst.pipe.base.QuantumGraph`
        QuantumGraph providing quanta for the clusters.
    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
        Mapping of cluster label to task label subgraph.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new 1-quantum
        clusters are added (modified in method).
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    cluster_dims, equal_dims, partition_dims = _get_dim_config_settings(cluster_config)

    # Use the configured name template or derive one from the dimensions.
    found, template = cluster_config.search("clusterTemplate", opt={"replaceVars": False})
    if not found:
        if cluster_dims:
            template = f"{cluster_label}_" + "_".join(f"{{{dim}}}" for dim in cluster_dims)
        else:
            template = cluster_label
    _LOG.debug("template = %s", template)

    # Gather per-quantum info bucketed by cluster name, along with the
    # full set of partition key values.
    partition_keys: set[str] = set()
    infos_by_cluster: dict[str, list[dict[str, Any]]] = {}
    for task_label in topological_sort(ordered_tasks[cluster_label]):
        for quantum_id in qgraph.quanta_by_task[task_label].values():
            cluster_name, info = get_cluster_name_from_info(
                quantum_id,
                cqgraph.qxgraph.nodes[quantum_id],
                cluster_dims,
                cluster_label,
                template,
                equal_dims,
                partition_dims,
            )
            if "partition_key" in info:
                partition_keys.add(info["partition_key"])
            infos_by_cluster.setdefault(cluster_name, []).append(info)

    make_and_add_clusters(
        cqgraph,
        cluster_config,
        cluster_label,
        partition_keys,
        infos_by_cluster,
        quantum_to_cluster,
    )
def add_cluster_dependencies(
    cqgraph: ClusteredQuantumGraph, cluster: QuantaCluster, quantum_to_cluster: dict[UUID, str]
) -> None:
    """Add dependencies for a cluster within a ClusteredQuantumGraph.

    Parameters
    ----------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new 1-quantum
        clusters are added (modified in method).
    cluster : `lsst.ctrl.bps.QuantaCluster`
        The cluster for which to add dependencies.
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).

    Raises
    ------
    KeyError
        Raised if any of the cluster's quantum node ids are missing
        from quantum_to_cluster or if their parent quantum node ids
        are missing from quantum_to_cluster.
    """
    for node_id in cluster.qgraph_node_ids:
        node_info = cqgraph.get_quantum_info(node_id)
        for parent_id in cqgraph.qxgraph.predecessors(node_id):
            try:
                parent_cluster_name = quantum_to_cluster[parent_id]
                child_cluster_name = quantum_to_cluster[node_id]
                # Only cross-cluster edges become dependencies.
                if parent_cluster_name != child_cluster_name:
                    cqgraph.add_dependency(parent_cluster_name, child_cluster_name)
            except KeyError as e:  # pragma: no cover
                # For debugging a problem internal to method
                missing = cqgraph.get_quantum_info(e.args[0])
                _LOG.error(
                    "Quanta missing when clustering: cluster node = %s, %s; missing = %s, %s",
                    node_info["task_label"],
                    node_info["data_id"],
                    missing["task_label"],
                    missing["data_id"],
                )
                _LOG.error(quantum_to_cluster)
                raise
def add_dim_clusters_dependency(
    cluster_config: BpsConfig,
    cluster_label: str,
    qgraph: PredictedQuantumGraph,
    ordered_tasks: dict[str, DiGraph],
    cqgraph: ClusteredQuantumGraph,
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Add clusters for a cluster label to a ClusteredQuantumGraph using
    quantum graph dependencies as well as dimension values to help when
    some do not have particular dimension value.

    Parameters
    ----------
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    qgraph : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
        Quantum graph providing quanta for the clusters.
    ordered_tasks : `dict` [`str`, `networkx.DiGraph`]
        Mapping of cluster label to task label subgraph.
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new
        clusters are added (modified in method).
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    cluster_dims, equal_dims, partition_dims = _get_dim_config_settings(cluster_config)

    # Use the configured name template or derive one from the dimensions.
    found, template = cluster_config.search("clusterTemplate", opt={"replaceVars": False})
    if not found:
        if cluster_dims:
            template = f"{cluster_label}_" + "_".join(f"{{{dim}}}" for dim in cluster_dims)
        else:
            template = cluster_label
    _LOG.debug("template = %s", template)

    # Note: Can't use just source/sink labels in case some quanta for those
    # aren't in the QuantumGraph (e.g., a rescue QuantumGraph)
    search_order = list(topological_sort(ordered_tasks[cluster_label]))
    method = cluster_config["findDependencyMethod"]
    if method == "source":
        find_possible_nodes = cqgraph.qxgraph.successors
    elif method == "sink":
        find_possible_nodes = cqgraph.qxgraph.predecessors
        search_order.reverse()
    else:
        raise RuntimeError(f"Invalid findDependencyMethod ({method})")
    _LOG.info("label_search_order = %s", search_order)

    # Gather per-quantum info bucketed by cluster name, along with the
    # full set of partition key values.
    partition_keys: set[str] = set()
    infos_by_cluster: dict[str, list[dict[str, Any]]] = {}

    def _record(node_id: UUID, node_info: dict[str, Any]) -> None:
        # Compute the cluster name for this quantum and stash its info.
        cluster_name, info = get_cluster_name_from_info(
            node_id,
            node_info,
            cluster_dims,
            cluster_label,
            template,
            equal_dims,
            partition_dims,
        )
        if "partition_key" in info:
            partition_keys.add(info["partition_key"])
        infos_by_cluster.setdefault(cluster_name, []).append(info)

    # Since not only using source/sink labels, might look at a node more
    # than once; track visited ids so each quantum is handled once.
    seen: set[UUID] = set()
    for task_label in search_order:
        for quantum_id in qgraph.quanta_by_task[task_label].values():
            if quantum_id in seen:
                continue

            _record(quantum_id, cqgraph.qxgraph.nodes[quantum_id])
            seen.add(quantum_id)

            # Follow quantum-graph edges to pull related quanta into the
            # same cluster.  Note: in testing, this explicit traversal was
            # faster than networkx descendants/ancestors; nodes may appear
            # repeatedly among the neighbors, hence the `seen` check.
            stack = [quantum_id]
            while stack:
                current_id = stack.pop()
                for neighbor_id in find_possible_nodes(current_id):
                    if neighbor_id in seen:
                        continue
                    seen.add(neighbor_id)

                    neighbor_info = cqgraph.qxgraph.nodes[neighbor_id]
                    if neighbor_info["task_label"] in ordered_tasks[cluster_label]:
                        _record(neighbor_id, neighbor_info)
                        stack.append(neighbor_id)
                    else:
                        _LOG.debug(
                            "label (%s) not in ordered_tasks. Not adding possible quantum %s",
                            neighbor_info["task_label"],
                            neighbor_id,
                        )

    make_and_add_clusters(
        cqgraph,
        cluster_config,
        cluster_label,
        partition_keys,
        infos_by_cluster,
        quantum_to_cluster,
    )
def make_and_add_clusters(
    cqgraph: ClusteredQuantumGraph,
    cluster_config: BpsConfig,
    cluster_label: str,
    partition_values: set[Any],
    quanta_info: dict[str, list[dict[str, Any]]],
    quantum_to_cluster: dict[UUID, str],
) -> None:
    """Make the clusters add them and dependencies to the given
    ClusteredQuantumGraph.

    Parameters
    ----------
    cqgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
        The ClusteredQuantumGraph to which the new
        clusters are added (modified in method).
    cluster_config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration for specific cluster label.
    cluster_label : `str`
        Cluster label for which to add clusters.
    partition_values : `set` [`~typing.Any`]
        Values that would be internal to clusters to be used in partitioning.
    quanta_info : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Mapping of cluster names to pre-gathered quanta information.
    quantum_to_cluster : `dict` [`uuid.UUID`, `str`]
        Mapping of quantum node id to which cluster it was added
        (modified in method).
    """
    if partition_values:
        partition_key_to_id = partition_cluster_values(
            cluster_config, cluster_label, partition_values, len(quanta_info)
        )

    # Make and add clusters.
    clusters_by_name: dict[str, QuantaCluster] = {}
    for base_name, info_list in quanta_info.items():
        for info in info_list:
            # Append the partition id when this label is partitioned.
            full_name = base_name
            if "partition_key" in info:
                full_name += f"_{partition_key_to_id[info['partition_key']]:03}"

            cluster = clusters_by_name.get(full_name)
            if cluster is None:
                cluster = QuantaCluster(full_name, cluster_label, info["tags"])
                cqgraph.add_cluster(cluster)
                clusters_by_name[full_name] = cluster

            cluster.add_quantum(info["node_number"], info["node_label"])

            # Save mapping for use when creating dependencies.
            quantum_to_cluster[info["node_number"]] = full_name

    for cluster in clusters_by_name.values():
        add_cluster_dependencies(cqgraph, cluster, quantum_to_cluster)
def get_cluster_name_from_info(
    quantum_id: uuid.UUID,
    quantum_info: QuantumInfo,
    cluster_dims: list[str],
    cluster_label: str,
    template: str,
    equal_dims: list[tuple[str, str]],
    partition_dims: list[str],
) -> tuple[str, dict[str, Any]]:
    """Get the cluster name in which to add the given node.

    Parameters
    ----------
    quantum_id : `uuid.UUID`
        Unique ID for the quantum.
    quantum_info : `lsst.pipe.base.quantum_graph.QuantumInfo`
        Info dictionary from which to create the cluster.
    cluster_dims : `list` [`str`]
        Dimension names to be used when clustering.
    cluster_label : `str`
        Cluster label.
    template : `str`
        Template for the cluster name.
    equal_dims : `list` [`tuple` [`str`, `str`]]
        Pairs of dimension names considered equal for clustering.
    partition_dims : `list` [`str`]
        Dimension names to be used when partitioning.

    Returns
    -------
    cluster_name : `str`
        Name of the cluster in which to add the given node.
    info : dict [`str`, `~typing.Any`]
        Information needed if creating a new node.

    Raises
    ------
    RuntimeError
        Raised if the quantum's data ID is missing a dimension required
        for clustering or partitioning.
    """
    # Gather info for cluster name template into a dictionary.
    info: dict[str, Any] = {
        "node_number": quantum_id,
        "node_label": quantum_info["task_label"],
        "label": cluster_label,
    }

    # Save values for all dimensions used in clustering and partitioning.
    all_dims = cluster_dims + partition_dims

    missing_info = set()
    data_id_info = dict(quantum_info["data_id"].mapping)
    for dim_name in all_dims:
        _LOG.debug("dim_name = %s", dim_name)
        if dim_name in data_id_info:
            info[dim_name] = data_id_info[dim_name]
        else:
            missing_info.add(dim_name)

    # Fill in still-missing dimensions from their declared-equal partner.
    # Check membership in missing_info (not all_dims) so a dimension that
    # was already found directly in the data ID is left alone instead of
    # triggering a KeyError from set.remove below.
    for dim1, dim2 in equal_dims:
        if dim1 in missing_info and dim2 in data_id_info:
            info[dim1] = data_id_info[dim2]
            missing_info.remove(dim1)
        elif dim2 in missing_info and dim1 in data_id_info:
            info[dim2] = data_id_info[dim1]
            missing_info.remove(dim2)

    if missing_info:
        raise RuntimeError(
            f"Quantum {quantum_id} ({data_id_info}) missing dimensions: {','.join(missing_info)}; "
            f"required for cluster {cluster_label}"
        )

    _LOG.debug("info for template = %s", info)

    # Use dictionary plus template format string to create name.
    # To avoid key errors from generic patterns, use defaultdict.
    cluster_name = template.format_map(defaultdict(lambda: "", info))
    cluster_name = re.sub("_+", "_", cluster_name)

    # Some dimensions contain slash which must be replaced.
    cluster_name = re.sub("/", "_", cluster_name)
    _LOG.debug("cluster_name = %s", cluster_name)

    # Move clustering dimension values into tags for the QuantaCluster.
    info["tags"] = {}
    for dim in cluster_dims:
        info["tags"][dim] = info[dim]
        del info[dim]

    if partition_dims:
        info["partition_key"] = "==".join([str(info[dim]) for dim in partition_dims])

    return cluster_name, info