Coverage for python / lsst / pipe / base / tests / mocks / _repo.py: 21%
157 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-14 23:47 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("DirectButlerRepo", "InMemoryRepo", "MockRepo")
32import logging
33import tempfile
34from abc import abstractmethod
35from collections.abc import Iterable, Iterator, Mapping
36from contextlib import AbstractContextManager, contextmanager
37from typing import Any, Literal, Self
39from lsst.daf.butler import (
40 Butler,
41 CollectionType,
42 DataCoordinate,
43 DatasetRef,
44 DatasetType,
45 DimensionConfig,
46 LimitedButler,
47 RegistryConfig,
48)
49from lsst.daf.butler.tests.utils import create_populated_sqlite_registry
50from lsst.resources import ResourcePath, ResourcePathExpression
51from lsst.sphgeom import RangeSet
53from ...all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
54from ...pipeline_graph import PipelineGraph
55from ...quantum_graph import PredictedQuantumGraph
56from ...single_quantum_executor import SingleQuantumExecutor
57from ..in_memory_limited_butler import InMemoryLimitedButler
58from ._pipeline_task import (
59 DynamicConnectionConfig,
60 DynamicTestPipelineTask,
61 DynamicTestPipelineTaskConfig,
62)
63from ._storage_class import MockDataset, is_mock_name
65_LOG = logging.getLogger(__name__)
68class MockRepo(AbstractContextManager):
69 """A test helper that populates a butler repository for task execution.
71 Parameters
72 ----------
73 butler : `lsst.daf.butler.Butler`
74 Butler to use for at least quantum graph building. Must be writeable.
75 input_run : `str`, optional
76 Name of a `~lsst.daf.butler.CollectionType.RUN` collection that will be
77 used as an input to quantum graph generation. Input datasets created
78 by the helper are added to this collection.
79 input_chain : `str`, optional
80 Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that
81 will be the direct input to quantum graph generation. This always
82 includes ``input_run``.
83 input_children : `str` or `~collections.abc.Iterable` [ `str`], optional
84 Additional collections to include in ``input_chain``.
85 """
87 def __init__(
88 self,
89 butler: Butler,
90 input_run: str = "input_run",
91 input_chain: str = "input_chain",
92 input_children: Iterable[str] = (),
93 ):
94 self.butler = butler
95 input_chain_definition = [input_run]
96 input_chain_definition.extend(input_children)
97 self.input_run = input_run
98 self.input_chain = input_chain
99 self.butler.collections.register(self.input_run)
100 self.butler.collections.register(self.input_chain, CollectionType.CHAINED)
101 self.butler.collections.redefine_chain(self.input_chain, input_chain_definition)
102 self.pipeline_graph = PipelineGraph()
103 self.last_auto_dataset_type_index = 0
104 self.last_auto_task_index = 0
106 def __enter__(self) -> Self:
107 return self
109 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
110 try:
111 self.close()
112 except Exception:
113 _LOG.exception("An exception occurred during MockRepo.close()")
114 return False
116 def close(self) -> None:
117 """Release all resources associated with this mock instance. The
118 instance may no longer be used after this is called.
120 Notes
121 -----
122 Instead of calling ``close()`` directly, you can use the mock object
123 as a context manager. For example::
125 with MockRepo(...) as butler:
126 butler.get(...)
127 # butler is closed after exiting the block.
128 """
129 self.butler.close()
131 def add_task(
132 self,
133 label: str | None = None,
134 *,
135 task_class: type[DynamicTestPipelineTask] = DynamicTestPipelineTask,
136 config: DynamicTestPipelineTaskConfig | None = None,
137 dimensions: Iterable[str] | None = None,
138 inputs: Mapping[str, DynamicConnectionConfig] | None = None,
139 outputs: Mapping[str, DynamicConnectionConfig] | None = None,
140 prerequisite_inputs: Mapping[str, DynamicConnectionConfig] | None = None,
141 init_inputs: Mapping[str, DynamicConnectionConfig] | None = None,
142 init_outputs: Mapping[str, DynamicConnectionConfig] | None = None,
143 ) -> None:
144 """Add a task to the helper's pipeline graph.
146 Parameters
147 ----------
148 label : `str`, optional
149 Label for the task. If not provided, the task name will be
150 ``task_auto{self.last_auto_task_index}``, with that variable
151 incremented.
152 task_class : `type`, optional
153 Subclass of `DynamicTestPipelineTask` to use.
154 config : `DynamicTestPipelineTaskConfig`, optional
155 Task configuration to use. Note that the dimensions are always
156 overridden by the ``dimensions`` argument and ``inputs`` and
157 ``outputs`` are updated by those arguments unless they are
158 explicitly set to empty dictionaries.
159 dimensions : `~collections.abc.Iterable` [ `str` ], optional
160 Dimensions of the task and any automatically-added input or output
161 connection.
162 inputs : `~collections.abc.Mapping` [ `str`, \
163 `DynamicConnectionConfig` ], optional
164 Input connections to add. If not provided, a single connection is
165 added with the same dimensions as the task and dataset type name
166 ``dataset_auto{self.last_auto_dataset_type_index}``.
167 outputs : `~collections.abc.Mapping` [ `str`, \
168 `DynamicConnectionConfig` ], optional
169 Output connections to add. If not provided, a single connection is
170 added with the same dimensions as the task and dataset type name
171 ``dataset_auto{self.last_auto_dataset_type_index}``, with that
172 variable incremented first.
173 prerequisite_inputs : `~collections.abc.Mapping` [ `str`, \
174 `DynamicConnectionConfig` ], optional
175 Prerequisite input connections to add. Defaults to an empty
176 mapping.
177 init_inputs : `~collections.abc.Mapping` [ `str`, \
178 `DynamicConnectionConfig` ], optional
179 Init input connections to add. Defaults to an empty mapping.
180 init_outputs : `~collections.abc.Mapping` [ `str`, \
181 `DynamicConnectionConfig` ], optional
182 Init output connections to add. Defaults to an empty mapping.
184 Notes
185 -----
186 The defaults for this method's arguments are designed to allow it to be
187 called in succession to create a sequence of "one-to-one" tasks in
188 which each consumes the output of the last.
189 """
190 if config is None:
191 config = DynamicTestPipelineTaskConfig()
192 if dimensions is not None:
193 config.dimensions = list(dimensions)
194 if inputs is not None:
195 config.inputs.update(inputs)
196 else:
197 config.inputs["input_connection"] = DynamicConnectionConfig(
198 dataset_type_name=f"dataset_auto{self.last_auto_dataset_type_index}",
199 dimensions=list(config.dimensions),
200 )
201 if outputs is not None:
202 config.outputs.update(outputs)
203 else:
204 self.last_auto_dataset_type_index += 1
205 config.outputs["output_connection"] = DynamicConnectionConfig(
206 dataset_type_name=f"dataset_auto{self.last_auto_dataset_type_index}",
207 dimensions=list(config.dimensions),
208 )
209 if prerequisite_inputs is not None:
210 config.prerequisite_inputs.update(prerequisite_inputs)
211 if init_inputs is not None:
212 config.init_inputs.update(init_inputs)
213 if init_outputs is not None:
214 config.init_outputs.update(init_outputs)
215 if label is None:
216 self.last_auto_task_index += 1
217 label = f"task_auto{self.last_auto_task_index}"
218 self.pipeline_graph.add_task(label, task_class=task_class, config=config)
220 def make_quantum_graph(
221 self,
222 *,
223 output: str | None = None,
224 output_run: str = "output_run",
225 insert_mocked_inputs: bool = True,
226 register_output_dataset_types: bool = True,
227 ) -> PredictedQuantumGraph:
228 """Make a quantum graph from the pipeline task and internal data
229 repository.
231 Parameters
232 ----------
233 output : `str` or `None`, optional
234 Name of the output chained collection to embed within the quantum
235 graph. Note that this does not actually create this collection.
236 output_run : `str`, optional
237 Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
238 execution outputs. Note that this does not actually create this
239 collection.
240 insert_mocked_inputs : `bool`, optional
241 Whether to automatically insert datasets for all overall inputs to
242 the pipeline graph whose dataset types have not already been
243 registered. If set to `False`, inputs must be provided by imported
244 YAML files or explicit calls to `insert_datasets`, which provides
245 more fine-grained control over the data IDs of the datasets.
246 register_output_dataset_types : `bool`, optional
247 If `True`, register all output dataset types.
249 Returns
250 -------
251 qg : `..quantum_graph.PredictedQuantumGraph`
252 Quantum graph. Datastore records will not be attached, since the
253 test helper does not actually have a datastore.
254 """
255 return (
256 self.make_quantum_graph_builder(
257 insert_mocked_inputs=insert_mocked_inputs,
258 register_output_dataset_types=register_output_dataset_types,
259 output_run=output_run,
260 )
261 .finish(output=output, attach_datastore_records=False)
262 .assemble()
263 )
265 def make_quantum_graph_builder(
266 self,
267 *,
268 output_run: str = "output_run",
269 insert_mocked_inputs: bool = True,
270 register_output_dataset_types: bool = True,
271 ) -> AllDimensionsQuantumGraphBuilder:
272 """Make a quantum graph builder from the pipeline task and internal
273 data repository.
275 Parameters
276 ----------
277 output_run : `str`, optional
278 Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
279 execution outputs. Note that this does not actually create this
280 collection.
281 insert_mocked_inputs : `bool`, optional
282 Whether to automatically insert datasets for all overall inputs to
283 the pipeline graph whose dataset types have not already been
284 registered. If set to `False`, inputs must be provided by imported
285 YAML files or explicit calls to `insert_datasets`, which provides
286 more fine-grained control over the data IDs of the datasets.
287 register_output_dataset_types : `bool`, optional
288 If `True`, register all output dataset types.
290 Returns
291 -------
292 builder : \
293 `..all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder`
294 Quantum graph builder. Note that
295 ``attach_datastore_records=False`` must be passed to `build`, since
296 the helper's butler does not have a datastore.
297 """
298 if insert_mocked_inputs:
299 self.pipeline_graph.resolve(self.butler.registry)
300 for _, dataset_type_node in self.pipeline_graph.iter_overall_inputs():
301 assert dataset_type_node is not None, "pipeline graph is resolved."
302 if self.butler.registry.registerDatasetType(dataset_type_node.dataset_type):
303 self.insert_datasets(dataset_type_node.dataset_type, register=False)
304 builder = AllDimensionsQuantumGraphBuilder(
305 self.pipeline_graph,
306 self.butler,
307 input_collections=[self.input_chain],
308 output_run=output_run,
309 )
310 if register_output_dataset_types:
311 self.pipeline_graph.register_dataset_types(self.butler)
312 return builder
314 def insert_datasets(
315 self, dataset_type: DatasetType | str, register: bool = True, *args: Any, **kwargs: Any
316 ) -> list[DatasetRef]:
317 """Insert input datasets into the test repository.
319 Parameters
320 ----------
321 dataset_type : `~lsst.daf.butler.DatasetType` or `str`
322 Dataset type or name. If a name, it must be included in the
323 pipeline graph.
324 register : `bool`, optional
325 Whether to register the dataset type. If `False`, the dataset type
326 must already be registered.
327 *args : `object`
328 Forwarded to `~lsst.daf.butler.query_data_ids`.
329 **kwargs : `object`
330 Forwarded to `~lsst.daf.butler.query_data_ids`.
332 Returns
333 -------
334 refs : `list` [ `lsst.daf.butler.DatasetRef` ]
335 References to the inserted datasets.
337 Notes
338 -----
339 For dataset types with dimensions that are queryable, this queries for
340 all data IDs in the repository (forwarding ``*args`` and ``**kwargs``
341 for e.g. ``where`` strings). For skypix dimensions, this queries for
342 both patches and visit-detector regions (forwarding `*args`` and
343 ``**kwargs`` to both) and uses all overlapping sky pixels. Dataset
344 types with a mix of skypix and queryable dimensions are not supported.
345 """
346 if isinstance(dataset_type, str):
347 self.pipeline_graph.resolve(self.butler.registry)
348 dataset_type = self.pipeline_graph.dataset_types[dataset_type].dataset_type
349 if register:
350 self.butler.registry.registerDatasetType(dataset_type)
351 dimensions = dataset_type.dimensions
352 if dataset_type.dimensions.skypix:
353 if len(dimensions) == 1:
354 (skypix_name,) = dimensions.skypix
355 pixelization = dimensions.universe.skypix_dimensions[skypix_name].pixelization
356 ranges = RangeSet()
357 for patch_record in self.butler.query_dimension_records(
358 "patch", *args, **kwargs, explain=False
359 ):
360 ranges |= pixelization.envelope(patch_record.region)
361 for vdr_record in self.butler.query_dimension_records(
362 "visit_detector_region", *args, **kwargs, explain=False
363 ):
364 ranges |= pixelization.envelope(vdr_record.region)
365 data_ids = []
366 for begin, end in ranges:
367 for index in range(begin, end):
368 data_ids.append(DataCoordinate.from_required_values(dimensions, (index,)))
369 else:
370 raise NotImplementedError(
371 "Can only generate data IDs for queryable dimensions and isolated skypix."
372 )
373 else:
374 data_ids = self.butler.query_data_ids(dimensions, *args, **kwargs, explain=False)
375 return self._insert_datasets_impl(dataset_type, data_ids)
377 @abstractmethod
378 def _insert_datasets_impl(
379 self, dataset_type: DatasetType, data_ids: list[DataCoordinate]
380 ) -> list[DatasetRef]:
381 """Insert datasets after their data IDs have been generated.
383 Parameters
384 ----------
385 dataset_type : `lsst.daf.butler.DatasetType`
386 Type of the datasets.
387 data_ids : `list` [ `lsst.daf.butler.DataCoordinate` ]
388 Data IDs of all datasets.
390 Returns
391 -------
392 refs : `list` [ `lsst.daf.butler.DatasetRef` ]
393 References to the new datasets.
394 """
395 raise NotImplementedError()
397 @abstractmethod
398 def make_single_quantum_executor(
399 self, qg: PredictedQuantumGraph
400 ) -> tuple[SingleQuantumExecutor, LimitedButler]:
401 """Make a single-quantum executor.
403 Parameters
404 ----------
405 qg : `..quantum_graph.PredictedQuantumGraph`
406 Graph whose quanta the executor must be capable of executing.
408 Returns
409 -------
410 executor : `..single_quantum_executor.SingleQuantumExecutor`
411 An executor for a single quantum.
412 butler : `lsst.daf.butler.LimitedButler`
413 The butler that the executor will write to.
414 """
415 raise NotImplementedError()
418class InMemoryRepo(MockRepo):
419 """A test helper that simulates a butler repository for task execution
420 without any disk I/O.
422 Parameters
423 ----------
424 *args : `str` or `lsst.resources.ResourcePath`
425 Butler YAML import files to load into the test repository.
426 registry_config : `lsst.daf.butler.RegistryConfig`, optional
427 Registry configuration for the repository.
428 dimension_config : `lsst.daf.butler.DimensionConfig`, optional
429 Dimension universe configuration for the repository.
430 input_run : `str`, optional
431 Name of a `~lsst.daf.butler.CollectionType.RUN` collection that will be
432 used as an input to quantum graph generation. Input datasets created
433 by the helper are added to this collection.
434 input_chain : `str`, optional
435 Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that
436 will be the direct input to quantum graph generation. This always
437 includes ``input_run``.
438 use_import_collections_as_input : `bool`, `str`, or \
439 `~collections.abc.Iterable` [ `str`], optional
440 Additional collections from YAML import files to include in
441 ``input_chain``, or `True` to include all such collections (in
442 chain-flattened lexicographical order).
443 data_root : convertible to `lsst.resources.ResourcePath`, optional
444 Root directory to join to each element in ``*args``. Defaults to
445 the `lsst.daf.butler.tests.registry_data` package.
447 Notes
448 -----
449 This helper maintains an `..pipeline_graph.PipelineGraph` and a
450 no-datastore butler backed by an in-memory SQLite database for use in
451 quantum graph generation. It creates a separate in-memory limited butler
452 for execution as needed.
453 """
455 def __init__(
456 self,
457 *args: str | ResourcePath,
458 registry_config: RegistryConfig | None = None,
459 dimension_config: DimensionConfig | None = None,
460 input_run: str = "input_run",
461 input_chain: str = "input_chain",
462 use_import_collections_as_input: bool | str | Iterable[str] = True,
463 data_root: ResourcePathExpression | None = "resource://lsst.daf.butler/tests/registry_data",
464 ):
465 if data_root is not None:
466 data_root = ResourcePath(data_root, forceDirectory=True)
467 args = tuple(data_root.join(arg) for arg in args)
468 butler = create_populated_sqlite_registry(
469 *args, registry_config=registry_config, dimension_config=dimension_config
470 )
471 if use_import_collections_as_input:
472 if use_import_collections_as_input is True:
473 use_import_collections_as_input = sorted(butler.collections.query("*", flatten_chains=True))
474 else:
475 use_import_collections_as_input = ()
476 super().__init__(
477 butler,
478 input_run=input_run,
479 input_chain=input_chain,
480 input_children=list(use_import_collections_as_input),
481 )
483 def _insert_datasets_impl(
484 self, dataset_type: DatasetType, data_ids: list[DataCoordinate]
485 ) -> list[DatasetRef]:
486 return self.butler.registry.insertDatasets(dataset_type, data_ids, run=self.input_run)
488 def make_limited_butler(self) -> InMemoryLimitedButler:
489 """Make a test limited butler for execution.
491 Returns
492 -------
493 limited_butler : `.InMemoryLimitedButler`
494 A limited butler that can be used for task execution.
496 Notes
497 -----
498 This queries the database-only butler used for quantum-graph generation
499 for all datasets in the ``input_chain`` collection, and populates the
500 limited butler with those that have a mock storage class. Other
501 datasets are ignored, so they will appear as though they were present
502 during quantum graph generation but absent during execution.
503 """
504 butler = InMemoryLimitedButler(self.butler.dimensions, self.butler.registry.queryDatasetTypes())
505 for ref in self.butler.query_all_datasets(self.input_chain):
506 if is_mock_name(ref.datasetType.storageClass_name):
507 butler.put(
508 MockDataset(
509 dataset_id=ref.id,
510 dataset_type=ref.datasetType.to_simple(),
511 data_id=dict(ref.dataId.mapping),
512 run=ref.run,
513 ),
514 ref,
515 )
516 return butler
518 def make_single_quantum_executor(
519 self, qg: PredictedQuantumGraph | None = None
520 ) -> tuple[SingleQuantumExecutor, InMemoryLimitedButler]:
521 """Make a single-quantum executor backed by a new limited butler.
523 Parameters
524 ----------
525 qg : `..quantum_graph.PredictedQuantumGraph`
526 Ignored by this implementation.
528 Returns
529 -------
530 executor : `..single_quantum_executor.SingleQuantumExecutor`
531 An executor for a single quantum.
532 butler : `.InMemoryLimitedButler`
533 The butler that the executor will write to.
534 """
535 butler = self.make_limited_butler()
536 return SingleQuantumExecutor(limited_butler_factory=butler.factory), butler
539class DirectButlerRepo(MockRepo):
540 """A test helper for task execution backed by a local direct butler.
542 Parameters
543 ----------
544 butler : `lsst.daf.butler.direct_butler.DirectButler`
545 Butler to write to.
546 *args : `str` or `lsst.resources.ResourcePath`
547 Butler YAML import files to load into the test repository.
548 input_run : `str`, optional
549 Name of a `~lsst.daf.butler.CollectionType.RUN` collection that will be
550 used as an input to quantum graph generation. Input datasets created
551 by the helper are added to this collection.
552 input_chain : `str`, optional
553 Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that
554 will be the direct input to quantum graph generation. This always
555 includes ``input_run``.
556 use_import_collections_as_input : `bool`, `str`, or \
557 `~collections.abc.Iterable` [ `str`], optional
558 Additional collections from YAML import files to include in
559 ``input_chain``, or `True` to include all such collections (in
560 chain-flattened lexicographical order).
561 data_root : convertible to `lsst.resources.ResourcePath`, optional
562 Root directory to join to each element in ``*args``. Defaults to
563 the `lsst.daf.butler.tests.registry_data` package.
565 Notes
566 -----
567 This helper maintains an `..pipeline_graph.PipelineGraph` and a
568 no-datastore butler backed by an in-memory SQLite database for use in
569 quantum graph generation. It creates a separate in-memory limited butler
570 for execution as needed.
571 """
573 def __init__(
574 self,
575 butler: Butler,
576 *args: str | ResourcePath,
577 input_run: str = "input_run",
578 input_chain: str = "input_chain",
579 use_import_collections_as_input: bool | str | Iterable[str] = True,
580 data_root: ResourcePathExpression | None = "resource://lsst.daf.butler/tests/registry_data",
581 ):
582 if data_root is not None:
583 data_root = ResourcePath(data_root, forceDirectory=True)
584 args = tuple(data_root.join(arg) for arg in args)
585 for arg in args:
586 butler.import_(filename=arg)
587 if use_import_collections_as_input:
588 if use_import_collections_as_input is True:
589 use_import_collections_as_input = sorted(butler.collections.query("*", flatten_chains=True))
590 else:
591 use_import_collections_as_input = ()
592 super().__init__(
593 butler,
594 input_run=input_run,
595 input_chain=input_chain,
596 input_children=list(use_import_collections_as_input),
597 )
599 @classmethod
600 @contextmanager
601 def make_temporary(
602 cls,
603 *args: str | ResourcePath,
604 input_run: str = "input_run",
605 input_chain: str = "input_chain",
606 use_import_collections_as_input: bool | str | Iterable[str] = True,
607 data_root: ResourcePathExpression | None = "resource://lsst.daf.butler/tests/registry_data",
608 **kwargs: Any,
609 ) -> Iterator[tuple[DirectButlerRepo, str]]:
610 with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as root:
611 config = Butler.makeRepo(root, **kwargs)
612 with Butler.from_config(config, writeable=True) as butler:
613 yield (
614 cls(
615 butler,
616 *args,
617 input_run=input_run,
618 input_chain=input_chain,
619 use_import_collections_as_input=use_import_collections_as_input,
620 data_root=data_root,
621 ),
622 root,
623 )
625 def _insert_datasets_impl(
626 self, dataset_type: DatasetType, data_ids: list[DataCoordinate]
627 ) -> list[DatasetRef]:
628 if is_mock_name(dataset_type.storageClass_name):
629 refs: list[DatasetRef] = []
630 for data_id in data_ids:
631 data_id = self.butler.registry.expandDataId(data_id)
632 ref = DatasetRef(dataset_type, data_id, run=self.input_run)
633 self.butler.put(
634 MockDataset(
635 dataset_id=ref.id,
636 dataset_type=ref.datasetType.to_simple(),
637 data_id=dict(ref.dataId.mapping),
638 run=ref.run,
639 ),
640 ref,
641 )
642 refs.append(ref)
643 return refs
644 else:
645 return self.butler.registry.insertDatasets(dataset_type, data_ids, run=self.input_run)
647 def make_single_quantum_executor(
648 self, qg: PredictedQuantumGraph | None = None
649 ) -> tuple[SingleQuantumExecutor, Butler]:
650 """Make a single-quantum executor backed by a new limited butler.
652 Parameters
653 ----------
654 qg : `..quantum_graph.PredictedQuantumGraph`
655 Ignored by this implementation.
657 Returns
658 -------
659 executor : `..single_quantum_executor.SingleQuantumExecutor`
660 An executor for a single quantum.
661 butler : `lsst.daf.butler.Butler`
662 The butler that the executor will write to.
663 """
664 return SingleQuantumExecutor(limited_butler_factory=None, butler=self.butler), self.butler