Coverage for python / lsst / pipe / base / simple_pipeline_executor.py: 19%

138 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:47 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("SimplePipelineExecutor",) 

31 

32import os 

33from collections.abc import Iterable, Iterator, Mapping 

34from typing import Any 

35 

36from lsst.daf.butler import ( 

37 Butler, 

38 CollectionType, 

39 DataCoordinate, 

40 DatasetRef, 

41 Quantum, 

42) 

43from lsst.daf.butler.registry import RegistryDefaults 

44from lsst.pex.config import Config 

45 

46from ._instrument import Instrument 

47from ._quantumContext import ExecutionResources 

48from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder 

49from .graph import QuantumGraph 

50from .pipeline import Pipeline 

51from .pipeline_graph import PipelineGraph 

52from .pipelineTask import PipelineTask 

53from .quantum_graph import PredictedQuantumGraph 

54from .single_quantum_executor import SingleQuantumExecutor 

55from .taskFactory import TaskFactory 

56 

57 

class SimplePipelineExecutor:
    """A simple, high-level executor for pipelines.

    Parameters
    ----------
    quantum_graph : `.QuantumGraph` or `.PredictedQuantumGraph`
        Graph to be executed.  An old-style `.QuantumGraph` is converted to a
        `.PredictedQuantumGraph` internally.
    butler : `~lsst.daf.butler.Butler`
        Object that manages all I/O. Must be initialized with `collections`
        and `run` properties that correspond to the input and output
        collections, which must be consistent with those used to create
        ``quantum_graph``.
    resources : `.ExecutionResources`
        The resources available to each quantum being executed.
    raise_on_partial_outputs : `bool`, optional
        If `True` raise exceptions chained by `.AnnotatedPartialOutputsError`
        immediately, instead of considering the partial result a success and
        continuing to run downstream tasks.

    Notes
    -----
    Most callers should use one of the `classmethod` factory functions
    (`from_pipeline_filename`, `from_task_class`, `from_pipeline`) instead of
    invoking the constructor directly; these guarantee that the
    `~lsst.daf.butler.Butler` and `.QuantumGraph` are created consistently.

    This class is intended primarily to support unit testing and small-scale
    integration testing of `.PipelineTask` classes. It deliberately lacks many
    features present in the command-line-only ``pipetask`` tool in order to
    keep the implementation simple. Python callers that need more
    sophistication should call lower-level tools like
    `~.quantum_graph_builder.QuantumGraphBuilder` and
    `.single_quantum_executor.SingleQuantumExecutor` directly.
    """

    def __init__(
        self,
        quantum_graph: QuantumGraph | PredictedQuantumGraph,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
    ):
        # QuantumGraph is imported at module scope; the former function-local
        # re-import was redundant and has been removed.
        self._quantum_graph: QuantumGraph | None = None
        if isinstance(quantum_graph, QuantumGraph):
            # Old-style graph provided: keep it (so the `quantum_graph`
            # property can return it without reconversion) and derive the
            # predicted graph we actually execute.
            self._quantum_graph = quantum_graph
            self.predicted = PredictedQuantumGraph.from_old_quantum_graph(self._quantum_graph)
        else:
            self.predicted = quantum_graph
        self.butler = butler
        self.resources = resources
        self.raise_on_partial_outputs = raise_on_partial_outputs

111 

112 @classmethod 

113 def prep_butler( 

114 cls, 

115 root: str, 

116 inputs: Iterable[str], 

117 output: str, 

118 output_run: str | None = None, 

119 ) -> Butler: 

120 """Return configured `~lsst.daf.butler.Butler`. 

121 

122 Helper method for creating `~lsst.daf.butler.Butler` instances with 

123 collections appropriate for processing. 

124 

125 Parameters 

126 ---------- 

127 root : `str` 

128 Root of the butler data repository; must already exist, with all 

129 necessary input data. 

130 inputs : `~collections.abc.Iterable` [ `str` ] 

131 Collections to search for all input datasets, in search order. 

132 output : `str` 

133 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

134 collection to create that will combine both inputs and outputs. 

135 output_run : `str`, optional 

136 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

137 directly hold all output datasets. If not provided, a name will be 

138 created from ``output`` and a timestamp. 

139 

140 Returns 

141 ------- 

142 butler : `~lsst.daf.butler.Butler` 

143 Butler client instance compatible with all `classmethod` factories. 

144 Always writeable. 

145 """ 

