Coverage for python / lsst / pipe / base / single_quantum_executor.py: 11%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

# Public API of this module.
__all__ = ["SingleQuantumExecutor"]

29 

30import logging 

31import time 

32import uuid 

33from collections import defaultdict 

34from collections.abc import Callable, Mapping 

35from itertools import chain 

36from typing import Any 

37 

38from lsst.daf.butler import ( 

39 Butler, 

40 DatasetRef, 

41 DatasetType, 

42 LimitedButler, 

43 NamedKeyDict, 

44 Quantum, 

45) 

46from lsst.daf.butler.logging import ButlerLogRecords 

47from lsst.utils.introspection import get_full_type_name 

48from lsst.utils.timer import logInfo 

49 

50from ._quantumContext import ExecutionResources, QuantumContext 

51from ._status import ( 

52 AnnotatedPartialOutputsError, 

53 ExceptionInfo, 

54 InvalidQuantumError, 

55 NoWorkFound, 

56 QuantumSuccessCaveats, 

57) 

58from .connections import AdjustQuantumHelper 

59from .log_capture import LogCapture, _ExecutionLogRecordsExtra 

60from .pipeline_graph import TaskNode 

61from .pipelineTask import PipelineTask 

62from .quantum_graph_executor import QuantumExecutionResult, QuantumExecutor 

63from .quantum_reports import QuantumReport 

64from .task import _TASK_FULL_METADATA_TYPE, _TASK_METADATA_TYPE 

65from .taskFactory import TaskFactory 

66 

# Module-level logger shared by everything in this file.
_LOG = logging.getLogger(__name__)

68 

69 

70class SingleQuantumExecutor(QuantumExecutor): 

71 """Executor class which runs one Quantum at a time. 

72 

73 Parameters 

74 ---------- 

75 butler : `~lsst.daf.butler.LimitedButler` or `None`, optional 

76 Data butler; `None` means that ``limited_butler_factory`` should be 

77 used instead. 

78 task_factory : `.TaskFactory`, optional 

79 Instance of a task factory. Defaults to a new instance of 

80 `lsst.pipe.base.TaskFactory`. 

81 skip_existing_in : `str` or `~collections.abc.Iterable` [ `str` ] 

82 A collection name or list of collections to search for the existing 

83 outputs of quanta, which indicates that those quanta should be skipped. 

84 This class only checks for the presence of butler output run in the 

85 list of collections. If the output run is present in the list then the 

86 quanta whose complete outputs exist in the output run will be skipped. 

87 `None` or empty string/sequence disables skipping. 

88 clobber_outputs : `bool`, optional 

89 If `True`, then outputs from a quantum that exist in output run 

90 collection will be removed prior to executing a quantum. If 

91 ``skip_existing_in`` contains output run, then only partial outputs 

92 from a quantum will be removed. Only used when ``butler`` is not 

93 `None`. 

94 enable_lsst_debug : `bool`, optional 

95 Enable debugging with ``lsstDebug`` facility for a task. 

96 limited_butler_factory : `~collections.abc.Callable`, optional 

97 A method that creates a `~lsst.daf.butler.LimitedButler` instance for a 

98 given Quantum. This parameter must be provided if ``butler`` is 

99 `None`. If ``butler`` is not `None` then this parameter is ignored. 

100 resources : `.ExecutionResources`, optional 

101 The resources available to this quantum when executing. 

102 skip_existing : `bool`, optional 

103 If `True`, skip quanta whose metadata datasets are already stored. 

104 Unlike ``skip_existing_in``, this works with limited butlers as well as 

105 full butlers. Always set to `True` if ``skip_existing_in`` matches 

106 ``butler.run``. 

107 assume_no_existing_outputs : `bool`, optional 

108 If `True`, assume preexisting outputs are impossible (e.g. because this 

109 is known by higher-level code to be a new ``RUN`` collection), and do 

110 not look for them. This causes the ``skip_existing`` and 

111 ``clobber_outputs`` options to be ignored, but unlike just setting both 

112 of those to `False`, it also avoids all dataset existence checks. 

113 raise_on_partial_outputs : `bool`, optional 

114 If `True` raise exceptions chained by `.AnnotatedPartialOutputsError` 

115 immediately, instead of considering the partial result a success and 

116 continuing to run downstream tasks. 

117 job_metadata : `~collections.abc.Mapping` 

118 Mapping with extra metadata to embed within the quantum metadata under 

119 the "job" key. This is intended to correspond to information common to 

120 all quanta being executed in a single process, such as the time taken 

121 to load the quantum graph in a BPS job. 

122 """ 

