Coverage for python / lsst / pipe / base / simple_pipeline_executor.py: 19%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("SimplePipelineExecutor",) 

31 

32from collections.abc import Iterable, Iterator, Mapping 

33from typing import Any 

34 

35from lsst.daf.butler import ( 

36 Butler, 

37 CollectionType, 

38 DataCoordinate, 

39 DatasetRef, 

40 Quantum, 

41) 

42from lsst.daf.butler.registry import RegistryDefaults 

43from lsst.pex.config import Config 

44 

45from ._instrument import Instrument 

46from ._quantumContext import ExecutionResources 

47from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder 

48from .graph import QuantumGraph 

49from .pipeline import Pipeline 

50from .pipeline_graph import PipelineGraph 

51from .pipelineTask import PipelineTask 

52from .quantum_graph import PredictedQuantumGraph 

53from .single_quantum_executor import SingleQuantumExecutor 

54from .taskFactory import TaskFactory 

55 

56 

class SimplePipelineExecutor:
    """A simple, high-level executor for pipelines.

    Parameters
    ----------
    quantum_graph : `.QuantumGraph` or `.PredictedQuantumGraph`
        Graph to be executed.  An old-style `.QuantumGraph` is converted to a
        `.PredictedQuantumGraph` internally.
    butler : `~lsst.daf.butler.Butler`
        Object that manages all I/O.  Must be initialized with `collections`
        and `run` properties that correspond to the input and output
        collections, which must be consistent with those used to create
        ``quantum_graph``.
    resources : `.ExecutionResources`
        The resources available to each quantum being executed.
    raise_on_partial_outputs : `bool`, optional
        If `True` raise exceptions chained by `.AnnotatedPartialOutputsError`
        immediately, instead of considering the partial result a success and
        continuing to run downstream tasks.

    Notes
    -----
    Most callers should use one of the `classmethod` factory functions
    (`from_pipeline_filename`, `from_task_class`, `from_pipeline`) instead of
    invoking the constructor directly; these guarantee that the
    `~lsst.daf.butler.Butler` and `.QuantumGraph` are created consistently.

    This class is intended primarily to support unit testing and small-scale
    integration testing of `.PipelineTask` classes.  It deliberately lacks many
    features present in the command-line-only ``pipetask`` tool in order to
    keep the implementation simple.  Python callers that need more
    sophistication should call lower-level tools like
    `~.quantum_graph_builder.QuantumGraphBuilder` and
    `.single_quantum_executor.SingleQuantumExecutor` directly.
    """

    def __init__(
        self,
        quantum_graph: QuantumGraph | PredictedQuantumGraph,
        butler: Butler,
        resources: ExecutionResources | None = None,
        raise_on_partial_outputs: bool = True,
    ):
        # NOTE: a redundant function-local ``from .graph import QuantumGraph``
        # was removed here; the same name is already imported at module scope.
        self._quantum_graph: QuantumGraph | None = None
        if isinstance(quantum_graph, QuantumGraph):
            # Old-style graph given: keep it and derive the predicted graph.
            self._quantum_graph = quantum_graph
            self.predicted = PredictedQuantumGraph.from_old_quantum_graph(self._quantum_graph)
        else:
            # New-style graph given; an old-style view is built lazily by the
            # `quantum_graph` property only if a caller asks for it.
            self.predicted = quantum_graph
        self.butler = butler
        self.resources = resources
        self.raise_on_partial_outputs = raise_on_partial_outputs

110 

111 @classmethod 

112 def prep_butler( 

113 cls, 

114 root: str, 

115 inputs: Iterable[str], 

116 output: str, 

117 output_run: str | None = None, 

118 ) -> Butler: 

119 """Return configured `~lsst.daf.butler.Butler`. 

120 

121 Helper method for creating `~lsst.daf.butler.Butler` instances with 

122 collections appropriate for processing. 

123 

124 Parameters 

125 ---------- 

126 root : `str` 

127 Root of the butler data repository; must already exist, with all 

128 necessary input data. 

129 inputs : `~collections.abc.Iterable` [ `str` ] 

130 Collections to search for all input datasets, in search order. 

131 output : `str` 

132 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

133 collection to create that will combine both inputs and outputs. 

134 output_run : `str`, optional 

135 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

136 directly hold all output datasets. If not provided, a name will be 

137 created from ``output`` and a timestamp. 

138 

139 Returns 

140 ------- 

141 butler : `~lsst.daf.butler.Butler` 

142 Butler client instance compatible with all `classmethod` factories. 

143 Always writeable. 

144 """ 