146 if output_run is None: 

147 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}" 

148 # Make initial butler with no collections, since we haven't created 

149 # them yet. 

150 butler = Butler.from_config(root, writeable=True) 

151 butler.registry.registerCollection(output_run, CollectionType.RUN) 

152 butler.registry.registerCollection(output, CollectionType.CHAINED) 

153 collections = [output_run] 

154 collections.extend(inputs) 

155 butler.registry.setCollectionChain(output, collections) 

156 # Override the registry defaults. No need to clone. 

157 butler.registry.defaults = RegistryDefaults(collections=[output], run=output_run) 

158 return butler 

159 

160 @classmethod 

161 def from_pipeline_filename( 

162 cls, 

163 pipeline_filename: str, 

164 *, 

165 where: str = "", 

166 bind: Mapping[str, Any] | None = None, 

167 butler: Butler, 

168 resources: ExecutionResources | None = None, 

169 raise_on_partial_outputs: bool = True, 

170 attach_datastore_records: bool = False, 

171 output: str | None = None, 

172 output_run: str | None = None, 

173 ) -> SimplePipelineExecutor: 

174 """Create an executor by building a QuantumGraph from an on-disk 

175 pipeline YAML file. 

176 

177 Parameters 

178 ---------- 

179 pipeline_filename : `str` 

180 Name of the YAML file to load the pipeline definition from. 

181 where : `str`, optional 

182 Data ID query expression that constraints the quanta generated. 

183 bind : `~collections.abc.Mapping`, optional 

184 Mapping containing literal values that should be injected into the 

185 ``where`` expression, keyed by the identifiers they replace. 

186 butler : `~lsst.daf.butler.Butler` 

187 Butler that manages all I/O. `prep_butler` can be used to create 

188 one. 

189 resources : `.ExecutionResources` 

190 The resources available to each quantum being executed. 

191 raise_on_partial_outputs : `bool`, optional 

192 If `True` raise exceptions chained by 

193 `.AnnotatedPartialOutputsError` immediately, instead of considering 

194 the partial result a success and continuing to run downstream 

195 tasks. 

196 attach_datastore_records : `bool`, optional 

197 Whether to attach datastore records to the quantum graph. This is 

198 usually unnecessary, unless the executor is used to test behavior 

199 that depends on datastore records. 

200 output : `str`, optional 

201 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

202 collection to create that will combine both inputs and outputs. 

203 output_run : `str`, optional 

204 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

205 directly hold all output datasets. If not provided, a name will be 

206 created from ``output`` and a timestamp. 

207 

208 Returns 

209 ------- 

210 executor : `SimplePipelineExecutor` 

211 An executor instance containing the constructed `.QuantumGraph` and 

212 `~lsst.daf.butler.Butler`, ready for `run` to be called. 

213 """ 

214 pipeline = Pipeline.fromFile(pipeline_filename) 

215 return cls.from_pipeline( 

216 pipeline, 

217 butler=butler, 

218 where=where, 

219 bind=bind, 

220 resources=resources, 

221 raise_on_partial_outputs=raise_on_partial_outputs, 

222 attach_datastore_records=attach_datastore_records, 

223 output=output, 

224 output_run=output_run, 

225 ) 

226 

227 @classmethod 

228 def from_task_class( 

229 cls, 

230 task_class: type[PipelineTask], 

231 config: Config | None = None, 

232 label: str | None = None, 

233 *, 

234 where: str = "", 

235 bind: Mapping[str, Any] | None = None, 

236 butler: Butler, 

237 resources: ExecutionResources | None = None, 

238 raise_on_partial_outputs: bool = True, 

239 attach_datastore_records: bool = False, 

240 output: str | None = None, 

241 output_run: str | None = None, 

242 ) -> SimplePipelineExecutor: 

