Coverage for python / lsst / pipe / base / simple_pipeline_executor.py: 19%
138 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("SimplePipelineExecutor",)
32import os
33from collections.abc import Iterable, Iterator, Mapping
34from typing import Any
36from lsst.daf.butler import (
37 Butler,
38 CollectionType,
39 DataCoordinate,
40 DatasetRef,
41 Quantum,
42)
43from lsst.daf.butler.registry import RegistryDefaults
44from lsst.pex.config import Config
46from ._instrument import Instrument
47from ._quantumContext import ExecutionResources
48from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
49from .graph import QuantumGraph
50from .pipeline import Pipeline
51from .pipeline_graph import PipelineGraph
52from .pipelineTask import PipelineTask
53from .quantum_graph import PredictedQuantumGraph
54from .single_quantum_executor import SingleQuantumExecutor
55from .taskFactory import TaskFactory
class SimplePipelineExecutor:
    """Minimal, high-level driver for executing a pipeline in-process.

    Parameters
    ----------
    quantum_graph : `.QuantumGraph` or `.quantum_graph.PredictedQuantumGraph`
        Graph whose quanta will be executed. An old-style `.QuantumGraph`
        is converted to a `.quantum_graph.PredictedQuantumGraph` on
        construction.
    butler : `~lsst.daf.butler.Butler`
        Object that performs all I/O. Its `collections` and `run` must
        describe the input and output collections, and they must agree with
        the ones used when ``quantum_graph`` was built.
    resources : `.ExecutionResources`, optional
        Resources made available to each quantum while it executes.
    raise_on_partial_outputs : `bool`, optional
        If `True`, exceptions chained by `.AnnotatedPartialOutputsError` are
        raised immediately rather than treating the partial result as a
        success and continuing with downstream tasks.

    Notes
    -----
    Prefer the `classmethod` factories (`from_pipeline_filename`,
    `from_task_class`, `from_pipeline`) over calling the constructor; they
    ensure the `~lsst.daf.butler.Butler` and the graph are built
    consistently.

    The main use case for this class is unit testing and small-scale
    integration testing of `.PipelineTask` classes. Many features of the
    command-line-only ``pipetask`` tool are intentionally absent so that the
    implementation stays simple. Python code that needs more control should
    use lower-level tools such as
    `~.quantum_graph_builder.QuantumGraphBuilder` and
    `.single_quantum_executor.SingleQuantumExecutor` directly.
    """

    def __init__(
        self,
        quantum_graph: QuantumGraph | PredictedQuantumGraph,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
    ):
        from .graph import QuantumGraph

        self.butler = butler
        self.resources = resources
        self.raise_on_partial_outputs = raise_on_partial_outputs
        # The old-style graph is only kept when the caller supplied one;
        # otherwise the `quantum_graph` property builds it on demand.
        self._quantum_graph: QuantumGraph | None
        if isinstance(quantum_graph, QuantumGraph):
            self._quantum_graph = quantum_graph
            self.predicted = PredictedQuantumGraph.from_old_quantum_graph(quantum_graph)
        else:
            self._quantum_graph = None
            self.predicted = quantum_graph

    @classmethod
    def prep_butler(
        cls,
        root: str,
        inputs: Iterable[str],
        output: str,
        output_run: str | None = None,
    ) -> Butler:
        """Create a writeable butler whose collections are set up for
        processing.

        Parameters
        ----------
        root : `str`
            Root of the butler data repository. It must already exist and
            hold all necessary input data.
        inputs : `~collections.abc.Iterable` [ `str` ]
            Collections searched for input datasets, in search order.
        output : `str`
            Name of the `~lsst.daf.butler.CollectionType.CHAINED` collection
            to register; its chain is set to the output run followed by
            ``inputs``.
        output_run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection
            that will directly hold all output datasets. When omitted, a
            name is generated from ``output`` and a timestamp.

        Returns
        -------
        butler : `~lsst.daf.butler.Butler`
            Writeable butler client whose defaults are the ``output``
            collection and the output run; usable with all `classmethod`
            factories.
        """
        run_name = (
            output_run if output_run is not None else f"{output}/{Instrument.makeCollectionTimestamp()}"
        )
        # The collections do not exist yet, so the butler has to start out
        # without any defaults.
        butler = Butler.from_config(root, writeable=True)
        butler.registry.registerCollection(run_name, CollectionType.RUN)
        butler.registry.registerCollection(output, CollectionType.CHAINED)
        butler.registry.setCollectionChain(output, [run_name, *inputs])
        # Replace the registry defaults in place rather than cloning the
        # butler.
        butler.registry.defaults = RegistryDefaults(collections=[output], run=run_name)
        return butler

    @classmethod
    def from_pipeline_filename(
        cls,
        pipeline_filename: str,
        *,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
        attach_datastore_records: bool = False,
        output: str | None = None,
        output_run: str | None = None,
    ) -> SimplePipelineExecutor:
        """Build an executor from a pipeline YAML file on disk.

        The file is loaded into a `.Pipeline` and everything else is
        delegated to `from_pipeline`.

        Parameters
        ----------
        pipeline_filename : `str`
            Name of the YAML file holding the pipeline definition.
        where : `str`, optional
            Data ID query expression that constrains the quanta generated.
        bind : `~collections.abc.Mapping`, optional
            Literal values to inject into the ``where`` expression, keyed
            by the identifiers they replace.
        butler : `~lsst.daf.butler.Butler`
            Butler that manages all I/O. `prep_butler` can be used to
            create one.
        resources : `.ExecutionResources`, optional
            Resources made available to each quantum while it executes.
        raise_on_partial_outputs : `bool`, optional
            If `True`, exceptions chained by
            `.AnnotatedPartialOutputsError` are raised immediately rather
            than treating the partial result as a success and continuing
            with downstream tasks.
        attach_datastore_records : `bool`, optional
            Whether to attach datastore records to the quantum graph.
            Usually unnecessary, unless the executor is used to test
            behavior that depends on datastore records.
        output : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.CHAINED`
            collection that combines inputs and outputs.
        output_run : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.RUN` that
            directly holds all output datasets. When omitted, ``butler.run``
            is used if it is set; otherwise a name is generated from
            ``output`` and a timestamp.

        Returns
        -------
        executor : `SimplePipelineExecutor`
            Executor holding the constructed graph and the
            `~lsst.daf.butler.Butler`, ready for `run` to be called.
        """
        return cls.from_pipeline(
            Pipeline.fromFile(pipeline_filename),
            where=where,
            bind=bind,
            butler=butler,
            resources=resources,
            raise_on_partial_outputs=raise_on_partial_outputs,
            attach_datastore_records=attach_datastore_records,
            output=output,
            output_run=output_run,
        )

    @classmethod
    def from_task_class(
        cls,
        task_class: type[PipelineTask],
        config: Config | None = None,
        label: str | None = None,
        *,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
        attach_datastore_records: bool = False,
        output: str | None = None,
        output_run: str | None = None,
    ) -> SimplePipelineExecutor:
        """Build an executor for a pipeline consisting of one task.

        Parameters
        ----------
        task_class : `type`
            A concrete `.PipelineTask` subclass.
        config : `~lsst.pex.config.Config`, optional
            Configuration for the task. When omitted, a default-constructed
            ``task_class.ConfigClass`` is used (no per-instrument
            overrides).
        label : `str`, optional
            Label for the task in its pipeline; defaults to
            ``task_class._DefaultName``.
        where : `str`, optional
            Data ID query expression that constrains the quanta generated.
        bind : `~collections.abc.Mapping`, optional
            Literal values to inject into the ``where`` expression, keyed
            by the identifiers they replace.
        butler : `~lsst.daf.butler.Butler`
            Butler that manages all I/O. `prep_butler` can be used to
            create one.
        resources : `.ExecutionResources`, optional
            Resources made available to each quantum while it executes.
        raise_on_partial_outputs : `bool`, optional
            If `True`, exceptions chained by
            `.AnnotatedPartialOutputsError` are raised immediately rather
            than treating the partial result as a success and continuing
            with downstream tasks.
        attach_datastore_records : `bool`, optional
            Whether to attach datastore records to the quantum graph.
            Usually unnecessary, unless the executor is used to test
            behavior that depends on datastore records.
        output : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.CHAINED`
            collection that combines inputs and outputs.
        output_run : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.RUN` that
            directly holds all output datasets. When omitted, ``butler.run``
            is used if it is set; otherwise a name is generated from
            ``output`` and a timestamp.

        Returns
        -------
        executor : `SimplePipelineExecutor`
            Executor holding the constructed graph and the
            `~lsst.daf.butler.Butler`, ready for `run` to be called.

        Raises
        ------
        TypeError
            Raised if ``config`` is not an instance of
            ``task_class.ConfigClass``.
        """
        task_config = task_class.ConfigClass() if config is None else config
        task_label = task_class._DefaultName if label is None else label
        # Validate before any graph object is created.
        if not isinstance(task_config, task_class.ConfigClass):
            raise TypeError(
                f"Invalid config class type: expected {task_class.ConfigClass.__name__}, "
                f"got {type(task_config).__name__}."
            )
        single_task_graph = PipelineGraph()
        single_task_graph.add_task(label=task_label, task_class=task_class, config=task_config)
        return cls.from_pipeline_graph(
            single_task_graph,
            where=where,
            bind=bind,
            butler=butler,
            resources=resources,
            raise_on_partial_outputs=raise_on_partial_outputs,
            attach_datastore_records=attach_datastore_records,
            output=output,
            output_run=output_run,
        )

    @classmethod
    def from_pipeline(
        cls,
        pipeline: Pipeline,
        *,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
        attach_datastore_records: bool = False,
        output: str | None = None,
        output_run: str | None = None,
    ) -> SimplePipelineExecutor:
        """Build an executor from an in-memory pipeline.

        The pipeline is converted with its ``to_graph`` method and
        everything else is delegated to `from_pipeline_graph`.

        Parameters
        ----------
        pipeline : `.Pipeline`
            A Python object describing the tasks to run, along with their
            labels and configuration.
        where : `str`, optional
            Data ID query expression that constrains the quanta generated.
        bind : `~collections.abc.Mapping`, optional
            Literal values to inject into the ``where`` expression, keyed
            by the identifiers they replace.
        butler : `~lsst.daf.butler.Butler`
            Butler that manages all I/O. `prep_butler` can be used to
            create one.
        resources : `.ExecutionResources`, optional
            Resources made available to each quantum while it executes.
        raise_on_partial_outputs : `bool`, optional
            If `True`, exceptions chained by
            `.AnnotatedPartialOutputsError` are raised immediately rather
            than treating the partial result as a success and continuing
            with downstream tasks.
        attach_datastore_records : `bool`, optional
            Whether to attach datastore records to the quantum graph.
            Usually unnecessary, unless the executor is used to test
            behavior that depends on datastore records.
        output : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.CHAINED`
            collection that combines inputs and outputs.
        output_run : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.RUN` that
            directly holds all output datasets. When omitted, ``butler.run``
            is used if it is set; otherwise a name is generated from
            ``output`` and a timestamp.

        Returns
        -------
        executor : `SimplePipelineExecutor`
            Executor holding the constructed graph and the
            `~lsst.daf.butler.Butler`, ready for `run` to be called.
        """
        return cls.from_pipeline_graph(
            pipeline.to_graph(),
            where=where,
            bind=bind,
            butler=butler,
            resources=resources,
            raise_on_partial_outputs=raise_on_partial_outputs,
            attach_datastore_records=attach_datastore_records,
            output=output,
            output_run=output_run,
        )

    @classmethod
    def from_pipeline_graph(
        cls,
        pipeline_graph: PipelineGraph,
        *,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
        attach_datastore_records: bool = False,
        output: str | None = None,
        output_run: str | None = None,
    ) -> SimplePipelineExecutor:
        """Build an executor from an in-memory pipeline graph.

        Parameters
        ----------
        pipeline_graph : `~.pipeline_graph.PipelineGraph`
            A Python object describing the tasks to run, along with their
            labels and configuration, in graph form. Will be resolved
            against the given ``butler``, with any existing resolutions
            ignored.
        where : `str`, optional
            Data ID query expression that constrains the quanta generated.
        bind : `~collections.abc.Mapping`, optional
            Literal values to inject into the ``where`` expression, keyed
            by the identifiers they replace.
        butler : `~lsst.daf.butler.Butler`
            Butler that manages all I/O. `prep_butler` can be used to
            create one. Its `~lsst.daf.butler.Butler.run` supplies the
            output run when ``output_run`` is not given.
        resources : `.ExecutionResources`, optional
            Resources made available to each quantum while it executes.
        raise_on_partial_outputs : `bool`, optional
            If `True`, exceptions chained by
            `.AnnotatedPartialOutputsError` are raised immediately rather
            than treating the partial result as a success and continuing
            with downstream tasks.
        attach_datastore_records : `bool`, optional
            Whether to attach datastore records to the quantum graph.
            Usually unnecessary, unless the executor is used to test
            behavior that depends on datastore records.
        output : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.CHAINED`
            collection that combines inputs and outputs. It is passed on
            to the quantum graph builder.
        output_run : `str`, optional
            Name of the output `~lsst.daf.butler.CollectionType.RUN` that
            directly holds all output datasets. When omitted, ``butler.run``
            is used if it is set; otherwise a name is generated from
            ``output`` and a timestamp.

        Returns
        -------
        executor : `SimplePipelineExecutor`
            Executor holding the constructed graph and the
            `~lsst.daf.butler.Butler`, ready for `run` to be called.

        Raises
        ------
        TypeError
            Raised if ``output_run`` is not given, ``butler.run`` is
            `None`, and ``output`` is not given either.
        """
        # Resolution order for the run name: explicit argument, then the
        # butler's default run, then a timestamped name under ``output``.
        run_name = output_run if output_run is not None else butler.run
        if run_name is None:
            if output is None:
                raise TypeError("At least one of output or output_run must be provided.")
            run_name = f"{output}/{Instrument.makeCollectionTimestamp()}"

        builder = AllDimensionsQuantumGraphBuilder(
            pipeline_graph, butler, where=where, bind=bind, output_run=run_name
        )
        skeleton_components = builder.finish(
            output=output,
            metadata={
                "skip_existing_in": [],
                "skip_existing": False,
                "data_query": where,
            },
            attach_datastore_records=attach_datastore_records,
        )
        return cls(
            skeleton_components.assemble(),
            butler=butler,
            resources=resources,
            raise_on_partial_outputs=raise_on_partial_outputs,
        )

    @property
    def quantum_graph(self) -> QuantumGraph:
        """The quantum graph run by this executor, in old-style form
        (`.QuantumGraph`); converted from the predicted graph on first access
        and cached.
        """
        cached = self._quantum_graph
        if cached is None:
            cached = self._quantum_graph = self.predicted.to_old_quantum_graph()
        return cached

    def use_local_butler(
        self, root: str, register_dataset_types: bool = True, transfer_dimensions: bool = True
    ) -> Butler:
        """Transfer all inputs to a local data repository and make the
        executor write its outputs there.

        Parameters
        ----------
        root : `str`
            Path to the local data repository; created if it does not
            exist.
        register_dataset_types : `bool`, optional
            Whether to register dataset types in the new repository. If
            `False`, the local data repository must already exist and
            already have all input dataset types registered.
        transfer_dimensions : `bool`, optional
            Whether to transfer dimension records to the new repository. If
            `False`, the local data repository must already exist and
            already have all needed dimension records.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Writeable butler for the local data repository. It also
            replaces ``self.butler``.

        Notes
        -----
        The structure of the input collections in the original repository
        is not reproduced (the quantum graph does not carry enough
        information for that). Instead all inputs are gathered into a
        `~lsst.daf.butler.CollectionType.TAGGED` collection, which is placed
        after the output `~lsst.daf.butler.CollectionType.RUN` collection in
        the output `~lsst.daf.butler.CollectionType.CHAINED` collection.
        Calibration inputs that share a data ID but have multiple validity
        ranges are *not* put into that
        `~lsst.daf.butler.CollectionType.TAGGED` collection; they are still
        transferred, but can only be found through the quantum graph or
        their original `~lsst.daf.butler.CollectionType.RUN` collections.
        """
        if not os.path.exists(root):
            Butler.makeRepo(root)
        local_butler = Butler.from_config(root, writeable=True)

        output_run = self.predicted.header.output_run
        local_butler.collections.register(output_run, CollectionType.RUN)
        output = self.predicted.header.output
        # Name of the TAGGED inputs collection; stays `None` when the graph
        # has no output CHAINED collection to hang it on.
        inputs: str | None = None
        if output is not None:
            inputs = f"{output}/inputs"
            local_butler.collections.register(output, CollectionType.CHAINED)
            local_butler.collections.register(inputs, CollectionType.TAGGED)
            local_butler.collections.redefine_chain(output, [output_run, inputs])

        if transfer_dimensions:
            # transfer_from (below) would only cover dimensions of the
            # inputs, while outputs need theirs as well. Since the outputs
            # must be handled explicitly anyway, doing the inputs in the same
            # pass is cheaper because many records are shared.
            self._transfer_qg_dimension_records(local_butler)

        # Gather the overall-input refs to transfer, and work out which of
        # them can go into the TAGGED collection.
        refs: set[DatasetRef] = set()
        to_tag_by_type: dict[str, dict[DataCoordinate, DatasetRef | None]] = {}
        pipeline_graph = self.predicted.pipeline_graph
        for name, dataset_type_node in pipeline_graph.iter_overall_inputs():
            assert dataset_type_node is not None, "PipelineGraph should be resolved."
            to_tag_for_type = to_tag_by_type.setdefault(name, {})
            for task_node in pipeline_graph.consumers_of(name):
                task_quanta = self.predicted.build_execution_quanta(task_label=task_node.label)
                for quantum in task_quanta.values():
                    for quantum_ref in quantum.inputs[name]:
                        ref = dataset_type_node.generalize_ref(quantum_ref)
                        refs.add(ref)
                        if to_tag_for_type.setdefault(ref.dataId, ref) != ref:
                            # A different dataset (other UUID/run) with the
                            # same dataset type and data ID was seen before.
                            # That only happens for calibrations found in
                            # calibration collections; for now these have to
                            # be left out of the TAGGED inputs collection.
                            to_tag_for_type[ref.dataId] = None

        local_butler.transfer_from(
            self.butler,
            refs,
            register_dataset_types=register_dataset_types,
            transfer_dimensions=False,
        )

        if inputs is not None:
            to_tag_flat: list[DatasetRef] = []
            for ref_map in to_tag_by_type.values():
                to_tag_flat.extend(tag_ref for tag_ref in ref_map.values() if tag_ref is not None)
            local_butler.registry.associate(inputs, to_tag_flat)

        local_butler.registry.defaults = self.butler.registry.defaults.clone(
            collections=output, run=output_run
        )
        self.butler = local_butler
        return local_butler

    def run(self, register_dataset_types: bool = False, save_versions: bool = True) -> list[Quantum]:
        """Execute every quantum in the graph, in topological order.

        This runs the whole graph in one call. To execute the quanta one at
        a time, use `as_generator` instead.

        Parameters
        ----------
        register_dataset_types : `bool`, optional
            If `True`, register all output dataset types before executing
            any quanta.
        save_versions : `bool`, optional
            If `True` (default), save a package versions dataset.

        Returns
        -------
        quanta : `list` [ `~lsst.daf.butler.Quantum` ]
            Executed quanta.

        Notes
        -----
        A topological ordering is not in general unique, but no other
        guarantees are made about the order in which quanta are processed.
        """
        executed = self.as_generator(
            register_dataset_types=register_dataset_types, save_versions=save_versions
        )
        return list(executed)

    def as_generator(
        self, register_dataset_types: bool = False, save_versions: bool = True
    ) -> Iterator[Quantum]:
        """Return an iterator that executes the quanta of the graph, in
        topological order, as it is consumed.

        Each quantum is run when the iterator reaches it, so this method
        allows running quanta one at a time. Use `run` to execute the whole
        graph in one call.

        Parameters
        ----------
        register_dataset_types : `bool`, optional
            If `True`, register all output dataset types before executing
            any quanta.
        save_versions : `bool`, optional
            If `True` (default), save a package versions dataset.

        Returns
        -------
        quanta : `~collections.abc.Iterator` [ `~lsst.daf.butler.Quantum` ]
            Executed quanta.

        Notes
        -----
        The global initialization steps (optional dataset type
        registration, writing of configs and init-outputs, and optional
        writing of package versions) happen immediately when this method is
        called. Individual quanta are only executed as the returned iterator
        is iterated over.

        A topological ordering is not in general unique, but no other
        guarantees are made about the order in which quanta are processed.
        """
        predicted = self.predicted
        butler = self.butler
        if register_dataset_types:
            predicted.pipeline_graph.register_dataset_types(butler)
        predicted.write_configs(butler, compare_existing=False)
        predicted.write_init_outputs(butler, skip_existing=False)
        if save_versions:
            predicted.write_packages(butler, compare_existing=False)
        executor = SingleQuantumExecutor(
            butler=butler,
            task_factory=TaskFactory(),
            resources=self.resources,
            raise_on_partial_outputs=self.raise_on_partial_outputs,
        )
        predicted.build_execution_quanta()
        nodes_map = predicted.quantum_only_xgraph.nodes

        def execute_one(quantum_id: Any) -> Quantum:
            # Run a single quantum and hand back the first element of the
            # executor's result.
            node_state = nodes_map[quantum_id]
            return executor.execute(node_state["pipeline_node"], node_state["quantum"], quantum_id)[0]

        # This method must stay a plain function that returns a generator
        # expression. Were it a generator function itself, the initialization
        # above would be deferred until the first quantum is requested;
        # running it eagerly lets callers inspect the repository state
        # before any quantum executes.
        return (execute_one(quantum_id) for quantum_id in predicted)

    def _transfer_qg_dimension_records(self, out_butler: Butler) -> None:
        """Insert all dimension records held by the quantum graph into a
        butler.

        Parameters
        ----------
        out_butler : `lsst.daf.butler.Butler`
            Butler to transfer records to.
        """
        dimension_data = self.predicted.dimension_data
        assert dimension_data is not None, "Dimension data must be present for execution."
        records = dimension_data.records
        for dimension in out_butler.dimensions.sorted(records.keys()):
            record_set = records[dimension.name]
            # Skip empty sets and elements that have no table of their own.
            if not record_set or not record_set.element.has_own_table:
                continue
            out_butler.registry.insertDimensionData(
                record_set.element,
                *record_set,
                skip_existing=True,
            )