145 if output_run is None: 

146 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}" 

147 # Make initial butler with no collections, since we haven't created 

148 # them yet. 

149 butler = Butler.from_config(root, writeable=True) 

150 butler.registry.registerCollection(output_run, CollectionType.RUN) 

151 butler.registry.registerCollection(output, CollectionType.CHAINED) 

152 collections = [output_run] 

153 collections.extend(inputs) 

154 butler.registry.setCollectionChain(output, collections) 

155 # Override the registry defaults. No need to clone. 

156 butler.registry.defaults = RegistryDefaults(collections=[output], run=output_run) 

157 return butler 

158 

159 @classmethod 

160 def from_pipeline_filename( 

161 cls, 

162 pipeline_filename: str, 

163 *, 

164 where: str = "", 

165 bind: Mapping[str, Any] | None = None, 

166 butler: Butler, 

167 resources: ExecutionResources | None = None, 

168 raise_on_partial_outputs: bool = True, 

169 attach_datastore_records: bool = False, 

170 output: str | None = None, 

171 output_run: str | None = None, 

172 ) -> SimplePipelineExecutor: 

173 """Create an executor by building a QuantumGraph from an on-disk 

174 pipeline YAML file. 

175 

176 Parameters 

177 ---------- 

178 pipeline_filename : `str` 

179 Name of the YAML file to load the pipeline definition from. 

180 where : `str`, optional 

181 Data ID query expression that constraints the quanta generated. 

182 bind : `~collections.abc.Mapping`, optional 

183 Mapping containing literal values that should be injected into the 

184 ``where`` expression, keyed by the identifiers they replace. 

185 butler : `~lsst.daf.butler.Butler` 

186 Butler that manages all I/O. `prep_butler` can be used to create 

187 one. 

188 resources : `.ExecutionResources` 

189 The resources available to each quantum being executed. 

190 raise_on_partial_outputs : `bool`, optional 

191 If `True` raise exceptions chained by 

192 `.AnnotatedPartialOutputsError` immediately, instead of considering 

193 the partial result a success and continuing to run downstream 

194 tasks. 

195 attach_datastore_records : `bool`, optional 

196 Whether to attach datastore records to the quantum graph. This is 

197 usually unnecessary, unless the executor is used to test behavior 

198 that depends on datastore records. 

199 output : `str`, optional 

200 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

201 collection to create that will combine both inputs and outputs. 

202 output_run : `str`, optional 

203 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

204 directly hold all output datasets. If not provided, a name will be 

205 created from ``output`` and a timestamp. 

206 

207 Returns 

208 ------- 

209 executor : `SimplePipelineExecutor` 

210 An executor instance containing the constructed `.QuantumGraph` and 

211 `~lsst.daf.butler.Butler`, ready for `run` to be called. 

212 """ 

213 pipeline = Pipeline.fromFile(pipeline_filename) 

214 return cls.from_pipeline( 

215 pipeline, 

216 butler=butler, 

217 where=where, 

218 bind=bind, 

219 resources=resources, 

220 raise_on_partial_outputs=raise_on_partial_outputs, 

221 attach_datastore_records=attach_datastore_records, 

222 output=output, 

223 output_run=output_run, 

224 ) 

225 

226 @classmethod 

227 def from_task_class( 

228 cls, 

229 task_class: type[PipelineTask], 

230 config: Config | None = None, 

231 label: str | None = None, 

232 *, 

233 where: str = "", 

234 bind: Mapping[str, Any] | None = None, 

235 butler: Butler, 

236 resources: ExecutionResources | None = None, 

237 raise_on_partial_outputs: bool = True, 

238 attach_datastore_records: bool = False, 

239 output: str | None = None, 

240 output_run: str | None = None, 

241 ) -> SimplePipelineExecutor: 