243 """Create an executor by building a QuantumGraph from a pipeline 

244 containing a single task. 

245 

246 Parameters 

247 ---------- 

248 task_class : `type` 

249 A concrete `.PipelineTask` subclass. 

250 config : `~lsst.pex.config.Config`, optional 

251 Configuration for the task. If not provided, task-level defaults 

252 will be used (no per-instrument overrides). 

253 label : `str`, optional 

254 Label for the task in its pipeline; defaults to 

255 ``task_class._DefaultName``. 

256 where : `str`, optional 

257 Data ID query expression that constraints the quanta generated. 

258 bind : `~collections.abc.Mapping`, optional 

259 Mapping containing literal values that should be injected into the 

260 ``where`` expression, keyed by the identifiers they replace. 

261 butler : `~lsst.daf.butler.Butler` 

262 Butler that manages all I/O. `prep_butler` can be used to create 

263 one. 

264 resources : `.ExecutionResources` 

265 The resources available to each quantum being executed. 

266 raise_on_partial_outputs : `bool`, optional 

267 If `True` raise exceptions chained by 

268 `.AnnotatedPartialOutputsError` immediately, instead of considering 

269 the partial result a success and continuing to run downstream 

270 tasks. 

271 attach_datastore_records : `bool`, optional 

272 Whether to attach datastore records to the quantum graph. This is 

273 usually unnecessary, unless the executor is used to test behavior 

274 that depends on datastore records. 

275 output : `str`, optional 

276 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

277 collection to create that will combine both inputs and outputs. 

278 output_run : `str`, optional 

279 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

280 directly hold all output datasets. If not provided, a name will be 

281 created from ``output`` and a timestamp. 

282 

283 Returns 

284 ------- 

285 executor : `SimplePipelineExecutor` 

286 An executor instance containing the constructed `.QuantumGraph` and 

287 `~lsst.daf.butler.Butler`, ready for `run` to be called. 

288 """ 

289 if config is None: 

290 config = task_class.ConfigClass() 

291 if label is None: 

292 label = task_class._DefaultName 

293 if not isinstance(config, task_class.ConfigClass): 

294 raise TypeError( 

295 f"Invalid config class type: expected {task_class.ConfigClass.__name__}, " 

296 f"got {type(config).__name__}." 

297 ) 

298 pipeline_graph = PipelineGraph() 

299 pipeline_graph.add_task(label=label, task_class=task_class, config=config) 

300 return cls.from_pipeline_graph( 

301 pipeline_graph, 

302 butler=butler, 

303 where=where, 

304 bind=bind, 

305 resources=resources, 

306 raise_on_partial_outputs=raise_on_partial_outputs, 

307 attach_datastore_records=attach_datastore_records, 

308 output=output, 

309 output_run=output_run, 

310 ) 

311 

312 @classmethod 

313 def from_pipeline( 

314 cls, 

315 pipeline: Pipeline, 

316 *, 

317 where: str = "", 

318 bind: Mapping[str, Any] | None = None, 

319 butler: Butler, 

320 resources: ExecutionResources | None = None, 

321 raise_on_partial_outputs: bool = True, 

322 attach_datastore_records: bool = False, 

323 output: str | None = None, 

324 output_run: str | None = None, 

325 ) -> SimplePipelineExecutor: 

326 """Create an executor by building a QuantumGraph from an in-memory 

327 pipeline. 

328 

329 Parameters 

330 ---------- 

331 pipeline : `.Pipeline` or `~collections.abc.Iterable` [ `.TaskDef` ] 

332 A Python object describing the tasks to run, along with their 

333 labels and configuration. 

334 where : `str`, optional 

335 Data ID query expression that constraints the quanta generated. 

336 bind : `~collections.abc.Mapping`, optional 

337 Mapping containing literal values that should be injected into the 

338 ``where`` expression, keyed by the identifiers they replace. 

339 butler : `~lsst.daf.butler.Butler` 

340 Butler that manages all I/O. `prep_butler` can be used to create 

341 one. 

342 resources : `.ExecutionResources` 

343 The resources available to each quantum being executed. 

344 raise_on_partial_outputs : `bool`, optional 

345 If `True` raise exceptions chained by 

346 `.AnnotatedPartialOutputsError` immediately, instead of considering 

347 the partial result a success and continuing to run downstream 

348 tasks. 

349 attach_datastore_records : `bool`, optional 

350 Whether to attach datastore records to the quantum graph. This is 

351 usually unnecessary, unless the executor is used to test behavior 

352 that depends on datastore records. 

353 output : `str`, optional 

354 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

355 collection to create that will combine both inputs and outputs. 

356 output_run : `str`, optional 

357 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

358 directly hold all output datasets. If not provided, a name will 

359 be created from ``output`` and a timestamp. 

360 

361 Returns 

362 ------- 

363 executor : `SimplePipelineExecutor` 

364 An executor instance containing the constructed `.QuantumGraph` and 

365 `~lsst.daf.butler.Butler`, ready for `run` to be called. 

366 """ 

