Coverage for python / lsst / pipe / base / simple_pipeline_executor.py: 19%
137 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("SimplePipelineExecutor",)
32from collections.abc import Iterable, Iterator, Mapping
33from typing import Any
35from lsst.daf.butler import (
36 Butler,
37 CollectionType,
38 DataCoordinate,
39 DatasetRef,
40 Quantum,
41)
42from lsst.daf.butler.registry import RegistryDefaults
43from lsst.pex.config import Config
45from ._instrument import Instrument
46from ._quantumContext import ExecutionResources
47from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
48from .graph import QuantumGraph
49from .pipeline import Pipeline
50from .pipeline_graph import PipelineGraph
51from .pipelineTask import PipelineTask
52from .quantum_graph import PredictedQuantumGraph
53from .single_quantum_executor import SingleQuantumExecutor
54from .taskFactory import TaskFactory
57class SimplePipelineExecutor:
58 """A simple, high-level executor for pipelines.
60 Parameters
61 ----------
62 quantum_graph : `.QuantumGraph`
63 Graph to be executed.
64 butler : `~lsst.daf.butler.Butler`
65 Object that manages all I/O. Must be initialized with `collections`
66 and `run` properties that correspond to the input and output
67 collections, which must be consistent with those used to create
68 ``quantum_graph``.
69 resources : `.ExecutionResources`
70 The resources available to each quantum being executed.
71 raise_on_partial_outputs : `bool`, optional
72 If `True` raise exceptions chained by `.AnnotatedPartialOutputsError`
73 immediately, instead of considering the partial result a success and
74 continuing to run downstream tasks.
76 Notes
77 -----
78 Most callers should use one of the `classmethod` factory functions
79 (`from_pipeline_filename`, `from_task_class`, `from_pipeline`) instead of
80 invoking the constructor directly; these guarantee that the
81 `~lsst.daf.butler.Butler` and `.QuantumGraph` are created consistently.
83 This class is intended primarily to support unit testing and small-scale
84 integration testing of `.PipelineTask` classes. It deliberately lacks many
85 features present in the command-line-only ``pipetask`` tool in order to
86 keep the implementation simple. Python callers that need more
87 sophistication should call lower-level tools like
88 `~.quantum_graph_builder.QuantumGraphBuilder` and
89 `.single_quantum_executor.SingleQuantumExecutor` directly.
90 """
92 def __init__(
93 self,
94 quantum_graph: QuantumGraph | PredictedQuantumGraph,
95 butler: Butler,
96 resources: ExecutionResources | None = None,
97 raise_on_partial_outputs: bool = True,
98 ):
99 from .graph import QuantumGraph
101 self._quantum_graph: QuantumGraph | None = None
102 if isinstance(quantum_graph, QuantumGraph):
103 self._quantum_graph = quantum_graph
104 self.predicted = PredictedQuantumGraph.from_old_quantum_graph(self._quantum_graph)
105 else:
106 self.predicted = quantum_graph
107 self.butler = butler
108 self.resources = resources
109 self.raise_on_partial_outputs = raise_on_partial_outputs
111 @classmethod
112 def prep_butler(
113 cls,
114 root: str,
115 inputs: Iterable[str],
116 output: str,
117 output_run: str | None = None,
118 ) -> Butler:
119 """Return configured `~lsst.daf.butler.Butler`.
121 Helper method for creating `~lsst.daf.butler.Butler` instances with
122 collections appropriate for processing.
124 Parameters
125 ----------
126 root : `str`
127 Root of the butler data repository; must already exist, with all
128 necessary input data.
129 inputs : `~collections.abc.Iterable` [ `str` ]
130 Collections to search for all input datasets, in search order.
131 output : `str`
132 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED`
133 collection to create that will combine both inputs and outputs.
134 output_run : `str`, optional
135 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will
136 directly hold all output datasets. If not provided, a name will be
137 created from ``output`` and a timestamp.
139 Returns
140 -------
141 butler : `~lsst.daf.butler.Butler`
142 Butler client instance compatible with all `classmethod` factories.
143 Always writeable.
144 """
145 if output_run is None:
146 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}"
147 # Make initial butler with no collections, since we haven't created
148 # them yet.
149 butler = Butler.from_config(root, writeable=True)
150 butler.registry.registerCollection(output_run, CollectionType.RUN)
151 butler.registry.registerCollection(output, CollectionType.CHAINED)
152 collections = [output_run]
153 collections.extend(inputs)
154 butler.registry.setCollectionChain(output, collections)
155 # Override the registry defaults. No need to clone.
156 butler.registry.defaults = RegistryDefaults(collections=[output], run=output_run)
157 return butler
159 @classmethod
160 def from_pipeline_filename(
161 cls,
162 pipeline_filename: str,
163 *,
164 where: str = "",
165 bind: Mapping[str, Any] | None = None,
166 butler: Butler,
167 resources: ExecutionResources | None = None,
168 raise_on_partial_outputs: bool = True,
169 attach_datastore_records: bool = False,
170 output: str | None = None,
171 output_run: str | None = None,
172 ) -> SimplePipelineExecutor:
173 """Create an executor by building a QuantumGraph from an on-disk
174 pipeline YAML file.
176 Parameters
177 ----------
178 pipeline_filename : `str`
179 Name of the YAML file to load the pipeline definition from.
180 where : `str`, optional
181 Data ID query expression that constraints the quanta generated.
182 bind : `~collections.abc.Mapping`, optional
183 Mapping containing literal values that should be injected into the
184 ``where`` expression, keyed by the identifiers they replace.
185 butler : `~lsst.daf.butler.Butler`
186 Butler that manages all I/O. `prep_butler` can be used to create
187 one.
188 resources : `.ExecutionResources`
189 The resources available to each quantum being executed.
190 raise_on_partial_outputs : `bool`, optional
191 If `True` raise exceptions chained by
192 `.AnnotatedPartialOutputsError` immediately, instead of considering
193 the partial result a success and continuing to run downstream
194 tasks.
195 attach_datastore_records : `bool`, optional
196 Whether to attach datastore records to the quantum graph. This is
197 usually unnecessary, unless the executor is used to test behavior
198 that depends on datastore records.
199 output : `str`, optional
200 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED`
201 collection to create that will combine both inputs and outputs.
202 output_run : `str`, optional
203 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will
204 directly hold all output datasets. If not provided, a name will be
205 created from ``output`` and a timestamp.
207 Returns
208 -------
209 executor : `SimplePipelineExecutor`
210 An executor instance containing the constructed `.QuantumGraph` and
211 `~lsst.daf.butler.Butler`, ready for `run` to be called.
212 """
213 pipeline = Pipeline.fromFile(pipeline_filename)
214 return cls.from_pipeline(
215 pipeline,
216 butler=butler,
217 where=where,
218 bind=bind,
219 resources=resources,
220 raise_on_partial_outputs=raise_on_partial_outputs,
221 attach_datastore_records=attach_datastore_records,
222 output=output,
223 output_run=output_run,
224 )
226 @classmethod
227 def from_task_class(
228 cls,
229 task_class: type[PipelineTask],
230 config: Config | None = None,
231 label: str | None = None,
232 *,
233 where: str = "",
234 bind: Mapping[str, Any] | None = None,
235 butler: Butler,
236 resources: ExecutionResources | None = None,
237 raise_on_partial_outputs: bool = True,
238 attach_datastore_records: bool = False,
239 output: str | None = None,
240 output_run: str | None = None,
241 ) -> SimplePipelineExecutor:
242 """Create an executor by building a QuantumGraph from a pipeline
243 containing a single task.
245 Parameters
246 ----------
247 task_class : `type`
248 A concrete `.PipelineTask` subclass.
249 config : `~lsst.pex.config.Config`, optional
250 Configuration for the task. If not provided, task-level defaults
251 will be used (no per-instrument overrides).
252 label : `str`, optional
253 Label for the task in its pipeline; defaults to
254 ``task_class._DefaultName``.
255 where : `str`, optional
256 Data ID query expression that constraints the quanta generated.
257 bind : `~collections.abc.Mapping`, optional
258 Mapping containing literal values that should be injected into the
259 ``where`` expression, keyed by the identifiers they replace.
260 butler : `~lsst.daf.butler.Butler`
261 Butler that manages all I/O. `prep_butler` can be used to create
262 one.
263 resources : `.ExecutionResources`
264 The resources available to each quantum being executed.
265 raise_on_partial_outputs : `bool`, optional
266 If `True` raise exceptions chained by
267 `.AnnotatedPartialOutputsError` immediately, instead of considering
268 the partial result a success and continuing to run downstream
269 tasks.
270 attach_datastore_records : `bool`, optional
271 Whether to attach datastore records to the quantum graph. This is
272 usually unnecessary, unless the executor is used to test behavior
273 that depends on datastore records.
274 output : `str`, optional
275 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED`
276 collection to create that will combine both inputs and outputs.
277 output_run : `str`, optional
278 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will
279 directly hold all output datasets. If not provided, a name will be
280 created from ``output`` and a timestamp.
282 Returns
283 -------
284 executor : `SimplePipelineExecutor`
285 An executor instance containing the constructed `.QuantumGraph` and
286 `~lsst.daf.butler.Butler`, ready for `run` to be called.
287 """
288 if config is None:
289 config = task_class.ConfigClass()
290 if label is None:
291 label = task_class._DefaultName
292 if not isinstance(config, task_class.ConfigClass):
293 raise TypeError(
294 f"Invalid config class type: expected {task_class.ConfigClass.__name__}, "
295 f"got {type(config).__name__}."
296 )
297 pipeline_graph = PipelineGraph()
298 pipeline_graph.add_task(label=label, task_class=task_class, config=config)
299 return cls.from_pipeline_graph(
300 pipeline_graph,
301 butler=butler,
302 where=where,
303 bind=bind,
304 resources=resources,
305 raise_on_partial_outputs=raise_on_partial_outputs,
306 attach_datastore_records=attach_datastore_records,
307 output=output,
308 output_run=output_run,
309 )
311 @classmethod
312 def from_pipeline(
313 cls,
314 pipeline: Pipeline,
315 *,
316 where: str = "",
317 bind: Mapping[str, Any] | None = None,
318 butler: Butler,
319 resources: ExecutionResources | None = None,
320 raise_on_partial_outputs: bool = True,
321 attach_datastore_records: bool = False,
322 output: str | None = None,
323 output_run: str | None = None,
324 ) -> SimplePipelineExecutor:
325 """Create an executor by building a QuantumGraph from an in-memory
326 pipeline.
328 Parameters
329 ----------
330 pipeline : `.Pipeline` or `~collections.abc.Iterable` [ `.TaskDef` ]
331 A Python object describing the tasks to run, along with their
332 labels and configuration.
333 where : `str`, optional
334 Data ID query expression that constraints the quanta generated.
335 bind : `~collections.abc.Mapping`, optional
336 Mapping containing literal values that should be injected into the
337 ``where`` expression, keyed by the identifiers they replace.
338 butler : `~lsst.daf.butler.Butler`
339 Butler that manages all I/O. `prep_butler` can be used to create
340 one.
341 resources : `.ExecutionResources`
342 The resources available to each quantum being executed.
343 raise_on_partial_outputs : `bool`, optional
344 If `True` raise exceptions chained by
345 `.AnnotatedPartialOutputsError` immediately, instead of considering
346 the partial result a success and continuing to run downstream
347 tasks.
348 attach_datastore_records : `bool`, optional
349 Whether to attach datastore records to the quantum graph. This is
350 usually unnecessary, unless the executor is used to test behavior
351 that depends on datastore records.
352 output : `str`, optional
353 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED`
354 collection to create that will combine both inputs and outputs.
355 output_run : `str`, optional
356 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will
357 directly hold all output datasets. If not provided, a name will
358 be created from ``output`` and a timestamp.
360 Returns
361 -------
362 executor : `SimplePipelineExecutor`
363 An executor instance containing the constructed `.QuantumGraph` and
364 `~lsst.daf.butler.Butler`, ready for `run` to be called.
365 """
366 pipeline_graph = pipeline.to_graph()
367 return cls.from_pipeline_graph(
368 pipeline_graph,
369 where=where,
370 bind=bind,
371 butler=butler,
372 resources=resources,
373 raise_on_partial_outputs=raise_on_partial_outputs,
374 attach_datastore_records=attach_datastore_records,
375 output=output,
376 output_run=output_run,
377 )
379 @classmethod
380 def from_pipeline_graph(
381 cls,
382 pipeline_graph: PipelineGraph,
383 *,
384 where: str = "",
385 bind: Mapping[str, Any] | None = None,
386 butler: Butler,
387 resources: ExecutionResources | None = None,
388 raise_on_partial_outputs: bool = True,
389 attach_datastore_records: bool = False,
390 output: str | None = None,
391 output_run: str | None = None,
392 ) -> SimplePipelineExecutor:
393 """Create an executor by building a QuantumGraph from an in-memory
394 pipeline graph.
396 Parameters
397 ----------
398 pipeline_graph : `~.pipeline_graph.PipelineGraph`
399 A Python object describing the tasks to run, along with their
400 labels and configuration, in graph form. Will be resolved against
401 the given ``butler``, with any existing resolutions ignored.
402 where : `str`, optional
403 Data ID query expression that constraints the quanta generated.
404 bind : `~collections.abc.Mapping`, optional
405 Mapping containing literal values that should be injected into the
406 ``where`` expression, keyed by the identifiers they replace.
407 butler : `~lsst.daf.butler.Butler`
408 Butler that manages all I/O. `prep_butler` can be used to create
409 one. Must have its `~lsst.daf.butler.Butler.run` and
410 ``butler.collections.defaults`` not empty and not `None`.
411 resources : `.ExecutionResources`
412 The resources available to each quantum being executed.
413 raise_on_partial_outputs : `bool`, optional
414 If `True` raise exceptions chained by
415 `.AnnotatedPartialOutputsError` immediately, instead
416 of considering the partial result a success and continuing to run
417 downstream tasks.
418 attach_datastore_records : `bool`, optional
419 Whether to attach datastore records to the quantum graph. This is
420 usually unnecessary, unless the executor is used to test behavior
421 that depends on datastore records.
422 output : `str`, optional
423 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED`
424 collection to create that will combine both inputs and outputs.
425 output_run : `str`, optional
426 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will
427 directly hold all output datasets. If not provided, a name will
428 be created from ``output`` and a timestamp.
430 Returns
431 -------
432 executor : `SimplePipelineExecutor`
433 An executor instance containing the constructed
434 `.QuantumGraph` and `~lsst.daf.butler.Butler`, ready
435 for `run` to be called.
436 """
437 if output_run is None:
438 output_run = butler.run
439 if output_run is None:
440 if output is None:
441 raise TypeError("At least one of output or output_run must be provided.")
442 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}"
444 quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
445 pipeline_graph, butler, where=where, bind=bind, output_run=output_run
446 )
447 metadata = {
448 "skip_existing_in": [],
449 "skip_existing": False,
450 "data_query": where,
451 }
452 predicted = quantum_graph_builder.finish(
453 output=output,
454 metadata=metadata,
455 attach_datastore_records=attach_datastore_records,
456 ).assemble()
457 return cls(
458 predicted,
459 butler=butler,
460 resources=resources,
461 raise_on_partial_outputs=raise_on_partial_outputs,
462 )
464 @property
465 def quantum_graph(self) -> QuantumGraph:
466 """The quantum graph run by this executor."""
467 if self._quantum_graph is None:
468 self._quantum_graph = self.predicted.to_old_quantum_graph()
469 return self._quantum_graph
471 def use_local_butler(
472 self, root: str, register_dataset_types: bool = True, transfer_dimensions: bool = True
473 ) -> Butler:
474 """Transfer all inputs to a local data repository. and set the executor
475 to write outputs to it.
477 Parameters
478 ----------
479 root : `str`
480 Path to the local data repository; created if it does not exist.
481 register_dataset_types : `bool`, optional
482 Whether to register dataset types in the new repository. If
483 `False`, the local data repository must already exist and already
484 have all input dataset types registered.
485 transfer_dimensions : `bool`, optional
486 Whether to transfer dimension records to the new repository. If
487 `False`, the local data repository must already exist and already
488 have all needed dimension records.
490 Returns
491 -------
492 butler : `lsst.daf.butler.Butler`
493 Writeable butler for local data repository.
495 Notes
496 -----
497 The input collection structure from the original data repository is not
498 preserved by this method (it cannot be reconstructed from the quantum
499 graph). Instead, a `~lsst.daf.butler.CollectionType.TAGGED` collection
500 is created to gather all inputs, and appended to the output
501 `~lsst.daf.butler.CollectionType.CHAINED` collection after the output
502 `~lsst.daf.butler.CollectionType.RUN` collection. Calibration inputs
503 with the same data ID but multiple validity ranges are *not* included
504 in that `~lsst.daf.butler.CollectionType.TAGGED`; they are still
505 transferred to the local data repository, but can only be found via the
506 quantum graph or their original `~lsst.daf.butler.CollectionType.RUN`
507 collections.
508 """
509 if not Butler.has_repo_config(root):
510 Butler.makeRepo(root)
511 out_butler = Butler.from_config(root, writeable=True)
513 output_run = self.predicted.header.output_run
514 out_butler.collections.register(output_run, CollectionType.RUN)
515 output = self.predicted.header.output
516 inputs: str | None = None
517 if output is not None:
518 inputs = f"{output}/inputs"
519 out_butler.collections.register(output, CollectionType.CHAINED)
520 out_butler.collections.register(inputs, CollectionType.TAGGED)
521 out_butler.collections.redefine_chain(output, [output_run, inputs])
523 if transfer_dimensions:
524 # We can't just let the transfer_from call below take care of this
525 # because we need dimensions for outputs as well as inputs. And if
526 # we have to do the outputs explicitly, it's more efficient to do
527 # the inputs at the same time since a lot of those dimensions will
528 # be the same.
529 self._transfer_qg_dimension_records(out_butler)
531 # Extract overall-input DatasetRefs to transfer and possibly insert
532 # into a TAGGED collection.
533 refs: set[DatasetRef] = set()
534 to_tag_by_type: dict[str, dict[DataCoordinate, DatasetRef | None]] = {}
535 pipeline_graph = self.predicted.pipeline_graph
536 for name, dataset_type_node in pipeline_graph.iter_overall_inputs():
537 assert dataset_type_node is not None, "PipelineGraph should be resolved."
538 to_tag_for_type = to_tag_by_type.setdefault(name, {})
539 for task_node in pipeline_graph.consumers_of(name):
540 for quantum in self.predicted.build_execution_quanta(task_label=task_node.label).values():
541 for ref in quantum.inputs[name]:
542 ref = dataset_type_node.generalize_ref(ref)
543 refs.add(ref)
544 if to_tag_for_type.setdefault(ref.dataId, ref) != ref:
545 # There is already a dataset with the same data ID
546 # and dataset type, but a different UUID/run. This
547 # can only happen for calibrations found in
548 # calibration collections, and for now we have no
549 # choice but to leave them out of the TAGGED inputs
550 # collection in the local butler.
551 to_tag_for_type[ref.dataId] = None
553 out_butler.transfer_from(
554 self.butler,
555 refs,
556 register_dataset_types=register_dataset_types,
557 transfer_dimensions=False,
558 )
560 if inputs is not None:
561 to_tag_flat: list[DatasetRef] = []
562 for ref_map in to_tag_by_type.values():
563 for tag_ref in ref_map.values():
564 if tag_ref is not None:
565 to_tag_flat.append(tag_ref)
566 out_butler.registry.associate(inputs, to_tag_flat)
568 out_butler.registry.defaults = self.butler.registry.defaults.clone(collections=output, run=output_run)
569 self.butler = out_butler
570 return self.butler
572 def run(self, register_dataset_types: bool = False, save_versions: bool = True) -> list[Quantum]:
573 """Run all the quanta in the quantum graph in topological order.
575 Use this method to run all quanta in the graph. Use
576 `as_generator` to get a generator to run the quanta one at
577 a time.
579 Parameters
580 ----------
581 register_dataset_types : `bool`, optional
582 If `True`, register all output dataset types before executing any
583 quanta.
584 save_versions : `bool`, optional
585 If `True` (default), save a package versions dataset.
587 Returns
588 -------
589 quanta : `list` [ `~lsst.daf.butler.Quantum` ]
590 Executed quanta.
592 Notes
593 -----
594 A topological ordering is not in general unique, but no other
595 guarantees are made about the order in which quanta are processed.
596 """
597 return list(
598 self.as_generator(register_dataset_types=register_dataset_types, save_versions=save_versions)
599 )
601 def as_generator(
602 self, register_dataset_types: bool = False, save_versions: bool = True
603 ) -> Iterator[Quantum]:
604 """Yield quanta in the quantum graph in topological order.
606 These quanta will be run as the returned generator is iterated
607 over. Use this method to run the quanta one at a time.
608 Use `run` to run all quanta in the graph.
610 Parameters
611 ----------
612 register_dataset_types : `bool`, optional
613 If `True`, register all output dataset types before executing any
614 quanta.
615 save_versions : `bool`, optional
616 If `True` (default), save a package versions dataset.
618 Returns
619 -------
620 quanta : `~collections.abc.Iterator` [ `~lsst.daf.butler.Quantum` ]
621 Executed quanta.
623 Notes
624 -----
625 Global initialization steps (see `.QuantumGraph.init_output_run`) are
626 performed immediately when this method is called, but individual quanta
627 are not actually executed until the returned iterator is iterated over.
629 A topological ordering is not in general unique, but no other
630 guarantees are made about the order in which quanta are processed.
631 """
632 if register_dataset_types:
633 self.predicted.pipeline_graph.register_dataset_types(self.butler)
634 self.predicted.write_configs(self.butler, compare_existing=False)
635 self.predicted.write_init_outputs(self.butler, skip_existing=False)
636 if save_versions:
637 self.predicted.write_packages(self.butler, compare_existing=False)
638 task_factory = TaskFactory()
639 single_quantum_executor = SingleQuantumExecutor(
640 butler=self.butler,
641 task_factory=task_factory,
642 resources=self.resources,
643 raise_on_partial_outputs=self.raise_on_partial_outputs,
644 )
645 self.predicted.build_execution_quanta()
646 nodes_map = self.predicted.quantum_only_xgraph.nodes
647 # Important that this returns a generator expression rather than being
648 # a generator itself; that is what makes the init stuff above happen
649 # immediately instead of when the first quanta is executed, which might
650 # be useful for callers who want to check the state of the repo in
651 # between.
652 return (
653 single_quantum_executor.execute(
654 nodes_map[quantum_id]["pipeline_node"],
655 nodes_map[quantum_id]["quantum"],
656 quantum_id,
657 )[0]
658 for quantum_id in self.predicted
659 )
661 def _transfer_qg_dimension_records(self, out_butler: Butler) -> None:
662 """Transfer all dimension records from the quantum graph to a butler.
664 Parameters
665 ----------
666 out_butler : `lsst.daf.butler.Butler`
667 Butler to transfer records to.
668 """
669 assert self.predicted.dimension_data is not None, "Dimension data must be present for execution."
670 records = self.predicted.dimension_data.records
671 dimensions = out_butler.dimensions.sorted(records.keys())
672 for dimension in dimensions:
673 record_set = records[dimension.name]
674 if record_set and record_set.element.has_own_table:
675 out_butler.registry.insertDimensionData(
676 record_set.element,
677 *record_set,
678 skip_existing=True,
679 )