242 """Create an executor by building a QuantumGraph from a pipeline 

243 containing a single task. 

244 

245 Parameters 

246 ---------- 

247 task_class : `type` 

248 A concrete `.PipelineTask` subclass. 

249 config : `~lsst.pex.config.Config`, optional 

250 Configuration for the task. If not provided, task-level defaults 

251 will be used (no per-instrument overrides). 

252 label : `str`, optional 

253 Label for the task in its pipeline; defaults to 

254 ``task_class._DefaultName``. 

255 where : `str`, optional 

256 Data ID query expression that constraints the quanta generated. 

257 bind : `~collections.abc.Mapping`, optional 

258 Mapping containing literal values that should be injected into the 

259 ``where`` expression, keyed by the identifiers they replace. 

260 butler : `~lsst.daf.butler.Butler` 

261 Butler that manages all I/O. `prep_butler` can be used to create 

262 one. 

263 resources : `.ExecutionResources` 

264 The resources available to each quantum being executed. 

265 raise_on_partial_outputs : `bool`, optional 

266 If `True` raise exceptions chained by 

267 `.AnnotatedPartialOutputsError` immediately, instead of considering 

268 the partial result a success and continuing to run downstream 

269 tasks. 

270 attach_datastore_records : `bool`, optional 

271 Whether to attach datastore records to the quantum graph. This is 

272 usually unnecessary, unless the executor is used to test behavior 

273 that depends on datastore records. 

274 output : `str`, optional 

275 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

276 collection to create that will combine both inputs and outputs. 

277 output_run : `str`, optional 

278 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

279 directly hold all output datasets. If not provided, a name will be 

280 created from ``output`` and a timestamp. 

281 

282 Returns 

283 ------- 

284 executor : `SimplePipelineExecutor` 

285 An executor instance containing the constructed `.QuantumGraph` and 

286 `~lsst.daf.butler.Butler`, ready for `run` to be called. 

287 """ 

288 if config is None: 

289 config = task_class.ConfigClass() 

290 if label is None: 

291 label = task_class._DefaultName 

292 if not isinstance(config, task_class.ConfigClass): 

293 raise TypeError( 

294 f"Invalid config class type: expected {task_class.ConfigClass.__name__}, " 

295 f"got {type(config).__name__}." 

296 ) 

297 pipeline_graph = PipelineGraph() 

298 pipeline_graph.add_task(label=label, task_class=task_class, config=config) 

299 return cls.from_pipeline_graph( 

300 pipeline_graph, 

301 butler=butler, 

302 where=where, 

303 bind=bind, 

304 resources=resources, 

305 raise_on_partial_outputs=raise_on_partial_outputs, 

306 attach_datastore_records=attach_datastore_records, 

307 output=output, 

308 output_run=output_run, 

309 ) 

310 

311 @classmethod 

312 def from_pipeline( 

313 cls, 

314 pipeline: Pipeline, 

315 *, 

316 where: str = "", 

317 bind: Mapping[str, Any] | None = None, 

318 butler: Butler, 

319 resources: ExecutionResources | None = None, 

320 raise_on_partial_outputs: bool = True, 

321 attach_datastore_records: bool = False, 

322 output: str | None = None, 

323 output_run: str | None = None, 

324 ) -> SimplePipelineExecutor: 

325 """Create an executor by building a QuantumGraph from an in-memory 

326 pipeline. 

327 

328 Parameters 

329 ---------- 

330 pipeline : `.Pipeline` or `~collections.abc.Iterable` [ `.TaskDef` ] 

331 A Python object describing the tasks to run, along with their 

332 labels and configuration. 

333 where : `str`, optional 

334 Data ID query expression that constraints the quanta generated. 

335 bind : `~collections.abc.Mapping`, optional 

336 Mapping containing literal values that should be injected into the 

337 ``where`` expression, keyed by the identifiers they replace. 

338 butler : `~lsst.daf.butler.Butler` 

339 Butler that manages all I/O. `prep_butler` can be used to create 

340 one. 

341 resources : `.ExecutionResources` 

342 The resources available to each quantum being executed. 

343 raise_on_partial_outputs : `bool`, optional 

344 If `True` raise exceptions chained by 

345 `.AnnotatedPartialOutputsError` immediately, instead of considering 

346 the partial result a success and continuing to run downstream 

347 tasks. 

348 attach_datastore_records : `bool`, optional 

349 Whether to attach datastore records to the quantum graph. This is 

350 usually unnecessary, unless the executor is used to test behavior 

351 that depends on datastore records. 

352 output : `str`, optional 

353 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

354 collection to create that will combine both inputs and outputs. 

355 output_run : `str`, optional 

356 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

357 directly hold all output datasets. If not provided, a name will 

358 be created from ``output`` and a timestamp. 

359 

360 Returns 

361 ------- 

362 executor : `SimplePipelineExecutor` 

363 An executor instance containing the constructed `.QuantumGraph` and 

364 `~lsst.daf.butler.Butler`, ready for `run` to be called. 

365 """ 