367 pipeline_graph = pipeline.to_graph() 

368 return cls.from_pipeline_graph( 

369 pipeline_graph, 

370 where=where, 

371 bind=bind, 

372 butler=butler, 

373 resources=resources, 

374 raise_on_partial_outputs=raise_on_partial_outputs, 

375 attach_datastore_records=attach_datastore_records, 

376 output=output, 

377 output_run=output_run, 

378 ) 

379 

380 @classmethod 

381 def from_pipeline_graph( 

382 cls, 

383 pipeline_graph: PipelineGraph, 

384 *, 

385 where: str = "", 

386 bind: Mapping[str, Any] | None = None, 

387 butler: Butler, 

388 resources: ExecutionResources | None = None, 

389 raise_on_partial_outputs: bool = True, 

390 attach_datastore_records: bool = False, 

391 output: str | None = None, 

392 output_run: str | None = None, 

393 ) -> SimplePipelineExecutor: 

394 """Create an executor by building a QuantumGraph from an in-memory 

395 pipeline graph. 

396 

397 Parameters 

398 ---------- 

399 pipeline_graph : `~.pipeline_graph.PipelineGraph` 

400 A Python object describing the tasks to run, along with their 

401 labels and configuration, in graph form. Will be resolved against 

402 the given ``butler``, with any existing resolutions ignored. 

403 where : `str`, optional 

404 Data ID query expression that constraints the quanta generated. 

405 bind : `~collections.abc.Mapping`, optional 

406 Mapping containing literal values that should be injected into the 

407 ``where`` expression, keyed by the identifiers they replace. 

408 butler : `~lsst.daf.butler.Butler` 

409 Butler that manages all I/O. `prep_butler` can be used to create 

410 one. Must have its `~lsst.daf.butler.Butler.run` and 

411 ``butler.collections.defaults`` not empty and not `None`. 

412 resources : `.ExecutionResources` 

413 The resources available to each quantum being executed. 

414 raise_on_partial_outputs : `bool`, optional 

415 If `True` raise exceptions chained by 

416 `.AnnotatedPartialOutputsError` immediately, instead 

417 of considering the partial result a success and continuing to run 

418 downstream tasks. 

419 attach_datastore_records : `bool`, optional 

420 Whether to attach datastore records to the quantum graph. This is 

421 usually unnecessary, unless the executor is used to test behavior 

422 that depends on datastore records. 

423 output : `str`, optional 

424 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

425 collection to create that will combine both inputs and outputs. 

426 output_run : `str`, optional 

427 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

428 directly hold all output datasets. If not provided, a name will 

429 be created from ``output`` and a timestamp. 

430 

431 Returns 

432 ------- 

433 executor : `SimplePipelineExecutor` 

434 An executor instance containing the constructed 

435 `.QuantumGraph` and `~lsst.daf.butler.Butler`, ready 

436 for `run` to be called. 

437 """ 

438 if output_run is None: 

439 output_run = butler.run 

440 if output_run is None: 

441 if output is None: 

442 raise TypeError("At least one of output or output_run must be provided.") 

443 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}" 

444 

445 quantum_graph_builder = AllDimensionsQuantumGraphBuilder( 

446 pipeline_graph, butler, where=where, bind=bind, output_run=output_run 

447 ) 

448 metadata = { 

449 "skip_existing_in": [], 

450 "skip_existing": False, 

451 "data_query": where, 

452 } 

453 predicted = quantum_graph_builder.finish( 

454 output=output, 

455 metadata=metadata, 

456 attach_datastore_records=attach_datastore_records, 

457 ).assemble() 

458 return cls( 

459 predicted, 

460 butler=butler, 

461 resources=resources, 

462 raise_on_partial_outputs=raise_on_partial_outputs, 

463 ) 

464 

465 @property 

466 def quantum_graph(self) -> QuantumGraph: 

467 """The quantum graph run by this executor.""" 

468 if self._quantum_graph is None: 

469 self._quantum_graph = self.predicted.to_old_quantum_graph() 

470 return self._quantum_graph 

