# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("DirectButlerRepo", "InMemoryRepo", "MockRepo")

import logging
import tempfile
from abc import abstractmethod
from collections.abc import Iterable, Iterator, Mapping
from contextlib import AbstractContextManager, contextmanager
from typing import Any, Literal, Self

from lsst.daf.butler import (
    Butler,
    CollectionType,
    DataCoordinate,
    DatasetRef,
    DatasetType,
    DimensionConfig,
    LimitedButler,
    RegistryConfig,
)
from lsst.daf.butler.tests.utils import create_populated_sqlite_registry
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.sphgeom import RangeSet

from ...all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
from ...pipeline_graph import PipelineGraph
from ...quantum_graph import PredictedQuantumGraph
from ...single_quantum_executor import SingleQuantumExecutor
from ..in_memory_limited_butler import InMemoryLimitedButler
from ._pipeline_task import (
    DynamicConnectionConfig,
    DynamicTestPipelineTask,
    DynamicTestPipelineTaskConfig,
)
from ._storage_class import MockDataset, is_mock_name

_LOG = logging.getLogger(__name__)

class MockRepo(AbstractContextManager):
    """A test helper that populates a butler repository for task execution.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler to use, at minimum, for quantum graph building. Must be
        writeable.
    input_run : `str`, optional
        Name of a `~lsst.daf.butler.CollectionType.RUN` collection that will
        be used as an input to quantum graph generation. Input datasets
        created by the helper are added to this collection.
    input_chain : `str`, optional
        Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that
        will be the direct input to quantum graph generation. This always
        includes ``input_run``.
    input_children : `str` or `~collections.abc.Iterable` [ `str` ], optional
        Additional collections to include in ``input_chain``.
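
    Examples
    --------
    `MockRepo` itself is abstract; concrete subclasses (see `InMemoryRepo`
    and `DirectButlerRepo` below) are typically used as context managers.
    A minimal sketch::

        with InMemoryRepo() as repo:
            repo.add_task()
            qg = repo.make_quantum_graph()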

85 """ 

86 

87 def __init__( 

88 self, 

89 butler: Butler, 

90 input_run: str = "input_run", 

91 input_chain: str = "input_chain", 

92 input_children: Iterable[str] = (), 

93 ): 

94 self.butler = butler 

95 input_chain_definition = [input_run] 

96 input_chain_definition.extend(input_children) 

97 self.input_run = input_run 

98 self.input_chain = input_chain 

99 self.butler.collections.register(self.input_run) 

100 self.butler.collections.register(self.input_chain, CollectionType.CHAINED) 

101 self.butler.collections.redefine_chain(self.input_chain, input_chain_definition) 

102 self.pipeline_graph = PipelineGraph() 

103 self.last_auto_dataset_type_index = 0 

104 self.last_auto_task_index = 0 

105 

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]:
        try:
            self.close()
        except Exception:
            _LOG.exception("An exception occurred during MockRepo.close()")
        return False

    def close(self) -> None:
        """Release all resources associated with this mock instance.

        The instance may no longer be used after this is called.

        Notes
        -----
        Instead of calling ``close()`` directly, you can use the mock object
        as a context manager. For example::

            with InMemoryRepo(...) as repo:
                repo.add_task(...)
            # repo is closed after exiting the block.
        """
        self.butler.close()

    def add_task(
        self,
        label: str | None = None,
        *,
        task_class: type[DynamicTestPipelineTask] = DynamicTestPipelineTask,
        config: DynamicTestPipelineTaskConfig | None = None,
        dimensions: Iterable[str] | None = None,
        inputs: Mapping[str, DynamicConnectionConfig] | None = None,
        outputs: Mapping[str, DynamicConnectionConfig] | None = None,
        prerequisite_inputs: Mapping[str, DynamicConnectionConfig] | None = None,
        init_inputs: Mapping[str, DynamicConnectionConfig] | None = None,
        init_outputs: Mapping[str, DynamicConnectionConfig] | None = None,
    ) -> None:
        """Add a task to the helper's pipeline graph.

        Parameters
        ----------
        label : `str`, optional
            Label for the task. If not provided, the label will be
            ``task_auto{self.last_auto_task_index}``, with that variable
            incremented first.
        task_class : `type`, optional
            Subclass of `DynamicTestPipelineTask` to use.
        config : `DynamicTestPipelineTaskConfig`, optional
            Task configuration to use. Note that the dimensions are always
            overridden by the ``dimensions`` argument, and ``inputs`` and
            ``outputs`` are updated by those arguments unless they are
            explicitly set to empty dictionaries.
        dimensions : `~collections.abc.Iterable` [ `str` ], optional
            Dimensions of the task and of any automatically-added input or
            output connection.
        inputs : `~collections.abc.Mapping` [ `str`, \
                `DynamicConnectionConfig` ], optional
            Input connections to add. If not provided, a single connection is
            added with the same dimensions as the task and dataset type name
            ``dataset_auto{self.last_auto_dataset_type_index}``.
        outputs : `~collections.abc.Mapping` [ `str`, \
                `DynamicConnectionConfig` ], optional
            Output connections to add. If not provided, a single connection
            is added with the same dimensions as the task and dataset type
            name ``dataset_auto{self.last_auto_dataset_type_index}``, with
            that variable incremented first.
        prerequisite_inputs : `~collections.abc.Mapping` [ `str`, \
                `DynamicConnectionConfig` ], optional
            Prerequisite input connections to add. Defaults to an empty
            mapping.
        init_inputs : `~collections.abc.Mapping` [ `str`, \
                `DynamicConnectionConfig` ], optional
            Init input connections to add. Defaults to an empty mapping.
        init_outputs : `~collections.abc.Mapping` [ `str`, \
                `DynamicConnectionConfig` ], optional
            Init output connections to add. Defaults to an empty mapping.

        Notes
        -----
        The defaults for this method's arguments are designed to allow it to
        be called in succession to create a sequence of "one-to-one" tasks in
        which each consumes the output of the last.
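
        Examples
        --------
        Two successive calls with all defaults chain two tasks through an
        automatically-named intermediate dataset type (``repo`` is a
        concrete helper instance)::

            repo.add_task()  # task_auto1: dataset_auto0 -> dataset_auto1
            repo.add_task()  # task_auto2: dataset_auto1 -> dataset_auto2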

189 """ 

190 if config is None: 

191 config = DynamicTestPipelineTaskConfig() 

192 if dimensions is not None: 

193 config.dimensions = list(dimensions) 

194 if inputs is not None: 

195 config.inputs.update(inputs) 

196 else: 

197 config.inputs["input_connection"] = DynamicConnectionConfig( 

198 dataset_type_name=f"dataset_auto{self.last_auto_dataset_type_index}", 

199 dimensions=list(config.dimensions), 

200 ) 

201 if outputs is not None: 

202 config.outputs.update(outputs) 

203 else: 

204 self.last_auto_dataset_type_index += 1 

205 config.outputs["output_connection"] = DynamicConnectionConfig( 

206 dataset_type_name=f"dataset_auto{self.last_auto_dataset_type_index}", 

207 dimensions=list(config.dimensions), 

208 ) 

209 if prerequisite_inputs is not None: 

210 config.prerequisite_inputs.update(prerequisite_inputs) 

211 if init_inputs is not None: 

212 config.init_inputs.update(init_inputs) 

213 if init_outputs is not None: 

214 config.init_outputs.update(init_outputs) 

215 if label is None: 

216 self.last_auto_task_index += 1 

217 label = f"task_auto{self.last_auto_task_index}" 

218 self.pipeline_graph.add_task(label, task_class=task_class, config=config) 

219 

    def make_quantum_graph(
        self,
        *,
        output: str | None = None,
        output_run: str = "output_run",
        insert_mocked_inputs: bool = True,
        register_output_dataset_types: bool = True,
    ) -> PredictedQuantumGraph:
        """Make a quantum graph from the pipeline tasks and internal data
        repository.

        Parameters
        ----------
        output : `str` or `None`, optional
            Name of the output chained collection to embed within the quantum
            graph. Note that this does not actually create this collection.
        output_run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            execution outputs. Note that this does not actually create this
            collection.
        insert_mocked_inputs : `bool`, optional
            Whether to automatically insert datasets for all overall inputs to
            the pipeline graph whose dataset types have not already been
            registered. If set to `False`, inputs must be provided by imported
            YAML files or explicit calls to `insert_datasets`, which provides
            more fine-grained control over the data IDs of the datasets.
        register_output_dataset_types : `bool`, optional
            If `True`, register all output dataset types.

        Returns
        -------
        qg : `..quantum_graph.PredictedQuantumGraph`
            Quantum graph. Datastore records will not be attached, since the
            test helper does not actually have a datastore.
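
        Examples
        --------
        A typical call after tasks have been added (``repo`` is a concrete
        helper instance; the run name is illustrative)::

            qg = repo.make_quantum_graph(output_run="my_run")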

254 """ 

255 return ( 

256 self.make_quantum_graph_builder( 

257 insert_mocked_inputs=insert_mocked_inputs, 

258 register_output_dataset_types=register_output_dataset_types, 

259 output_run=output_run, 

260 ) 

261 .finish(output=output, attach_datastore_records=False) 

262 .assemble() 

263 ) 

264 

    def make_quantum_graph_builder(
        self,
        *,
        output_run: str = "output_run",
        insert_mocked_inputs: bool = True,
        register_output_dataset_types: bool = True,
    ) -> AllDimensionsQuantumGraphBuilder:
        """Make a quantum graph builder from the pipeline tasks and internal
        data repository.

        Parameters
        ----------
        output_run : `str`, optional
            Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
            execution outputs. Note that this does not actually create this
            collection.
        insert_mocked_inputs : `bool`, optional
            Whether to automatically insert datasets for all overall inputs to
            the pipeline graph whose dataset types have not already been
            registered. If set to `False`, inputs must be provided by imported
            YAML files or explicit calls to `insert_datasets`, which provides
            more fine-grained control over the data IDs of the datasets.
        register_output_dataset_types : `bool`, optional
            If `True`, register all output dataset types.

        Returns
        -------
        builder : \
                `..all_dimensions_quantum_graph_builder.AllDimensionsQuantumGraphBuilder`
            Quantum graph builder. Note that
            ``attach_datastore_records=False`` must be passed to `build`, since
            the helper's butler does not have a datastore.
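
        Examples
        --------
        A sketch of finishing the builder manually, mirroring what
        `make_quantum_graph` does internally::

            builder = repo.make_quantum_graph_builder()
            qg = builder.finish(output=None, attach_datastore_records=False).assemble()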

297 """ 

298 if insert_mocked_inputs: 

299 self.pipeline_graph.resolve(self.butler.registry) 

300 for _, dataset_type_node in self.pipeline_graph.iter_overall_inputs(): 

301 assert dataset_type_node is not None, "pipeline graph is resolved." 

302 if self.butler.registry.registerDatasetType(dataset_type_node.dataset_type): 

303 self.insert_datasets(dataset_type_node.dataset_type, register=False) 

304 builder = AllDimensionsQuantumGraphBuilder( 

305 self.pipeline_graph, 

306 self.butler, 

307 input_collections=[self.input_chain], 

308 output_run=output_run, 

309 ) 

310 if register_output_dataset_types: 

311 self.pipeline_graph.register_dataset_types(self.butler) 

312 return builder 

313 

    def insert_datasets(
        self, dataset_type: DatasetType | str, register: bool = True, *args: Any, **kwargs: Any
    ) -> list[DatasetRef]:
        """Insert input datasets into the test repository.

        Parameters
        ----------
        dataset_type : `~lsst.daf.butler.DatasetType` or `str`
            Dataset type or name. If a name, it must be included in the
            pipeline graph.
        register : `bool`, optional
            Whether to register the dataset type. If `False`, the dataset type
            must already be registered.
        *args : `object`
            Forwarded to `~lsst.daf.butler.Butler.query_data_ids`.
        **kwargs : `object`
            Forwarded to `~lsst.daf.butler.Butler.query_data_ids`.

        Returns
        -------
        refs : `list` [ `lsst.daf.butler.DatasetRef` ]
            References to the inserted datasets.

        Notes
        -----
        For dataset types with dimensions that are queryable, this queries for
        all data IDs in the repository (forwarding ``*args`` and ``**kwargs``
        for e.g. ``where`` strings). For skypix dimensions, this queries for
        both patches and visit-detector regions (forwarding ``*args`` and
        ``**kwargs`` to both) and uses all overlapping sky pixels. Dataset
        types with a mix of skypix and queryable dimensions are not supported.
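
        Examples
        --------
        A sketch that restricts the inserted data IDs with a ``where``
        string (the dataset type name and instrument value are
        illustrative)::

            refs = repo.insert_datasets(
                "dataset_auto0", where="instrument = 'Cam1'"
            )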

345 """ 

346 if isinstance(dataset_type, str): 

347 self.pipeline_graph.resolve(self.butler.registry) 

348 dataset_type = self.pipeline_graph.dataset_types[dataset_type].dataset_type 

349 if register: 

350 self.butler.registry.registerDatasetType(dataset_type) 

351 dimensions = dataset_type.dimensions 

352 if dataset_type.dimensions.skypix: 

353 if len(dimensions) == 1: 

354 (skypix_name,) = dimensions.skypix 

355 pixelization = dimensions.universe.skypix_dimensions[skypix_name].pixelization 

356 ranges = RangeSet() 

357 for patch_record in self.butler.query_dimension_records( 

358 "patch", *args, **kwargs, explain=False 

359 ): 

360 ranges |= pixelization.envelope(patch_record.region) 

361 for vdr_record in self.butler.query_dimension_records( 

362 "visit_detector_region", *args, **kwargs, explain=False 

363 ): 

364 ranges |= pixelization.envelope(vdr_record.region) 

365 data_ids = [] 

366 for begin, end in ranges: 

367 for index in range(begin, end): 

368 data_ids.append(DataCoordinate.from_required_values(dimensions, (index,))) 

369 else: 

370 raise NotImplementedError( 

371 "Can only generate data IDs for queryable dimensions and isolated skypix." 

372 ) 

373 else: 

374 data_ids = self.butler.query_data_ids(dimensions, *args, **kwargs, explain=False) 

375 return self._insert_datasets_impl(dataset_type, data_ids) 

376 

    @abstractmethod
    def _insert_datasets_impl(
        self, dataset_type: DatasetType, data_ids: list[DataCoordinate]
    ) -> list[DatasetRef]:
        """Insert datasets after their data IDs have been generated.

        Parameters
        ----------
        dataset_type : `lsst.daf.butler.DatasetType`
            Type of the datasets.
        data_ids : `list` [ `lsst.daf.butler.DataCoordinate` ]
            Data IDs of all datasets.

        Returns
        -------
        refs : `list` [ `lsst.daf.butler.DatasetRef` ]
            References to the new datasets.
        """
        raise NotImplementedError()

    @abstractmethod
    def make_single_quantum_executor(
        self, qg: PredictedQuantumGraph
    ) -> tuple[SingleQuantumExecutor, LimitedButler]:
        """Make a single-quantum executor.

        Parameters
        ----------
        qg : `..quantum_graph.PredictedQuantumGraph`
            Graph whose quanta the executor must be capable of executing.

        Returns
        -------
        executor : `..single_quantum_executor.SingleQuantumExecutor`
            An executor for a single quantum.
        butler : `lsst.daf.butler.LimitedButler`
            The butler that the executor will write to.
        """
        raise NotImplementedError()

class InMemoryRepo(MockRepo):
    """A test helper that simulates a butler repository for task execution
    without any disk I/O.

    Parameters
    ----------
    *args : `str` or `lsst.resources.ResourcePath`
        Butler YAML import files to load into the test repository.
    registry_config : `lsst.daf.butler.RegistryConfig`, optional
        Registry configuration for the repository.
    dimension_config : `lsst.daf.butler.DimensionConfig`, optional
        Dimension universe configuration for the repository.
    input_run : `str`, optional
        Name of a `~lsst.daf.butler.CollectionType.RUN` collection that will
        be used as an input to quantum graph generation. Input datasets
        created by the helper are added to this collection.
    input_chain : `str`, optional
        Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that
        will be the direct input to quantum graph generation. This always
        includes ``input_run``.
    use_import_collections_as_input : `bool`, `str`, or \
            `~collections.abc.Iterable` [ `str` ], optional
        Additional collections from YAML import files to include in
        ``input_chain``, or `True` to include all such collections (in
        chain-flattened lexicographical order).
    data_root : convertible to `lsst.resources.ResourcePath`, optional
        Root directory to join to each element in ``*args``. Defaults to
        the `lsst.daf.butler.tests.registry_data` package.

    Notes
    -----
    This helper maintains a `..pipeline_graph.PipelineGraph` and a
    no-datastore butler backed by an in-memory SQLite database for use in
    quantum graph generation. It creates a separate in-memory limited butler
    for execution as needed.
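
    Examples
    --------
    A sketch that imports registry test data, assuming ``base.yaml`` and
    ``datasets.yaml`` exports exist under the default ``data_root``::

        with InMemoryRepo("base.yaml", "datasets.yaml") as repo:
            repo.add_task(dimensions=["detector"])
            qg = repo.make_quantum_graph()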

453 """ 

454 

455 def __init__( 

456 self, 

457 *args: str | ResourcePath, 

458 registry_config: RegistryConfig | None = None, 

459 dimension_config: DimensionConfig | None = None, 

460 input_run: str = "input_run", 

461 input_chain: str = "input_chain", 

462 use_import_collections_as_input: bool | str | Iterable[str] = True, 

463 data_root: ResourcePathExpression | None = "resource://lsst.daf.butler/tests/registry_data", 

464 ): 

465 if data_root is not None: 

466 data_root = ResourcePath(data_root, forceDirectory=True) 

467 args = tuple(data_root.join(arg) for arg in args) 

468 butler = create_populated_sqlite_registry( 

469 *args, registry_config=registry_config, dimension_config=dimension_config 

470 ) 

471 if use_import_collections_as_input: 

472 if use_import_collections_as_input is True: 

473 use_import_collections_as_input = sorted(butler.collections.query("*", flatten_chains=True)) 

474 else: 

475 use_import_collections_as_input = () 

476 super().__init__( 

477 butler, 

478 input_run=input_run, 

479 input_chain=input_chain, 

480 input_children=list(use_import_collections_as_input), 

481 ) 

482 

483 def _insert_datasets_impl( 

484 self, dataset_type: DatasetType, data_ids: list[DataCoordinate] 

485 ) -> list[DatasetRef]: 

486 return self.butler.registry.insertDatasets(dataset_type, data_ids, run=self.input_run) 

487 

    def make_limited_butler(self) -> InMemoryLimitedButler:
        """Make a test limited butler for execution.

        Returns
        -------
        limited_butler : `.InMemoryLimitedButler`
            A limited butler that can be used for task execution.

        Notes
        -----
        This queries the database-only butler used for quantum-graph
        generation for all datasets in the ``input_chain`` collection, and
        populates the limited butler with those that have a mock storage
        class. Other datasets are ignored, so they will appear as though they
        were present during quantum graph generation but absent during
        execution.
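
        Examples
        --------
        A sketch of the round trip for a mocked input, assuming a prior
        ``repo.add_task()`` call and exactly one matching data ID::

            (ref,) = repo.insert_datasets("dataset_auto0")
            limited = repo.make_limited_butler()
            mock_dataset = limited.get(ref)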

503 """ 

504 butler = InMemoryLimitedButler(self.butler.dimensions, self.butler.registry.queryDatasetTypes()) 

505 for ref in self.butler.query_all_datasets(self.input_chain): 

506 if is_mock_name(ref.datasetType.storageClass_name): 

507 butler.put( 

508 MockDataset( 

509 dataset_id=ref.id, 

510 dataset_type=ref.datasetType.to_simple(), 

511 data_id=dict(ref.dataId.mapping), 

512 run=ref.run, 

513 ), 

514 ref, 

515 ) 

516 return butler 

517 

    def make_single_quantum_executor(
        self, qg: PredictedQuantumGraph | None = None
    ) -> tuple[SingleQuantumExecutor, InMemoryLimitedButler]:
        """Make a single-quantum executor backed by a new limited butler.

        Parameters
        ----------
        qg : `..quantum_graph.PredictedQuantumGraph`, optional
            Ignored by this implementation.

        Returns
        -------
        executor : `..single_quantum_executor.SingleQuantumExecutor`
            An executor for a single quantum.
        butler : `.InMemoryLimitedButler`
            The butler that the executor will write to.
        """
        butler = self.make_limited_butler()
        return SingleQuantumExecutor(limited_butler_factory=butler.factory), butler

class DirectButlerRepo(MockRepo):
    """A test helper for task execution backed by a local direct butler.

    Parameters
    ----------
    butler : `lsst.daf.butler.direct_butler.DirectButler`
        Butler to write to.
    *args : `str` or `lsst.resources.ResourcePath`
        Butler YAML import files to load into the test repository.
    input_run : `str`, optional
        Name of a `~lsst.daf.butler.CollectionType.RUN` collection that will
        be used as an input to quantum graph generation. Input datasets
        created by the helper are added to this collection.
    input_chain : `str`, optional
        Name of a `~lsst.daf.butler.CollectionType.CHAINED` collection that
        will be the direct input to quantum graph generation. This always
        includes ``input_run``.
    use_import_collections_as_input : `bool`, `str`, or \
            `~collections.abc.Iterable` [ `str` ], optional
        Additional collections from YAML import files to include in
        ``input_chain``, or `True` to include all such collections (in
        chain-flattened lexicographical order).
    data_root : convertible to `lsst.resources.ResourcePath`, optional
        Root directory to join to each element in ``*args``. Defaults to
        the `lsst.daf.butler.tests.registry_data` package.

    Notes
    -----
    This helper maintains a `..pipeline_graph.PipelineGraph` and a full
    direct butler, complete with a datastore, that is used both for quantum
    graph generation and for execution.
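
    Examples
    --------
    `make_temporary` creates a disposable on-disk repository (a minimal
    sketch, assuming a ``base.yaml`` export exists in the default data
    root)::

        with DirectButlerRepo.make_temporary("base.yaml") as (repo, root):
            repo.add_task()
            qg = repo.make_quantum_graph()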

571 """ 

572 

573 def __init__( 

574 self, 

575 butler: Butler, 

576 *args: str | ResourcePath, 

577 input_run: str = "input_run", 

578 input_chain: str = "input_chain", 

579 use_import_collections_as_input: bool | str | Iterable[str] = True, 

580 data_root: ResourcePathExpression | None = "resource://lsst.daf.butler/tests/registry_data", 

581 ): 

582 if data_root is not None: 

583 data_root = ResourcePath(data_root, forceDirectory=True) 

584 args = tuple(data_root.join(arg) for arg in args) 

585 for arg in args: 

586 butler.import_(filename=arg) 

587 if use_import_collections_as_input: 

588 if use_import_collections_as_input is True: 

589 use_import_collections_as_input = sorted(butler.collections.query("*", flatten_chains=True)) 

590 else: 

591 use_import_collections_as_input = () 

592 super().__init__( 

593 butler, 

594 input_run=input_run, 

595 input_chain=input_chain, 

596 input_children=list(use_import_collections_as_input), 

597 ) 

598 

    @classmethod
    @contextmanager
    def make_temporary(
        cls,
        *args: str | ResourcePath,
        input_run: str = "input_run",
        input_chain: str = "input_chain",
        use_import_collections_as_input: bool | str | Iterable[str] = True,
        data_root: ResourcePathExpression | None = "resource://lsst.daf.butler/tests/registry_data",
        **kwargs: Any,
    ) -> Iterator[tuple[DirectButlerRepo, str]]:
        """Make a helper backed by a new butler repository in a temporary
        directory.

        Parameters are forwarded to the class constructor, except ``kwargs``,
        which are forwarded to `lsst.daf.butler.Butler.makeRepo`. Yields a
        ``(helper, root)`` tuple, where ``root`` is the temporary directory
        containing the repository; the butler is closed and the directory
        deleted when the context exits.
        """
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as root:
            config = Butler.makeRepo(root, **kwargs)
            with Butler.from_config(config, writeable=True) as butler:
                yield (
                    cls(
                        butler,
                        *args,
                        input_run=input_run,
                        input_chain=input_chain,
                        use_import_collections_as_input=use_import_collections_as_input,
                        data_root=data_root,
                    ),
                    root,
                )

    def _insert_datasets_impl(
        self, dataset_type: DatasetType, data_ids: list[DataCoordinate]
    ) -> list[DatasetRef]:
        if is_mock_name(dataset_type.storageClass_name):
            refs: list[DatasetRef] = []
            for data_id in data_ids:
                data_id = self.butler.registry.expandDataId(data_id)
                ref = DatasetRef(dataset_type, data_id, run=self.input_run)
                self.butler.put(
                    MockDataset(
                        dataset_id=ref.id,
                        dataset_type=ref.datasetType.to_simple(),
                        data_id=dict(ref.dataId.mapping),
                        run=ref.run,
                    ),
                    ref,
                )
                refs.append(ref)
            return refs
        else:
            return self.butler.registry.insertDatasets(dataset_type, data_ids, run=self.input_run)

    def make_single_quantum_executor(
        self, qg: PredictedQuantumGraph | None = None
    ) -> tuple[SingleQuantumExecutor, Butler]:
        """Make a single-quantum executor backed by the helper's full butler.

        Parameters
        ----------
        qg : `..quantum_graph.PredictedQuantumGraph`, optional
            Ignored by this implementation.

        Returns
        -------
        executor : `..single_quantum_executor.SingleQuantumExecutor`
            An executor for a single quantum.
        butler : `lsst.daf.butler.Butler`
            The butler that the executor will write to.
        """
        return SingleQuantumExecutor(limited_butler_factory=None, butler=self.butler), self.butler