366 pipeline_graph = pipeline.to_graph() 

367 return cls.from_pipeline_graph( 

368 pipeline_graph, 

369 where=where, 

370 bind=bind, 

371 butler=butler, 

372 resources=resources, 

373 raise_on_partial_outputs=raise_on_partial_outputs, 

374 attach_datastore_records=attach_datastore_records, 

375 output=output, 

376 output_run=output_run, 

377 ) 

378 

379 @classmethod 

380 def from_pipeline_graph( 

381 cls, 

382 pipeline_graph: PipelineGraph, 

383 *, 

384 where: str = "", 

385 bind: Mapping[str, Any] | None = None, 

386 butler: Butler, 

387 resources: ExecutionResources | None = None, 

388 raise_on_partial_outputs: bool = True, 

389 attach_datastore_records: bool = False, 

390 output: str | None = None, 

391 output_run: str | None = None, 

392 ) -> SimplePipelineExecutor: 

393 """Create an executor by building a QuantumGraph from an in-memory 

394 pipeline graph. 

395 

396 Parameters 

397 ---------- 

398 pipeline_graph : `~.pipeline_graph.PipelineGraph` 

399 A Python object describing the tasks to run, along with their 

400 labels and configuration, in graph form. Will be resolved against 

401 the given ``butler``, with any existing resolutions ignored. 

402 where : `str`, optional 

403 Data ID query expression that constraints the quanta generated. 

404 bind : `~collections.abc.Mapping`, optional 

405 Mapping containing literal values that should be injected into the 

406 ``where`` expression, keyed by the identifiers they replace. 

407 butler : `~lsst.daf.butler.Butler` 

408 Butler that manages all I/O. `prep_butler` can be used to create 

409 one. Must have its `~lsst.daf.butler.Butler.run` and 

410 ``butler.collections.defaults`` not empty and not `None`. 

411 resources : `.ExecutionResources` 

412 The resources available to each quantum being executed. 

413 raise_on_partial_outputs : `bool`, optional 

414 If `True` raise exceptions chained by 

415 `.AnnotatedPartialOutputsError` immediately, instead 

416 of considering the partial result a success and continuing to run 

417 downstream tasks. 

418 attach_datastore_records : `bool`, optional 

419 Whether to attach datastore records to the quantum graph. This is 

420 usually unnecessary, unless the executor is used to test behavior 

421 that depends on datastore records. 

422 output : `str`, optional 

423 Name of a new output `~lsst.daf.butler.CollectionType.CHAINED` 

424 collection to create that will combine both inputs and outputs. 

425 output_run : `str`, optional 

426 Name of the output `~lsst.daf.butler.CollectionType.RUN` that will 

427 directly hold all output datasets. If not provided, a name will 

428 be created from ``output`` and a timestamp. 

429 

430 Returns 

431 ------- 

432 executor : `SimplePipelineExecutor` 

433 An executor instance containing the constructed 

434 `.QuantumGraph` and `~lsst.daf.butler.Butler`, ready 

435 for `run` to be called. 

436 """ 

437 if output_run is None: 

438 output_run = butler.run 

439 if output_run is None: 

440 if output is None: 

441 raise TypeError("At least one of output or output_run must be provided.") 

442 output_run = f"{output}/{Instrument.makeCollectionTimestamp()}" 

443 

444 quantum_graph_builder = AllDimensionsQuantumGraphBuilder( 

445 pipeline_graph, butler, where=where, bind=bind, output_run=output_run 

446 ) 

447 metadata = { 

448 "skip_existing_in": [], 

449 "skip_existing": False, 

450 "data_query": where, 

451 } 

452 predicted = quantum_graph_builder.finish( 

453 output=output, 

454 metadata=metadata, 

455 attach_datastore_records=attach_datastore_records, 

456 ).assemble() 

457 return cls( 

458 predicted, 

459 butler=butler, 

460 resources=resources, 

461 raise_on_partial_outputs=raise_on_partial_outputs, 

462 ) 

463 

464 @property 

465 def quantum_graph(self) -> QuantumGraph: 

466 """The quantum graph run by this executor.""" 

467 if self._quantum_graph is None: 

468 self._quantum_graph = self.predicted.to_old_quantum_graph() 

469 return self._quantum_graph 