123 

124 def __init__( 

125 self, 

126 *, 

127 butler: LimitedButler | None = None, 

128 task_factory: TaskFactory | None = None, 

129 skip_existing_in: Any = None, 

130 clobber_outputs: bool = False, 

131 enable_lsst_debug: bool = False, 

132 limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None, 

133 resources: ExecutionResources | None = None, 

134 skip_existing: bool = False, 

135 assume_no_existing_outputs: bool = False, 

136 raise_on_partial_outputs: bool = True, 

137 job_metadata: Mapping[str, int | str | float] | None = None, 

138 ): 

139 self._butler: Butler | None = None 

140 self._limited_butler: LimitedButler | None = None 

141 match butler: 

142 case Butler(): 

143 self._butler = butler 

144 self._limited_butler = butler 

145 case LimitedButler(): 

146 self._limited_butler = butler 

147 case None: 

148 if limited_butler_factory is None: 

149 raise ValueError("limited_butler_factory is needed when butler is None") 

150 self._task_factory = task_factory if task_factory is not None else TaskFactory() 

151 self._clobber_outputs = clobber_outputs 

152 self._enable_lsst_debug = enable_lsst_debug 

153 self._limited_butler_factory = limited_butler_factory 

154 self._resources = resources 

155 self._assume_no_existing_outputs = assume_no_existing_outputs 

156 self._raise_on_partial_outputs = raise_on_partial_outputs 

157 self._job_metadata = job_metadata 

158 # Find whether output run is in skip_existing_in. 

159 self._skip_existing = skip_existing 

160 if self._butler is not None and skip_existing_in and not self._skip_existing: 

161 self._skip_existing = self._butler.run in self._butler.collections.query( 

162 skip_existing_in, flatten_chains=True 

163 ) 

164 self._previous_process_quanta: list[uuid.UUID] = [] 

165 

166 def execute( 

167 self, 

168 task_node: TaskNode, 

169 /, 

170 quantum: Quantum, 

171 quantum_id: uuid.UUID | None = None, 

172 *, 

173 log_records: ButlerLogRecords | None = None, 

174 ) -> QuantumExecutionResult: 

175 # Docstring inherited from QuantumExecutor.execute 

176 if self._butler is not None: 

177 self._butler.registry.refresh() 

178 return self._execute(task_node, quantum, quantum_id=quantum_id, log_records=log_records) 

179 

180 def _execute( 

181 self, 

182 task_node: TaskNode, 

183 /, 

184 quantum: Quantum, 

185 quantum_id: uuid.UUID | None = None, 

186 *, 

187 log_records: ButlerLogRecords | None = None, 

188 ) -> QuantumExecutionResult: 

189 """Execute the quantum. 

190 

191 Internal implementation of `execute()`. 

192 """ 

193 # Make a limited butler instance if needed. 

194 limited_butler: LimitedButler 

195 used_butler_factory = False 

196 if self._butler is not None: 

197 limited_butler = self._butler 

198 else: 

199 # We check this in constructor, but mypy needs this check here. 

200 if self._limited_butler is not None: 

201 limited_butler = self._limited_butler 

202 else: 

203 assert self._limited_butler_factory is not None 

204 limited_butler = self._limited_butler_factory(quantum) 

205 used_butler_factory = True 

206 

207 try: 

208 return self._execute_with_limited_butler( 

209 task_node, limited_butler, quantum=quantum, quantum_id=quantum_id, log_records=log_records 

210 ) 

211 finally: 

212 if used_butler_factory: 

213 limited_butler.close() 

214 

215 def _execute_with_limited_butler( 

216 self, 

217 task_node: TaskNode, 

218 limited_butler: LimitedButler, 

219 /, 

220 quantum: Quantum, 

221 quantum_id: uuid.UUID | None = None, 

222 *, 

223 log_records: ButlerLogRecords | None = None, 

224 ) -> QuantumExecutionResult: 

225 startTime = time.time() 

226 assert quantum.dataId is not None, "Quantum DataId cannot be None" 

227 report = QuantumReport(quantumId=quantum_id, dataId=quantum.dataId, taskLabel=task_node.label) 

228 if self._butler is not None: 

229 log_capture = LogCapture.from_full(self._butler) 

230 else: 

231 log_capture = LogCapture.from_limited(limited_butler) 

232 with log_capture.capture_logging(task_node, quantum, records=log_records) as captureLog: 