471 

    def use_local_butler(
        self, root: str, register_dataset_types: bool = True, transfer_dimensions: bool = True
    ) -> Butler:
        """Transfer all inputs to a local data repository and set the executor
        to write outputs to it.

        Parameters
        ----------
        root : `str`
            Path to the local data repository; created if it does not exist.
        register_dataset_types : `bool`, optional
            Whether to register dataset types in the new repository.  If
            `False`, the local data repository must already exist and already
            have all input dataset types registered.
        transfer_dimensions : `bool`, optional
            Whether to transfer dimension records to the new repository.  If
            `False`, the local data repository must already exist and already
            have all needed dimension records.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Writeable butler for local data repository.  Also stored as
            ``self.butler``, replacing the original butler.

        Notes
        -----
        The input collection structure from the original data repository is not
        preserved by this method (it cannot be reconstructed from the quantum
        graph).  Instead, a `~lsst.daf.butler.CollectionType.TAGGED` collection
        is created to gather all inputs, and appended to the output
        `~lsst.daf.butler.CollectionType.CHAINED` collection after the output
        `~lsst.daf.butler.CollectionType.RUN` collection.  Calibration inputs
        with the same data ID but multiple validity ranges are *not* included
        in that `~lsst.daf.butler.CollectionType.TAGGED`; they are still
        transferred to the local data repository, but can only be found via the
        quantum graph or their original `~lsst.daf.butler.CollectionType.RUN`
        collections.
        """
        # Create the repository on first use; reuse it otherwise.
        if not os.path.exists(root):
            Butler.makeRepo(root)
        out_butler = Butler.from_config(root, writeable=True)

        # Recreate the output collection structure recorded in the quantum
        # graph header: RUN collection always, plus the CHAINED output and a
        # TAGGED inputs collection when an output chain was named.
        output_run = self.predicted.header.output_run
        out_butler.collections.register(output_run, CollectionType.RUN)
        output = self.predicted.header.output
        inputs: str | None = None
        if output is not None:
            inputs = f"{output}/inputs"
            out_butler.collections.register(output, CollectionType.CHAINED)
            out_butler.collections.register(inputs, CollectionType.TAGGED)
            # Search order in the chain: outputs first, then tagged inputs.
            out_butler.collections.redefine_chain(output, [output_run, inputs])

        if transfer_dimensions:
            # We can't just let the transfer_from call below take care of this
            # because we need dimensions for outputs as well as inputs.  And if
            # we have to do the outputs explicitly, it's more efficient to do
            # the inputs at the same time since a lot of those dimensions will
            # be the same.
            self._transfer_qg_dimension_records(out_butler)

        # Extract overall-input DatasetRefs to transfer and possibly insert
        # into a TAGGED collection.
        refs: set[DatasetRef] = set()
        # Maps dataset type name -> data ID -> ref; a `None` ref marks a data
        # ID with conflicting refs that must be left out of the TAGGED
        # collection.
        to_tag_by_type: dict[str, dict[DataCoordinate, DatasetRef | None]] = {}
        pipeline_graph = self.predicted.pipeline_graph
        for name, dataset_type_node in pipeline_graph.iter_overall_inputs():
            assert dataset_type_node is not None, "PipelineGraph should be resolved."
            to_tag_for_type = to_tag_by_type.setdefault(name, {})
            for task_node in pipeline_graph.consumers_of(name):
                for quantum in self.predicted.build_execution_quanta(task_label=task_node.label).values():
                    for ref in quantum.inputs[name]:
                        ref = dataset_type_node.generalize_ref(ref)
                        refs.add(ref)
                        if to_tag_for_type.setdefault(ref.dataId, ref) != ref:
                            # There is already a dataset with the same data ID
                            # and dataset type, but a different UUID/run. This
                            # can only happen for calibrations found in
                            # calibration collections, and for now we have no
                            # choice but to leave them out of the TAGGED inputs
                            # collection in the local butler.
                            to_tag_for_type[ref.dataId] = None

        # Dimensions were handled above (or deliberately skipped), so disable
        # them in the transfer itself.
        out_butler.transfer_from(
            self.butler,
            refs,
            register_dataset_types=register_dataset_types,
            transfer_dimensions=False,
        )

        if inputs is not None:
            # Flatten the non-conflicting refs and tag them as the inputs
            # collection.
            to_tag_flat: list[DatasetRef] = []
            for ref_map in to_tag_by_type.values():
                for tag_ref in ref_map.values():
                    if tag_ref is not None:
                        to_tag_flat.append(tag_ref)
            out_butler.registry.associate(inputs, to_tag_flat)

        # Point defaults at the new output chain/run and swap in the local
        # butler for all subsequent execution.
        out_butler.registry.defaults = self.butler.registry.defaults.clone(collections=output, run=output_run)
        self.butler = out_butler
        return self.butler