470 

    def use_local_butler(
        self, root: str, register_dataset_types: bool = True, transfer_dimensions: bool = True
    ) -> Butler:
        """Transfer all inputs to a local data repository and set the executor
        to write outputs to it.

        Parameters
        ----------
        root : `str`
            Path to the local data repository; created if it does not exist.
        register_dataset_types : `bool`, optional
            Whether to register dataset types in the new repository.  If
            `False`, the local data repository must already exist and already
            have all input dataset types registered.
        transfer_dimensions : `bool`, optional
            Whether to transfer dimension records to the new repository.  If
            `False`, the local data repository must already exist and already
            have all needed dimension records.

        Returns
        -------
        butler : `lsst.daf.butler.Butler`
            Writeable butler for local data repository.

        Notes
        -----
        The input collection structure from the original data repository is not
        preserved by this method (it cannot be reconstructed from the quantum
        graph).  Instead, a `~lsst.daf.butler.CollectionType.TAGGED` collection
        is created to gather all inputs, and appended to the output
        `~lsst.daf.butler.CollectionType.CHAINED` collection after the output
        `~lsst.daf.butler.CollectionType.RUN` collection.  Calibration inputs
        with the same data ID but multiple validity ranges are *not* included
        in that `~lsst.daf.butler.CollectionType.TAGGED`; they are still
        transferred to the local data repository, but can only be found via the
        quantum graph or their original `~lsst.daf.butler.CollectionType.RUN`
        collections.
        """
        # Create the local repository on first use, then open it writeable.
        if not Butler.has_repo_config(root):
            Butler.makeRepo(root)
        out_butler = Butler.from_config(root, writeable=True)

        # Recreate the output collection structure recorded in the predicted
        # graph's header: RUN, and (if named) CHAINED = [RUN, TAGGED inputs].
        output_run = self.predicted.header.output_run
        out_butler.collections.register(output_run, CollectionType.RUN)
        output = self.predicted.header.output
        inputs: str | None = None
        if output is not None:
            inputs = f"{output}/inputs"
            out_butler.collections.register(output, CollectionType.CHAINED)
            out_butler.collections.register(inputs, CollectionType.TAGGED)
            out_butler.collections.redefine_chain(output, [output_run, inputs])

        if transfer_dimensions:
            # We can't just let the transfer_from call below take care of this
            # because we need dimensions for outputs as well as inputs. And if
            # we have to do the outputs explicitly, it's more efficient to do
            # the inputs at the same time since a lot of those dimensions will
            # be the same.
            self._transfer_qg_dimension_records(out_butler)

        # Extract overall-input DatasetRefs to transfer and possibly insert
        # into a TAGGED collection.
        refs: set[DatasetRef] = set()
        # Per dataset type: data ID -> ref, or None if multiple distinct refs
        # shared that data ID (see comment below).
        to_tag_by_type: dict[str, dict[DataCoordinate, DatasetRef | None]] = {}
        pipeline_graph = self.predicted.pipeline_graph
        for name, dataset_type_node in pipeline_graph.iter_overall_inputs():
            assert dataset_type_node is not None, "PipelineGraph should be resolved."
            to_tag_for_type = to_tag_by_type.setdefault(name, {})
            for task_node in pipeline_graph.consumers_of(name):
                for quantum in self.predicted.build_execution_quanta(task_label=task_node.label).values():
                    for ref in quantum.inputs[name]:
                        # Normalize to the graph-wide dataset type before
                        # collecting for transfer.
                        ref = dataset_type_node.generalize_ref(ref)
                        refs.add(ref)
                        if to_tag_for_type.setdefault(ref.dataId, ref) != ref:
                            # There is already a dataset with the same data ID
                            # and dataset type, but a different UUID/run. This
                            # can only happen for calibrations found in
                            # calibration collections, and for now we have no
                            # choice but to leave them out of the TAGGED inputs
                            # collection in the local butler.
                            to_tag_for_type[ref.dataId] = None

        # Dimension records were handled above (if requested), so suppress
        # dimension transfer here.
        out_butler.transfer_from(
            self.butler,
            refs,
            register_dataset_types=register_dataset_types,
            transfer_dimensions=False,
        )

        # Tag the unambiguous inputs so they are findable via the CHAINED
        # output collection.
        if inputs is not None:
            to_tag_flat: list[DatasetRef] = []
            for ref_map in to_tag_by_type.values():
                for tag_ref in ref_map.values():
                    if tag_ref is not None:
                        to_tag_flat.append(tag_ref)
            out_butler.registry.associate(inputs, to_tag_flat)

        # Point the local butler's defaults at the new output collections and
        # make it this executor's butler from now on.
        out_butler.registry.defaults = self.butler.registry.defaults.clone(collections=output, run=output_run)
        self.butler = out_butler
        return self.butler