233 # Save detailed resource usage before task start to metadata. 

234 quantumMetadata = _TASK_METADATA_TYPE() 

235 logInfo(None, "prep", metadata=quantumMetadata) # type: ignore[arg-type] 

236 

237 _LOG.info( 

238 "Preparing execution of quantum for label=%s dataId=%s.", task_node.label, quantum.dataId 

239 ) 

240 

241 # check whether to skip or delete old outputs, if it returns True 

242 # or raises an exception do not try to store logs, as they may be 

243 # already in butler. 

244 captureLog.store = False 

245 if self._check_existing_outputs(quantum, task_node, limited_butler, captureLog.extra): 

246 _LOG.info( 

247 "Skipping already-successful quantum for label=%s dataId=%s.", 

248 task_node.label, 

249 quantum.dataId, 

250 ) 

251 return QuantumExecutionResult(quantum, report, skipped_existing=True, adjusted_no_work=False) 

252 captureLog.store = True 

253 

254 captureLog.extra.previous_process_quanta.extend(self._previous_process_quanta) 

255 if quantum_id is not None: 

256 self._previous_process_quanta.append(quantum_id) 

257 try: 

258 quantum = self._updated_quantum_inputs(quantum, task_node, limited_butler) 

259 except NoWorkFound as exc: 

260 _LOG.info( 

261 "Nothing to do for task '%s' on quantum %s; saving metadata and skipping: %s", 

262 task_node.label, 

263 quantum.dataId, 

264 str(exc), 

265 ) 

266 quantumMetadata["caveats"] = QuantumSuccessCaveats.from_adjust_quantum_no_work().value 

267 quantumMetadata["outputs"] = [] 

268 # Make empty metadata that looks something like what a 

269 # do-nothing task would write (but we don't bother with empty 

270 # nested PropertySets for subtasks). This is slightly 

271 # duplicative with logic in pipe_base that we can't easily call 

272 # from here; we'll fix this on DM-29761. 

273 logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type] 

274 fullMetadata = _TASK_FULL_METADATA_TYPE() 

275 fullMetadata[task_node.label] = _TASK_METADATA_TYPE() 

276 fullMetadata["quantum"] = quantumMetadata 

277 if self._job_metadata is not None: 

278 fullMetadata["job"] = self._job_metadata 

279 self._write_metadata(quantum, fullMetadata, task_node, limited_butler) 

280 return QuantumExecutionResult( 

281 quantum, 

282 report, 

283 skipped_existing=False, 

284 adjusted_no_work=True, 

285 task_metadata=fullMetadata, 

286 ) 

287 

288 # enable lsstDebug debugging 

289 if self._enable_lsst_debug: 

290 try: 

291 _LOG.debug("Will try to import debug.py") 

292 import debug # type: ignore # noqa:F401 

293 except ImportError: 

294 _LOG.warning("No 'debug' module found.") 

295 

296 # Ensure that we are executing a frozen config 

297 task_node.config.freeze() 

298 logInfo(None, "init", metadata=quantumMetadata) # type: ignore[arg-type] 

299 init_input_refs = list(quantum.initInputs.values()) 

300 

301 _LOG.info( 

302 "Constructing task and executing quantum for label=%s dataId=%s.", 

303 task_node.label, 

304 quantum.dataId, 

305 ) 

306 try: 

307 task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs) 

308 logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type] 

309 outputs_put: list[uuid.UUID] = [] 

310 with limited_butler.record_metrics() as butler_metrics: 

311 caveats = self._run_quantum( 

312 task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put 

313 ) 

314 except Exception as e: 

315 _LOG.error( 

316 "Execution of task '%s' on quantum %s failed. Exception %s: %s", 

317 task_node.label, 

318 quantum.dataId, 

319 e.__class__.__name__, 

320 str(e), 

321 ) 

322 captureLog.extra.exception = ExceptionInfo( 

323 type_name=get_full_type_name(e), 

324 message=str(e), 

325 metadata={}, 

326 ) 

327 raise 

328 else: 

329 quantumMetadata["butler_metrics"] = butler_metrics.model_dump() 

330 quantumMetadata["caveats"] = caveats.value 

331 # Stringify the UUID for easier compatibility with 

332 # PropertyList. 

333 finally: 

334 logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type] 

335 fullMetadata = task.getFullMetadata() 

336 quantumMetadata["outputs"] = [str(output) for output in outputs_put] 

337 fullMetadata["quantum"] = quantumMetadata 