572 

573 def run(self, register_dataset_types: bool = False, save_versions: bool = True) -> list[Quantum]: 

574 """Run all the quanta in the quantum graph in topological order. 

575 

576 Use this method to run all quanta in the graph. Use 

577 `as_generator` to get a generator to run the quanta one at 

578 a time. 

579 

580 Parameters 

581 ---------- 

582 register_dataset_types : `bool`, optional 

583 If `True`, register all output dataset types before executing any 

584 quanta. 

585 save_versions : `bool`, optional 

586 If `True` (default), save a package versions dataset. 

587 

588 Returns 

589 ------- 

590 quanta : `list` [ `~lsst.daf.butler.Quantum` ] 

591 Executed quanta. 

592 

593 Notes 

594 ----- 

595 A topological ordering is not in general unique, but no other 

596 guarantees are made about the order in which quanta are processed. 

597 """ 

598 return list( 

599 self.as_generator(register_dataset_types=register_dataset_types, save_versions=save_versions) 

600 ) 

601 

    def as_generator(
        self, register_dataset_types: bool = False, save_versions: bool = True
    ) -> Iterator[Quantum]:
        """Yield quanta in the quantum graph in topological order.

        These quanta will be run as the returned generator is iterated
        over.  Use this method to run the quanta one at a time.
        Use `run` to run all quanta in the graph.

        Parameters
        ----------
        register_dataset_types : `bool`, optional
            If `True`, register all output dataset types before executing any
            quanta.
        save_versions : `bool`, optional
            If `True` (default), save a package versions dataset.

        Returns
        -------
        quanta : `~collections.abc.Iterator` [ `~lsst.daf.butler.Quantum` ]
            Executed quanta.

        Notes
        -----
        Global initialization steps (see `.QuantumGraph.init_output_run`) are
        performed immediately when this method is called, but individual quanta
        are not actually executed until the returned iterator is iterated over.

        A topological ordering is not in general unique, but no other
        guarantees are made about the order in which quanta are processed.
        """
        # Run-level initialization: dataset types (optional), then configs,
        # init-outputs, and package versions (optional), all happening now,
        # not on first iteration.
        if register_dataset_types:
            self.predicted.pipeline_graph.register_dataset_types(self.butler)
        self.predicted.write_configs(self.butler, compare_existing=False)
        self.predicted.write_init_outputs(self.butler, skip_existing=False)
        if save_versions:
            self.predicted.write_packages(self.butler, compare_existing=False)
        task_factory = TaskFactory()
        single_quantum_executor = SingleQuantumExecutor(
            butler=self.butler,
            task_factory=task_factory,
            resources=self.resources,
            raise_on_partial_outputs=self.raise_on_partial_outputs,
        )
        # Materialize execution quanta so the node attributes used below are
        # populated.
        self.predicted.build_execution_quanta()
        nodes_map = self.predicted.quantum_only_xgraph.nodes
        # Important that this returns a generator expression rather than being
        # a generator itself; that is what makes the init stuff above happen
        # immediately instead of when the first quanta is executed, which might
        # be useful for callers who want to check the state of the repo in
        # between.
        return (
            # execute() returns a tuple; element 0 is the executed Quantum.
            single_quantum_executor.execute(
                nodes_map[quantum_id]["pipeline_node"],
                nodes_map[quantum_id]["quantum"],
                quantum_id,
            )[0]
            for quantum_id in self.predicted
        )

661 

662 def _transfer_qg_dimension_records(self, out_butler: Butler) -> None: 

663 """Transfer all dimension records from the quantum graph to a butler. 

664 

665 Parameters 

666 ---------- 

667 out_butler : `lsst.daf.butler.Butler` 

668 Butler to transfer records to. 

669 """ 

670 assert self.predicted.dimension_data is not None, "Dimension data must be present for execution." 

671 records = self.predicted.dimension_data.records 

672 dimensions = out_butler.dimensions.sorted(records.keys()) 

673 for dimension in dimensions: 

674 record_set = records[dimension.name] 

675 if record_set and record_set.element.has_own_table: 

676 out_butler.registry.insertDimensionData( 

677 record_set.element, 

678 *record_set, 

679 skip_existing=True, 

680 )