571 

572 def run(self, register_dataset_types: bool = False, save_versions: bool = True) -> list[Quantum]: 

573 """Run all the quanta in the quantum graph in topological order. 

574 

575 Use this method to run all quanta in the graph. Use 

576 `as_generator` to get a generator to run the quanta one at 

577 a time. 

578 

579 Parameters 

580 ---------- 

581 register_dataset_types : `bool`, optional 

582 If `True`, register all output dataset types before executing any 

583 quanta. 

584 save_versions : `bool`, optional 

585 If `True` (default), save a package versions dataset. 

586 

587 Returns 

588 ------- 

589 quanta : `list` [ `~lsst.daf.butler.Quantum` ] 

590 Executed quanta. 

591 

592 Notes 

593 ----- 

594 A topological ordering is not in general unique, but no other 

595 guarantees are made about the order in which quanta are processed. 

596 """ 

597 return list( 

598 self.as_generator(register_dataset_types=register_dataset_types, save_versions=save_versions) 

599 ) 

600 

    def as_generator(
        self, register_dataset_types: bool = False, save_versions: bool = True
    ) -> Iterator[Quantum]:
        """Yield quanta in the quantum graph in topological order.

        These quanta will be run as the returned generator is iterated
        over.  Use this method to run the quanta one at a time.
        Use `run` to run all quanta in the graph.

        Parameters
        ----------
        register_dataset_types : `bool`, optional
            If `True`, register all output dataset types before executing any
            quanta.
        save_versions : `bool`, optional
            If `True` (default), save a package versions dataset.

        Returns
        -------
        quanta : `~collections.abc.Iterator` [ `~lsst.daf.butler.Quantum` ]
            Executed quanta.

        Notes
        -----
        Global initialization steps (see `.QuantumGraph.init_output_run`) are
        performed immediately when this method is called, but individual quanta
        are not actually executed until the returned iterator is iterated over.

        A topological ordering is not in general unique, but no other
        guarantees are made about the order in which quanta are processed.
        """
        # Run-level initialization happens eagerly, in this order: dataset
        # types (optional), configs, init-outputs, package versions (optional).
        if register_dataset_types:
            self.predicted.pipeline_graph.register_dataset_types(self.butler)
        self.predicted.write_configs(self.butler, compare_existing=False)
        self.predicted.write_init_outputs(self.butler, skip_existing=False)
        if save_versions:
            self.predicted.write_packages(self.butler, compare_existing=False)
        task_factory = TaskFactory()
        single_quantum_executor = SingleQuantumExecutor(
            butler=self.butler,
            task_factory=task_factory,
            resources=self.resources,
            raise_on_partial_outputs=self.raise_on_partial_outputs,
        )
        # Materialize execution quanta up front so the node attributes read
        # below are populated.
        self.predicted.build_execution_quanta()
        nodes_map = self.predicted.quantum_only_xgraph.nodes
        # Important that this returns a generator expression rather than being
        # a generator itself; that is what makes the init stuff above happen
        # immediately instead of when the first quanta is executed, which might
        # be useful for callers who want to check the state of the repo in
        # between.
        return (
            single_quantum_executor.execute(
                nodes_map[quantum_id]["pipeline_node"],
                nodes_map[quantum_id]["quantum"],
                quantum_id,
            )[0]
            for quantum_id in self.predicted
        )

660 

661 def _transfer_qg_dimension_records(self, out_butler: Butler) -> None: 

662 """Transfer all dimension records from the quantum graph to a butler. 

663 

664 Parameters 

665 ---------- 

666 out_butler : `lsst.daf.butler.Butler` 

667 Butler to transfer records to. 

668 """ 

669 assert self.predicted.dimension_data is not None, "Dimension data must be present for execution." 

670 records = self.predicted.dimension_data.records 

671 dimensions = out_butler.dimensions.sorted(records.keys()) 

672 for dimension in dimensions: 

673 record_set = records[dimension.name] 

674 if record_set and record_set.element.has_own_table: 

675 out_butler.registry.insertDimensionData( 

676 record_set.element, 

677 *record_set, 

678 skip_existing=True, 

679 )