338 if self._job_metadata is not None: 

339 fullMetadata["job"] = self._job_metadata 

340 captureLog.extra.metadata = fullMetadata 

341 self._write_metadata(quantum, fullMetadata, task_node, limited_butler) 

342 stopTime = time.time() 

343 _LOG.info( 

344 "Execution of task '%s' on quantum %s took %.3f seconds", 

345 task_node.label, 

346 quantum.dataId, 

347 stopTime - startTime, 

348 ) 

349 return QuantumExecutionResult( 

350 quantum, 

351 report, 

352 skipped_existing=False, 

353 adjusted_no_work=False, 

354 task_metadata=fullMetadata, 

355 ) 

356 

357 def _check_existing_outputs( 

358 self, 

359 quantum: Quantum, 

360 task_node: TaskNode, 

361 /, 

362 limited_butler: LimitedButler, 

363 log_extra: _ExecutionLogRecordsExtra, 

364 ) -> bool: 

365 """Decide whether this quantum needs to be executed. 

366 

367 If only partial outputs exist then they are removed if 

368 ``clobberOutputs`` is True, otherwise an exception is raised. 

369 

370 The ``LimitedButler`` is used for everything, and should be set to 

371 ``self.butler`` if no separate ``LimitedButler`` is available. 

372 

373 Parameters 

374 ---------- 

375 quantum : `~lsst.daf.butler.Quantum` 

376 Quantum to check for existing outputs. 

377 task_node : `~.pipeline_graph.TaskNode` 

378 Task definition structure. 

379 limited_butler : `~lsst.daf.butler.LimitedButler` 

380 Butler to use for querying and clobbering. 

381 log_extra : `.log_capture.TaskLogRecordsExtra` 

382 Extra information to attach to log records. 

383 

384 Returns 

385 ------- 

386 exist : `bool` 

387 `True` if ``self.skipExisting`` is defined, and a previous 

388 execution of this quanta appears to have completed successfully 

389 (either because metadata was written or all datasets were written). 

390 `False` otherwise. 

391 

392 Raises 

393 ------ 

394 RuntimeError 

395 Raised if some outputs exist and some not. 

396 """ 

397 if self._assume_no_existing_outputs: 

398 return False 

399 

400 if self._skip_existing: 

401 _LOG.debug( 

402 "Checking existence of metadata from previous execution of label=%s dataId=%s.", 

403 task_node.label, 

404 quantum.dataId, 

405 ) 

406 # Metadata output exists; this is sufficient to assume the previous 

407 # run was successful and should be skipped. 

408 [metadata_ref] = quantum.outputs[task_node.metadata_output.dataset_type_name] 

409 if metadata_ref is not None: 

410 if limited_butler.stored(metadata_ref): 

411 return True 

412 

413 # Find and prune (partial) outputs if `self.clobberOutputs` is set. 

414 _LOG.debug( 

415 "Looking for existing outputs in the way for label=%s dataId=%s.", task_node.label, quantum.dataId 

416 ) 

417 ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values())) 

418 if task_node.log_output is not None: 

419 (log_ref,) = quantum.outputs[task_node.log_output.dataset_type_name] 

420 if ref_dict[log_ref]: 

421 _LOG.debug( 

422 "Attaching logs from previous attempt on label=%s dataId=%s.", 

423 task_node.label, 

424 quantum.dataId, 

425 ) 

426 log_extra.attach_previous_attempt(limited_butler.get(log_ref)) 

427 existingRefs = [ref for ref, exists in ref_dict.items() if exists] 

428 missingRefs = [ref for ref, exists in ref_dict.items() if not exists] 

429 if existingRefs: 

430 if not missingRefs: 

431 # Full outputs exist. 

432 if self._skip_existing: 

433 return True 

434 elif self._clobber_outputs: 

435 _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs) 

436 limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) 

437 else: 

438 raise RuntimeError( 

439 f"Complete outputs exists for a quantum {quantum} " 

440 "and neither clobberOutputs nor skipExisting is set: " 

441 f"existingRefs={existingRefs}" 

442 ) 

443 else: 

444 # Partial outputs from a failed quantum. 

445 _LOG.debug( 

446 "Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s", 

447 quantum, 

448 existingRefs, 

449 missingRefs, 

450 ) 

451 if self._clobber_outputs: 

452 # only prune 

453 _LOG.info("Removing partial outputs for task %s: %s", task_node.label, existingRefs) 

454 limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) 

455 return False 

