Coverage for python/lsst/pipe/base/quantum_graph_skeleton.py: 34%
206 statements
coverage.py v7.13.5, created at 2026-04-26 08:59 +0000
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""An under-construction version of QuantumGraph and various helper
classes.
"""
from __future__ import annotations

__all__ = (
    "DatasetKey",
    "PrerequisiteDatasetKey",
    "QuantumGraphSkeleton",
    "QuantumKey",
    "TaskInitKey",
)

import dataclasses
from collections import defaultdict
from collections.abc import Iterable, Iterator, MutableMapping, Set
from typing import TYPE_CHECKING, Any, ClassVar, Literal

import networkx

from lsst.daf.butler import (
    Butler,
    DataCoordinate,
    DataIdValue,
    DatasetRef,
    DimensionDataAttacher,
    DimensionDataExtractor,
    DimensionGroup,
    DimensionRecordSet,
)
from lsst.utils.logging import getLogger

if TYPE_CHECKING:
    pass

_LOG = getLogger(__name__)

@dataclasses.dataclass(slots=True, eq=True, frozen=True)
class QuantumKey:
    """Identifier type for quantum keys in a `QuantumGraphSkeleton`."""

    task_label: str
    """Label of the task in the pipeline."""

    data_id_values: tuple[DataIdValue, ...]
    """Data ID values of the quantum.

    Note that keys are fixed given `task_label`, so using only the values here
    speeds up comparisons.
    """

    is_task: ClassVar[Literal[True]] = True
    """Whether this node represents a quantum or task initialization rather
    than a dataset (always `True`).
    """


@dataclasses.dataclass(slots=True, eq=True, frozen=True)
class TaskInitKey:
    """Identifier type for task init keys in a `QuantumGraphSkeleton`."""

    task_label: str
    """Label of the task in the pipeline."""

    is_task: ClassVar[Literal[True]] = True
    """Whether this node represents a quantum or task initialization rather
    than a dataset (always `True`).
    """


@dataclasses.dataclass(slots=True, eq=True, frozen=True)
class DatasetKey:
    """Identifier type for dataset keys in a `QuantumGraphSkeleton`."""

    parent_dataset_type_name: str
    """Name of the dataset type (never a component)."""

    data_id_values: tuple[DataIdValue, ...]
    """Data ID values of the dataset.

    Note that keys are fixed given `parent_dataset_type_name`, so using only
    the values here speeds up comparisons.
    """

    is_task: ClassVar[Literal[False]] = False
    """Whether this node represents a quantum or task initialization rather
    than a dataset (always `False`).
    """

    is_prerequisite: ClassVar[Literal[False]] = False


@dataclasses.dataclass(slots=True, eq=True, frozen=True)
class PrerequisiteDatasetKey:
    """Identifier type for prerequisite dataset keys in a
    `QuantumGraphSkeleton`.

    Unlike regular datasets, prerequisites are not actually required to come
    from a find-first search of `input_collections`, so we don't want to
    assume that the same data ID implies the same dataset. Happily we also
    don't need to search for them by data ID in the graph, so we can use the
    dataset ID (UUID) instead.
    """

    parent_dataset_type_name: str
    """Name of the dataset type (never a component)."""

    dataset_id_bytes: bytes
    """Dataset ID (UUID) as raw bytes."""

    is_task: ClassVar[Literal[False]] = False
    """Whether this node represents a quantum or task initialization rather
    than a dataset (always `False`).
    """

    is_prerequisite: ClassVar[Literal[True]] = True


type Key = QuantumKey | TaskInitKey | DatasetKey | PrerequisiteDatasetKey
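

# Illustrative sketch (not part of the original module): because the key types
# above are frozen, slotted dataclasses, they hash and compare by value, which
# is what lets them serve directly as networkx node identifiers. The task
# label, dataset type name, and data ID values below are hypothetical.
def _example_key_semantics() -> None:
    quantum_a = QuantumKey("isr", (903342, 22))
    quantum_b = QuantumKey("isr", (903342, 22))
    # Equal field values mean equal keys and equal hashes, so both refer to
    # the same graph node.
    assert quantum_a == quantum_b
    assert hash(quantum_a) == hash(quantum_b)
    # The ClassVar flags let traversal code distinguish node kinds without
    # isinstance checks.
    assert quantum_a.is_task
    assert not DatasetKey("raw", (903342, 22)).is_task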


class QuantumGraphSkeleton:
    """An under-construction quantum graph.

    QuantumGraphSkeleton is intended for use inside `QuantumGraphBuilder` and
    its subclasses.

    Parameters
    ----------
    task_labels : `~collections.abc.Iterable` [ `str` ]
        The labels of all tasks whose quanta may be included in the graph, in
        topological order.

    Notes
    -----
    QuantumGraphSkeleton models a bipartite version of the quantum graph, in
    which both quanta and datasets are represented as nodes and each type of
    node only has edges to the other type.

    Square-bracket (`getitem`) indexing returns a mutable mapping of a node's
    flexible attributes.

    The details of the `QuantumGraphSkeleton` API (e.g. which operations
    operate on multiple nodes vs. a single node) are set by what's actually
    needed by current quantum graph generation algorithms. New variants can be
    added as needed, but adding all operations that *might* be useful for some
    future algorithm seems premature.
    """

    def __init__(self, task_labels: Iterable[str]):
        self._tasks: dict[str, tuple[TaskInitKey, set[QuantumKey]]] = {}
        self._xgraph: networkx.DiGraph = networkx.DiGraph()
        self._global_init_outputs: set[DatasetKey] = set()
        self._dimension_data: dict[str, DimensionRecordSet] = {}
        for task_label in task_labels:
            task_init_key = TaskInitKey(task_label)
            self._tasks[task_label] = (task_init_key, set())
            self._xgraph.add_node(task_init_key)

    def __contains__(self, key: Key) -> bool:
        return key in self._xgraph.nodes

    def __getitem__(self, key: Key) -> MutableMapping[str, Any]:
        return self._xgraph.nodes[key]

    def __iter__(self) -> Iterator[Key]:
        return iter(self._xgraph.nodes)
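
    # Illustrative sketch (not part of the original module) of the membership
    # test and square-bracket indexing described in the class notes. The task
    # label and attribute name are hypothetical:
    #
    #     skeleton = QuantumGraphSkeleton(["isr"])
    #     init_key = skeleton.get_task_init_node("isr")
    #     assert init_key in skeleton
    #     skeleton[init_key]["note"] = "flexible attributes live in the node mapping"
    #     assert skeleton[init_key]["note"].startswith("flexible")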

    @property
    def n_nodes(self) -> int:
        """The total number of nodes of all types."""
        return len(self._xgraph.nodes)

    @property
    def n_edges(self) -> int:
        """The total number of edges."""
        return len(self._xgraph.edges)

    @property
    def has_any_quanta(self) -> bool:
        """Test whether this graph has any quanta."""
        for _ in self.iter_all_quanta():
            return True
        return False

    def has_task(self, task_label: str) -> bool:
        """Test whether the given task is in this skeleton.

        Tasks are only added to the skeleton at initialization, but may be
        removed by `remove_task` if they end up having no quanta.

        Parameters
        ----------
        task_label : `str`
            Task to check for.

        Returns
        -------
        has : `bool`
            `True` if the task is in this skeleton.
        """
        return task_label in self._tasks

    def get_task_init_node(self, task_label: str) -> TaskInitKey:
        """Return the graph node that represents a task's initialization.

        Parameters
        ----------
        task_label : `str`
            The task label to use.

        Returns
        -------
        node : `TaskInitKey`
            The graph node representing this task's initialization.
        """
        return self._tasks[task_label][0]

    def get_quanta(self, task_label: str) -> Set[QuantumKey]:
        """Return the quanta for the given task label.

        Parameters
        ----------
        task_label : `str`
            Label for the task.

        Returns
        -------
        quanta : `~collections.abc.Set` [ `QuantumKey` ]
            A set-like object with the identifiers of all quanta for the given
            task. *The skeleton object's set of quanta must not be modified
            while iterating over this container; make a copy if mutation
            during iteration is necessary*.
        """
        return self._tasks[task_label][1]
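
    # Illustrative sketch (not part of the original module): because
    # `get_quanta` returns a live view of the skeleton's internal set, a
    # builder that prunes quanta should copy it before mutating. The task
    # label and the pruning criterion are hypothetical:
    #
    #     for quantum_key in list(skeleton.get_quanta("isr")):
    #         if not any(True for _ in skeleton.iter_inputs_of(quantum_key)):
    #             skeleton.remove_quantum_node(quantum_key, remove_outputs=True)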

    @property
    def global_init_outputs(self) -> Set[DatasetKey]:
        """The set of dataset nodes that are not associated with any task."""
        return self._global_init_outputs

    def iter_all_quanta(self) -> Iterator[QuantumKey]:
        """Iterate over all quanta from any task, in topological (but
        otherwise unspecified) order.
        """
        for _, quanta in self._tasks.values():
            yield from quanta

    def iter_outputs_of(self, quantum_key: QuantumKey | TaskInitKey) -> Iterator[DatasetKey]:
        """Iterate over the datasets produced by the given quantum.

        Parameters
        ----------
        quantum_key : `QuantumKey` or `TaskInitKey`
            Quantum to iterate over.

        Returns
        -------
        datasets : `~collections.abc.Iterator` of `DatasetKey`
            Datasets produced by the given quantum.
        """
        return self._xgraph.successors(quantum_key)

    def iter_inputs_of(
        self, quantum_key: QuantumKey | TaskInitKey
    ) -> Iterator[DatasetKey | PrerequisiteDatasetKey]:
        """Iterate over the datasets consumed by the given quantum.

        Parameters
        ----------
        quantum_key : `QuantumKey` or `TaskInitKey`
            Quantum to iterate over.

        Returns
        -------
        datasets : `~collections.abc.Iterator` of `DatasetKey` \
                or `PrerequisiteDatasetKey`
            Datasets consumed by the given quantum.
        """
        return self._xgraph.predecessors(quantum_key)

    def update(self, other: QuantumGraphSkeleton) -> None:
        """Copy all nodes from ``other`` to ``self``.

        Parameters
        ----------
        other : `QuantumGraphSkeleton`
            Source of nodes. The tasks in ``other`` must be a subset of the
            tasks in ``self`` (this method is expected to be used to populate
            a skeleton for a full pipeline from independent-subgraph
            skeletons).
        """
        for task_label, (_, quanta) in other._tasks.items():
            self._tasks[task_label][1].update(quanta)
        self._xgraph.update(other._xgraph)
        for record_set in other._dimension_data.values():
            self._dimension_data.setdefault(
                record_set.element.name, DimensionRecordSet(record_set.element)
            ).update(record_set)
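
    # Illustrative sketch (not part of the original module): a builder that
    # generates one skeleton per independent subgraph can merge them into a
    # skeleton initialized with the full set of task labels. The labels and
    # the `per_subgraph_skeletons` iterable are hypothetical:
    #
    #     full = QuantumGraphSkeleton(["isr", "characterizeImage"])
    #     for subgraph_skeleton in per_subgraph_skeletons:
    #         full.update(subgraph_skeleton)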

    def add_quantum_node(self, task_label: str, data_id: DataCoordinate, **attrs: Any) -> QuantumKey:
        """Add a new node representing a quantum.

        Parameters
        ----------
        task_label : `str`
            Name of task.
        data_id : `~lsst.daf.butler.DataCoordinate`
            The data ID of the quantum.
        **attrs : `~typing.Any`
            Additional attributes.
        """
        key = QuantumKey(task_label, data_id.required_values)
        self._xgraph.add_node(key, data_id=data_id, **attrs)
        self._tasks[key.task_label][1].add(key)
        return key

    def add_dataset_node(
        self,
        parent_dataset_type_name: str,
        data_id: DataCoordinate,
        is_global_init_output: bool = False,
        **attrs: Any,
    ) -> DatasetKey:
        """Add a new node representing a dataset.

        Parameters
        ----------
        parent_dataset_type_name : `str`
            Name of the parent dataset type.
        data_id : `~lsst.daf.butler.DataCoordinate`
            The dataset data ID.
        is_global_init_output : `bool`, optional
            Whether this dataset is a global init output.
        **attrs : `~typing.Any`
            Additional attributes for the node.
        """
        key = DatasetKey(parent_dataset_type_name, data_id.required_values)
        self._xgraph.add_node(key, data_id=data_id, **attrs)
        if is_global_init_output:
            assert isinstance(key, DatasetKey), str(key)
            self._global_init_outputs.add(key)
        return key

    def add_prerequisite_node(
        self,
        ref: DatasetRef,
        **attrs: Any,
    ) -> PrerequisiteDatasetKey:
        """Add a new node representing a prerequisite input dataset.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            The dataset ref of the prerequisite.
        **attrs : `~typing.Any`
            Additional attributes for the node.
        """
        key = PrerequisiteDatasetKey(ref.datasetType.name, ref.id.bytes)
        self._xgraph.add_node(key, data_id=ref.dataId, ref=ref, **attrs)
        return key
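
    # Illustrative sketch (not part of the original module): a builder adds a
    # quantum node along with the dataset nodes it will read and write. The
    # `quantum_data_id` is assumed to be a `DataCoordinate` from a butler
    # query, `bias_ref` a `DatasetRef` for a prerequisite, and the task label
    # and dataset type names are hypothetical:
    #
    #     quantum_key = skeleton.add_quantum_node("isr", quantum_data_id)
    #     raw_key = skeleton.add_dataset_node("raw", quantum_data_id)
    #     postisr_key = skeleton.add_dataset_node("postISRCCD", quantum_data_id)
    #     bias_key = skeleton.add_prerequisite_node(bias_ref)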

    def remove_quantum_node(self, key: QuantumKey, remove_outputs: bool) -> None:
        """Remove a node representing a quantum.

        Parameters
        ----------
        key : `QuantumKey`
            Identifier for the node.
        remove_outputs : `bool`
            If `True`, also remove all dataset nodes produced by this quantum.
            If `False`, any such dataset nodes will become overall inputs.
        """
        _, quanta = self._tasks[key.task_label]
        quanta.remove(key)
        if remove_outputs:
            to_remove = list(self._xgraph.successors(key))
            to_remove.append(key)
            self._xgraph.remove_nodes_from(to_remove)
        else:
            self._xgraph.remove_node(key)

    def remove_dataset_nodes(self, keys: Iterable[DatasetKey | PrerequisiteDatasetKey]) -> None:
        """Remove nodes representing datasets.

        Parameters
        ----------
        keys : `~collections.abc.Iterable` of `DatasetKey` \
                or `PrerequisiteDatasetKey`
            Nodes to remove.
        """
        self._xgraph.remove_nodes_from(keys)

    def remove_task(self, task_label: str) -> None:
        """Fully remove a task from the skeleton.

        All init-output datasets and quanta for the task must already have been
        removed.

        Parameters
        ----------
        task_label : `str`
            Name of task to remove.
        """
        task_init_key, quanta = self._tasks.pop(task_label)
        assert not quanta, "Cannot remove task unless all quanta have already been removed."
        assert not list(self._xgraph.successors(task_init_key))
        self._xgraph.remove_node(task_init_key)
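
    # Illustrative sketch (not part of the original module): the removal
    # methods above are typically combined when a task ends up with no quanta.
    # The task label is hypothetical, and the init-output cleanup step is one
    # plausible way to satisfy `remove_task`'s preconditions:
    #
    #     for quantum_key in list(skeleton.get_quanta("isr")):
    #         skeleton.remove_quantum_node(quantum_key, remove_outputs=True)
    #     init_outputs = list(skeleton.iter_outputs_of(skeleton.get_task_init_node("isr")))
    #     skeleton.remove_dataset_nodes(init_outputs)
    #     skeleton.remove_task("isr")
    #     skeleton.remove_orphan_datasets()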

    def add_input_edges(
        self,
        task_key: QuantumKey | TaskInitKey,
        dataset_keys: Iterable[DatasetKey | PrerequisiteDatasetKey],
    ) -> None:
        """Add edges connecting datasets to a quantum that consumes them.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Quantum to connect.
        dataset_keys : `~collections.abc.Iterable` of `DatasetKey` \
                or `PrerequisiteDatasetKey`
            Datasets to join to the quantum.

        Notes
        -----
        This must only be called if the task node has already been added.
        Use `add_input_edge` if this cannot be assumed.

        Dataset nodes that are not already present will be created.
        """
        assert task_key in self._xgraph, str(task_key)
        self._xgraph.add_edges_from((dataset_key, task_key) for dataset_key in dataset_keys)

    def remove_input_edges(
        self,
        task_key: QuantumKey | TaskInitKey,
        dataset_keys: Iterable[DatasetKey | PrerequisiteDatasetKey],
    ) -> None:
        """Remove edges connecting datasets to a quantum that consumes them.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Quantum to disconnect.
        dataset_keys : `~collections.abc.Iterable` of `DatasetKey` \
                or `PrerequisiteDatasetKey`
            Datasets to remove from the quantum.
        """
        self._xgraph.remove_edges_from((dataset_key, task_key) for dataset_key in dataset_keys)

    def add_input_edge(
        self,
        task_key: QuantumKey | TaskInitKey,
        dataset_key: DatasetKey | PrerequisiteDatasetKey,
        ignore_unrecognized_quanta: bool = False,
    ) -> bool:
        """Add an edge connecting a dataset to a quantum that consumes it.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Identifier for the quantum node.
        dataset_key : `DatasetKey` or `PrerequisiteDatasetKey`
            Identifier for the dataset node.
        ignore_unrecognized_quanta : `bool`, optional
            If `True`, do nothing if the quantum node is not already present.
            If `False`, the quantum node is assumed to be present.

        Returns
        -------
        added : `bool`
            `True` if an edge was actually added, `False` if the quantum was
            not recognized and the edge was not added as a result.

        Notes
        -----
        Dataset nodes that are not already present will be created.
        """
        if ignore_unrecognized_quanta and task_key not in self._xgraph:
            return False
        self._xgraph.add_edge(dataset_key, task_key)
        return True

    def add_output_edge(self, task_key: QuantumKey | TaskInitKey, dataset_key: DatasetKey) -> None:
        """Add an edge connecting a dataset to the quantum that produces it.

        Parameters
        ----------
        task_key : `QuantumKey` or `TaskInitKey`
            Identifier for the quantum node. Must identify a node already
            present in the graph.
        dataset_key : `DatasetKey`
            Identifier for the dataset node. Must identify a node already
            present in the graph.
        """
        assert task_key in self._xgraph, str(task_key)
        assert dataset_key in self._xgraph, str(dataset_key)
        self._xgraph.add_edge(task_key, dataset_key)
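
    # Illustrative sketch (not part of the original module): continuing the
    # hypothetical node-creation example above, the quantum is wired to the
    # datasets it consumes and produces:
    #
    #     skeleton.add_input_edges(quantum_key, [raw_key, bias_key])
    #     skeleton.add_output_edge(quantum_key, postisr_key)
    #     assert postisr_key in set(skeleton.iter_outputs_of(quantum_key))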

    def remove_output_edge(self, dataset_key: DatasetKey) -> None:
        """Remove the edge connecting a dataset to the quantum that produces
        it.

        Parameters
        ----------
        dataset_key : `DatasetKey`
            Identifier for the dataset node. Must identify a node already
            present in the graph.
        """
        (task_key,) = self._xgraph.predecessors(dataset_key)
        assert dataset_key in self._xgraph, str(dataset_key)
        self._xgraph.remove_edge(task_key, dataset_key)

    def remove_orphan_datasets(self) -> None:
        """Remove any dataset nodes that do not have any edges."""
        for orphan in list(networkx.isolates(self._xgraph)):
            if not orphan.is_task and orphan not in self._global_init_outputs:
                self._xgraph.remove_node(orphan)

    def extract_overall_inputs(self) -> dict[DatasetKey | PrerequisiteDatasetKey, DatasetRef]:
        """Find overall input datasets.

        Returns
        -------
        datasets : `dict` [ `DatasetKey` or `PrerequisiteDatasetKey`, \
                `~lsst.daf.butler.DatasetRef` ]
            Overall-input datasets, including prerequisites and init-inputs.
        """
        result = {}
        for generation in networkx.algorithms.topological_generations(self._xgraph):
            for dataset_key in generation:
                if dataset_key.is_task:
                    continue
                if (ref := self.get_dataset_ref(dataset_key)) is None:
                    raise AssertionError(
                        f"Logic bug in QG generation: dataset {dataset_key} was never resolved."
                    )
                result[dataset_key] = ref
            break
        return result
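
    # Illustrative sketch (not part of the original module): once every
    # overall-input dataset node has been resolved to a `DatasetRef`, the
    # builder can pull them all out in one call, e.g. to log or verify them
    # before writing the final graph:
    #
    #     for dataset_key, ref in skeleton.extract_overall_inputs().items():
    #         _LOG.debug("overall input %s -> %s", dataset_key, ref)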

    def set_dataset_ref(
        self, ref: DatasetRef, key: DatasetKey | PrerequisiteDatasetKey | None = None
    ) -> None:
        """Associate a dataset node with a `~lsst.daf.butler.DatasetRef`
        instance.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            `~lsst.daf.butler.DatasetRef` to associate with the node.
        key : `DatasetKey` or `PrerequisiteDatasetKey`, optional
            Identifier for the graph node. If not provided, a `DatasetKey`
            is constructed from the dataset type name and data ID of ``ref``.
        """
        if key is None:
            key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
        self._xgraph.nodes[key]["ref"] = ref

    def set_output_for_skip(self, ref: DatasetRef) -> None:
        """Associate a dataset node with a `~lsst.daf.butler.DatasetRef` that
        represents an existing output in a collection where such outputs can
        cause a quantum to be skipped.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            `~lsst.daf.butler.DatasetRef` to associate with the node.
        """
        key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
        self._xgraph.nodes[key]["output_for_skip"] = ref

    def set_output_in_the_way(self, ref: DatasetRef) -> None:
        """Associate a dataset node with a `~lsst.daf.butler.DatasetRef` that
        represents an existing output in the output RUN collection.

        Parameters
        ----------
        ref : `~lsst.daf.butler.DatasetRef`
            `~lsst.daf.butler.DatasetRef` to associate with the node.
        """
        key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
        self._xgraph.nodes[key]["output_in_the_way"] = ref

    def get_dataset_ref(self, key: DatasetKey | PrerequisiteDatasetKey) -> DatasetRef | None:
        """Return the `~lsst.daf.butler.DatasetRef` associated with the given
        node.

        This does not return "output for skip" and "output in the way"
        datasets.

        Parameters
        ----------
        key : `DatasetKey` or `PrerequisiteDatasetKey`
            Identifier for the graph node.

        Returns
        -------
        ref : `~lsst.daf.butler.DatasetRef` or `None`
            Dataset reference associated with the node.
        """
        return self._xgraph.nodes[key].get("ref")
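
    # Illustrative sketch (not part of the original module): resolving a
    # dataset node typically means attaching the `DatasetRef` found by a
    # registry query (or newly minted for a predicted output), and the same
    # key can be used to read it back later. `ref` here is hypothetical:
    #
    #     skeleton.set_dataset_ref(ref)
    #     key = DatasetKey(ref.datasetType.name, ref.dataId.required_values)
    #     assert skeleton.get_dataset_ref(key) == ref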

    def get_output_for_skip(self, key: DatasetKey) -> DatasetRef | None:
        """Return the `~lsst.daf.butler.DatasetRef` associated with the given
        node in a collection where it could lead to a quantum being skipped.

        Parameters
        ----------
        key : `DatasetKey`
            Identifier for the graph node.

        Returns
        -------
        ref : `~lsst.daf.butler.DatasetRef` or `None`
            Dataset reference associated with the node.
        """
        return self._xgraph.nodes[key].get("output_for_skip")

    def get_output_in_the_way(self, key: DatasetKey) -> DatasetRef | None:
        """Return the `~lsst.daf.butler.DatasetRef` associated with the given
        node in the output RUN collection.

        Parameters
        ----------
        key : `DatasetKey`
            Identifier for the graph node.

        Returns
        -------
        ref : `~lsst.daf.butler.DatasetRef` or `None`
            Dataset reference associated with the node.
        """
        return self._xgraph.nodes[key].get("output_in_the_way")

    def discard_output_in_the_way(self, key: DatasetKey) -> None:
        """Drop any `~lsst.daf.butler.DatasetRef` associated with this node in
        the output RUN collection.

        Does nothing if there is no such `~lsst.daf.butler.DatasetRef`.

        Parameters
        ----------
        key : `DatasetKey`
            Identifier for the graph node.
        """
        self._xgraph.nodes[key].pop("output_in_the_way", None)

    def set_data_id(self, key: Key, data_id: DataCoordinate) -> None:
        """Set the data ID associated with a node.

        This updates the data ID in any `~lsst.daf.butler.DatasetRef` objects
        associated with the node via `set_dataset_ref`, `set_output_for_skip`,
        or `set_output_in_the_way` as well, assuming it is an expanded version
        of the original data ID.

        Parameters
        ----------
        key : `Key`
            Identifier for the graph node.
        data_id : `~lsst.daf.butler.DataCoordinate`
            Data ID for the node.
        """
        state: MutableMapping[str, Any] = self._xgraph.nodes[key]
        state["data_id"] = data_id
        ref: DatasetRef | None
        if (ref := state.get("ref")) is not None:
            state["ref"] = ref.expanded(data_id)
        output_for_skip: DatasetRef | None
        if (output_for_skip := state.get("output_for_skip")) is not None:
            state["output_for_skip"] = output_for_skip.expanded(data_id)
        output_in_the_way: DatasetRef | None
        if (output_in_the_way := state.get("output_in_the_way")) is not None:
            state["output_in_the_way"] = output_in_the_way.expanded(data_id)

    def get_data_id(self, key: Key) -> DataCoordinate:
        """Return the full data ID for a quantum or dataset, if available.

        Parameters
        ----------
        key : `Key`
            Identifier for the graph node.

        Returns
        -------
        data_id : `~lsst.daf.butler.DataCoordinate`
            Expanded data ID for the node, if one is available.

        Raises
        ------
        KeyError
            Raised if this node does not have an expanded data ID.
        """
        return self._xgraph.nodes[key]["data_id"]

    def attach_dimension_records(
        self,
        butler: Butler,
        dimensions: DimensionGroup,
        dimension_records: Iterable[DimensionRecordSet] = (),
    ) -> None:
        """Attach dimension records to the data IDs in the skeleton.

        This both attaches records to data IDs in the skeleton and aggregates
        any existing records on data IDs, so `get_dimension_data` returns all
        dimension records used in the skeleton. It can be called multiple
        times.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Butler to use to query for missing dimension records.
        dimensions : `lsst.daf.butler.DimensionGroup`
            Superset of all of the dimensions of all data IDs.
        dimension_records : `~collections.abc.Iterable` [ \
                `lsst.daf.butler.DimensionRecordSet` ], optional
            Iterable of sets of dimension records to attach.
        """
        for record_set in dimension_records:
            self._dimension_data.setdefault(
                record_set.element.name, DimensionRecordSet(record_set.element)
            ).update(record_set)
        # Group all nodes by data ID (and dimensions of data ID).
        data_ids_to_expand: defaultdict[DimensionGroup, defaultdict[DataCoordinate, list[Key]]] = defaultdict(
            lambda: defaultdict(list)
        )
        extractor = DimensionDataExtractor.from_dimension_group(dimensions)
        data_id: DataCoordinate | None
        for node_key in self:
            if data_id := self[node_key].get("data_id"):
                if data_id.hasRecords():
                    extractor.update([data_id])
                else:
                    data_ids_to_expand[data_id.dimensions][data_id].append(node_key)
        # Add records we extracted from data IDs that were already expanded, in
        # case other nodes want them.
        for record_set in extractor.records.values():
            self._dimension_data.setdefault(
                record_set.element.name, DimensionRecordSet(record_set.element)
            ).update(record_set)
        attacher = DimensionDataAttacher(records=self._dimension_data.values(), dimensions=dimensions)
        for dimensions, data_ids in data_ids_to_expand.items():
            with butler.query() as query:
                # Butler query will be used as-needed to get dimension records
                # (from prerequisites) we didn't fetch in advance. These are
                # cached in the attacher so we don't look them up multiple
                # times.
                expanded_data_ids = attacher.attach(dimensions, data_ids.keys(), query=query)
            for expanded_data_id, node_keys in zip(expanded_data_ids, data_ids.values()):
                for node_key in node_keys:
                    self.set_data_id(node_key, expanded_data_id)
        # Hold on to any records that we had to query for or extracted.
        self._dimension_data = attacher.records

    def get_dimension_data(self) -> list[DimensionRecordSet]:
        """Return the dimension records attached to data IDs."""
        return list(self._dimension_data.values())
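

# Illustrative sketch (not part of the original module): a typical builder
# finishes by expanding every data ID in the skeleton and keeping the
# aggregated dimension records to serialize alongside the graph. The butler,
# dimension group, and skeleton are assumed to come from the surrounding
# builder code.
def _example_finalize_dimension_data(
    butler: Butler, skeleton: QuantumGraphSkeleton, dimensions: DimensionGroup
) -> list[DimensionRecordSet]:
    # Expand all unexpanded data IDs, querying the butler only for records
    # that were not already attached or supplied up front.
    skeleton.attach_dimension_records(butler, dimensions)
    # The aggregated record sets can then be persisted with the graph.
    return skeleton.get_dimension_data()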