456 else: 

457 raise RuntimeError( 

458 "Registry inconsistency while checking for existing quantum outputs:" 

459 f" quantum={quantum} existingRefs={existingRefs}" 

460 f" missingRefs={missingRefs}" 

461 ) 

462 

463 # By default always execute. 

464 return False 

465 

466 def _updated_quantum_inputs( 

467 self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler 

468 ) -> Quantum: 

469 """Update quantum with extra information, returns a new updated 

470 Quantum. 

471 

472 Some methods may require input DatasetRefs to have non-None 

473 ``dataset_id``, but in case of intermediate dataset it may not be 

474 filled during QuantumGraph construction. This method will retrieve 

475 missing info from registry. 

476 

477 Parameters 

478 ---------- 

479 quantum : `~lsst.daf.butler.Quantum` 

480 Single Quantum instance. 

481 task_node : `~.pipeline_graph.TaskNode` 

482 Task definition structure. 

483 limited_butler : `~lsst.daf.butler.LimitedButler` 

484 Butler to use for querying. 

485 

486 Returns 

487 ------- 

488 update : `~lsst.daf.butler.Quantum` 

489 Updated Quantum instance. 

490 """ 

491 anyChanges = False 

492 updatedInputs: defaultdict[DatasetType, list] = defaultdict(list) 

493 for key, refsForDatasetType in quantum.inputs.items(): 

494 _LOG.debug( 

495 "Checking existence of input '%s' for label=%s dataId=%s.", 

496 key.name, 

497 task_node.label, 

498 quantum.dataId, 

499 ) 

500 toCheck = [] 

501 newRefsForDatasetType = updatedInputs[key] 

502 for ref in refsForDatasetType: 

503 if self._should_assume_exists(quantum, ref): 

504 newRefsForDatasetType.append(ref) 

505 else: 

506 toCheck.append(ref) 

507 if not toCheck: 

508 _LOG.debug( 

509 "Assuming overall input '%s' is present without checks for label=%s dataId=%s.", 

510 key.name, 

511 task_node.label, 

512 quantum.dataId, 

513 ) 

514 continue 

515 stored = limited_butler.stored_many(toCheck) 

516 for ref in toCheck: 

517 if stored[ref]: 

518 newRefsForDatasetType.append(ref) 

519 else: 

520 # This should only happen if a predicted intermediate was 

521 # not actually produced upstream, but 

522 # datastore misconfigurations can unfortunately also land 

523 # us here. 

524 _LOG.info("No dataset artifact found for %s", ref) 

525 continue 

526 if len(newRefsForDatasetType) != len(refsForDatasetType): 

527 anyChanges = True 

528 # If we removed any input datasets, let the task check if it has enough 

529 # to proceed and/or prune related datasets that it also doesn't 

530 # need/produce anymore. It will raise NoWorkFound if it can't run, 

531 # which we'll let propagate up. This is exactly what we run during QG 

532 # generation, because a task shouldn't care whether an input is missing 

533 # because some previous task didn't produce it, or because it just 

534 # wasn't there during QG generation. 

535 namedUpdatedInputs = NamedKeyDict[DatasetType, list[DatasetRef]](updatedInputs.items()) 

536 helper = AdjustQuantumHelper(namedUpdatedInputs, quantum.outputs) 

537 if anyChanges: 

538 _LOG.debug("Running adjustQuantum for label=%s dataId=%s.", task_node.label, quantum.dataId) 

539 assert quantum.dataId is not None, "Quantum DataId cannot be None" 

540 helper.adjust_in_place(task_node.get_connections(), label=task_node.label, data_id=quantum.dataId) 

541 return Quantum( 

542 taskName=quantum.taskName, 

543 taskClass=quantum.taskClass, 

544 dataId=quantum.dataId, 

545 initInputs=quantum.initInputs, 

546 inputs=helper.inputs, 

547 outputs=helper.outputs, 

548 ) 

549 

550 def _run_quantum( 

551 self, 

552 task: PipelineTask, 

553 quantum: Quantum, 

554 task_node: TaskNode, 

555 /, 

556 limited_butler: LimitedButler, 

557 quantum_id: uuid.UUID | None, 

558 ids_put: list[uuid.UUID], 

559 ) -> QuantumSuccessCaveats: 

560 """Execute task on a single quantum. 

561 

562 Parameters 

563 ---------- 

564 task : `PipelineTask` 

565 Task object. 

566 quantum : `~lsst.daf.butler.Quantum` 

567 Single Quantum instance. 

568 task_node : `~.pipeline_graph.TaskNode` 

569 Task definition structure. 

570 limited_butler : `~lsst.daf.butler.LimitedButler` 

571 Butler to use for dataset I/O. 

572 quantum_id : `uuid.UUID` or `None` 

573 ID of the quantum being executed. 

574 ids_put : list[ `uuid.UUID` ] 

575 List to be populated with the dataset IDs that were written by this 

576 quantum. This is an output parameter in order to allow it to be 

577 populated even when an exception is raised. 

578 

579 Returns 

580 ------- 

581 flags : `QuantumSuccessCaveats` 

582 Flags that describe qualified successes. 

583 """ 

584 flags = QuantumSuccessCaveats.NO_CAVEATS 

585 

586 # Create a butler that operates in the context of a quantum 

587 butlerQC = QuantumContext(limited_butler, quantum, resources=self._resources, quantum_id=quantum_id) 

588 

589 # Get the input and output references for the task 

590 inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum) 

591 

592 # Call task runQuantum() method. 

593 try: 

594 task.runQuantum(butlerQC, inputRefs, outputRefs) 

595 except NoWorkFound as err: 

596 # Not an error, just an early exit. 

597 _LOG.info( 

598 "Task '%s' on quantum %s exited early with no work found: %s.", 

599 task_node.label, 

600 quantum.dataId, 

601 str(err), 

602 ) 

603 flags |= err.FLAGS 

604 except AnnotatedPartialOutputsError as caught: 

605 error: BaseException 

606 if caught.__cause__ is None: 

607 _LOG.error( 

608 "Incorrect use of AnnotatedPartialOutputsError: no chained exception found.", 

609 task_node.label, 

610 quantum.dataId, 

611 ) 

612 error = caught 

613 else: 

614 error = caught.__cause__ 

615 if self._raise_on_partial_outputs: 

616 # Note: this is a real edge case that required some 

617 # experimentation: without 'from None' below, this raise would 

618 # produce a "while one exception was being handled, another was 

619 # raised" traceback involving AnnotatedPartialOutputsError. 

620 # With the 'from None', we get just the error chained to it, as 

621 # desired. 

622 raise error from None 

623 else: 

624 _LOG.error( 

625 "Task '%s' on quantum %s exited with partial outputs; " 

626 "considering this a qualified success and proceeding.", 

627 task_node.label, 

628 quantum.dataId, 

629 ) 

630 _LOG.error(error, exc_info=error) 

631 flags |= caught.FLAGS 

632 finally: 

633 ids_put.extend(output[2] for output in butlerQC.outputsPut) 

634 if not butlerQC.outputsPut: 

635 flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING 

636 if not butlerQC.outputsPut == butlerQC.allOutputs: 

637 flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING 

638 return flags 

639 

640 def _write_metadata( 

641 self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler 

642 ) -> None: 

643 # DatasetRef has to be in the Quantum outputs, can lookup by name 

644 try: 

645 [ref] = quantum.outputs[task_node.metadata_output.dataset_type_name] 

646 except LookupError as exc: 

647 raise InvalidQuantumError( 

648 "Quantum outputs is missing metadata dataset type " 

649 f"{task_node.metadata_output.dataset_type_name};" 

650 " this could happen due to inconsistent options between QuantumGraph generation" 

651 " and execution" 

652 ) from exc 

653 limited_butler.put(metadata, ref) 

654 

655 def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None: 

656 """Report whether the given dataset can be assumed to exist because 

657 some previous check reported that it did. 

658 

659 If this is `True` for a dataset does not in fact exist anymore, that's 

660 an unexpected problem that we want to raise as an exception, and 

661 definitely not a case where some predicted output just wasn't produced. 

662 We can't always tell the difference, but in this case we can. 

663 

664 Parameters 

665 ---------- 

666 quantum : `Quantum` 

667 Quantum being processed. 

668 ref : `lsst.daf.butler.DatasetRef` 

669 Reference to the input dataset. 

670 

671 Returns 

672 ------- 

673 exists : `bool` or `None` 

674 `True` if this dataset is definitely an overall input, `False` if 

675 some other quantum in the graph is expected to produce it, and 

676 `None` if the answer could not be determined. 

677 """ 

678 if quantum.datastore_records: 

679 for datastore_record_data in quantum.datastore_records.values(): 

680 if ref.id in datastore_record_data.records: 

681 return True 

682 return False 

683 return None