Coverage for python/lsst/pipe/base/single_quantum_executor.py: 11%
231 statements
coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = ["SingleQuantumExecutor"]

import logging
import time
import uuid
from collections import defaultdict
from collections.abc import Callable, Mapping
from itertools import chain
from typing import Any

from lsst.daf.butler import (
    Butler,
    DatasetRef,
    DatasetType,
    LimitedButler,
    NamedKeyDict,
    Quantum,
)
from lsst.daf.butler.logging import ButlerLogRecords
from lsst.utils.introspection import get_full_type_name
from lsst.utils.timer import logInfo

from ._quantumContext import ExecutionResources, QuantumContext
from ._status import (
    AnnotatedPartialOutputsError,
    ExceptionInfo,
    InvalidQuantumError,
    NoWorkFound,
    QuantumSuccessCaveats,
)
from .connections import AdjustQuantumHelper
from .log_capture import LogCapture, _ExecutionLogRecordsExtra
from .pipeline_graph import TaskNode
from .pipelineTask import PipelineTask
from .quantum_graph_executor import QuantumExecutionResult, QuantumExecutor
from .quantum_reports import QuantumReport
from .task import _TASK_FULL_METADATA_TYPE, _TASK_METADATA_TYPE
from .taskFactory import TaskFactory

_LOG = logging.getLogger(__name__)

class SingleQuantumExecutor(QuantumExecutor):
    """Executor class which runs one Quantum at a time.

    Parameters
    ----------
    butler : `~lsst.daf.butler.LimitedButler` or `None`, optional
        Data butler; `None` means that ``limited_butler_factory`` should be
        used instead.
    task_factory : `.TaskFactory`, optional
        Instance of a task factory. Defaults to a new instance of
        `lsst.pipe.base.TaskFactory`.
    skip_existing_in : `str` or `~collections.abc.Iterable` [ `str` ], optional
        A collection name or list of collections to search for existing
        outputs of quanta; quanta whose outputs are found there should be
        skipped. This class only checks whether the butler output run is
        present in the list of collections: if it is, quanta whose complete
        outputs already exist in that run will be skipped. `None` or an empty
        string/sequence disables skipping.
    clobber_outputs : `bool`, optional
        If `True`, then outputs from a quantum that exist in the output run
        collection will be removed prior to executing the quantum. If
        ``skip_existing_in`` contains the output run, then only partial
        outputs from a quantum will be removed. Only used when ``butler`` is
        not `None`.
    enable_lsst_debug : `bool`, optional
        Enable debugging with the ``lsstDebug`` facility for a task.
    limited_butler_factory : `~collections.abc.Callable`, optional
        A method that creates a `~lsst.daf.butler.LimitedButler` instance for
        a given Quantum. This parameter must be provided if ``butler`` is
        `None`. If ``butler`` is not `None` then this parameter is ignored.
    resources : `.ExecutionResources`, optional
        The resources available to this quantum when executing.
    skip_existing : `bool`, optional
        If `True`, skip quanta whose metadata datasets are already stored.
        Unlike ``skip_existing_in``, this works with limited butlers as well
        as full butlers. Always set to `True` if ``skip_existing_in`` matches
        ``butler.run``.
    assume_no_existing_outputs : `bool`, optional
        If `True`, assume preexisting outputs are impossible (e.g. because
        this is known by higher-level code to be a new ``RUN`` collection),
        and do not look for them. This causes the ``skip_existing`` and
        ``clobber_outputs`` options to be ignored, but unlike just setting
        both of those to `False`, it also avoids all dataset existence
        checks.
    raise_on_partial_outputs : `bool`, optional
        If `True`, raise exceptions chained by
        `.AnnotatedPartialOutputsError` immediately, instead of considering
        the partial result a success and continuing to run downstream tasks.
    job_metadata : `~collections.abc.Mapping`, optional
        Mapping with extra metadata to embed within the quantum metadata
        under the "job" key. This is intended to correspond to information
        common to all quanta being executed in a single process, such as the
        time taken to load the quantum graph in a BPS job.
    """
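    # Typical usage, as a rough sketch (``butler``, ``task_node``,
    # ``quantum``, and ``quantum_id`` are placeholders supplied by the
    # caller, usually a quantum-graph executor):
    #
    #     executor = SingleQuantumExecutor(butler=butler, clobber_outputs=True)
    #     result = executor.execute(task_node, quantum, quantum_id=quantum_id)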
    def __init__(
        self,
        *,
        butler: LimitedButler | None = None,
        task_factory: TaskFactory | None = None,
        skip_existing_in: Any = None,
        clobber_outputs: bool = False,
        enable_lsst_debug: bool = False,
        limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None,
        resources: ExecutionResources | None = None,
        skip_existing: bool = False,
        assume_no_existing_outputs: bool = False,
        raise_on_partial_outputs: bool = True,
        job_metadata: Mapping[str, int | str | float] | None = None,
    ):
        self._butler: Butler | None = None
        self._limited_butler: LimitedButler | None = None
        match butler:
            case Butler():
                self._butler = butler
                self._limited_butler = butler
            case LimitedButler():
                self._limited_butler = butler
            case None:
                if limited_butler_factory is None:
                    raise ValueError("limited_butler_factory is needed when butler is None")
        self._task_factory = task_factory if task_factory is not None else TaskFactory()
        self._clobber_outputs = clobber_outputs
        self._enable_lsst_debug = enable_lsst_debug
        self._limited_butler_factory = limited_butler_factory
        self._resources = resources
        self._assume_no_existing_outputs = assume_no_existing_outputs
        self._raise_on_partial_outputs = raise_on_partial_outputs
        self._job_metadata = job_metadata
        # Find whether output run is in skip_existing_in.
        self._skip_existing = skip_existing
        if self._butler is not None and skip_existing_in and not self._skip_existing:
            self._skip_existing = self._butler.run in self._butler.collections.query(
                skip_existing_in, flatten_chains=True
            )
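            # Illustrative example: with ``butler.run == "u/user/run1"`` and
            # ``skip_existing_in == ["u/user/run1", "some/chain"]``, the query
            # above matches the output run and skipping is enabled.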
        self._previous_process_quanta: list[uuid.UUID] = []

    def execute(
        self,
        task_node: TaskNode,
        /,
        quantum: Quantum,
        quantum_id: uuid.UUID | None = None,
        *,
        log_records: ButlerLogRecords | None = None,
    ) -> QuantumExecutionResult:
        # Docstring inherited from QuantumExecutor.execute
        if self._butler is not None:
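            # Full-Butler case: re-query the database to refresh cached
            # registry state (dataset types, collections, etc.) so that
            # anything registered since this process started is visible.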
            self._butler.registry.refresh()
        return self._execute(task_node, quantum, quantum_id=quantum_id, log_records=log_records)

    def _execute(
        self,
        task_node: TaskNode,
        /,
        quantum: Quantum,
        quantum_id: uuid.UUID | None = None,
        *,
        log_records: ButlerLogRecords | None = None,
    ) -> QuantumExecutionResult:
        """Execute the quantum.

        Internal implementation of `execute()`.
        """
        # Make a limited butler instance if needed.
        limited_butler: LimitedButler
        used_butler_factory = False
        if self._butler is not None:
            limited_butler = self._butler
        else:
            # We check this in the constructor, but mypy needs the check here.
            if self._limited_butler is not None:
                limited_butler = self._limited_butler
            else:
                assert self._limited_butler_factory is not None
                limited_butler = self._limited_butler_factory(quantum)
                used_butler_factory = True

        try:
            return self._execute_with_limited_butler(
                task_node, limited_butler, quantum=quantum, quantum_id=quantum_id, log_records=log_records
            )
        finally:
            if used_butler_factory:
                limited_butler.close()

    def _execute_with_limited_butler(
        self,
        task_node: TaskNode,
        limited_butler: LimitedButler,
        /,
        quantum: Quantum,
        quantum_id: uuid.UUID | None = None,
        *,
        log_records: ButlerLogRecords | None = None,
    ) -> QuantumExecutionResult:
        startTime = time.time()
        assert quantum.dataId is not None, "Quantum DataId cannot be None"
        report = QuantumReport(quantumId=quantum_id, dataId=quantum.dataId, taskLabel=task_node.label)
        if self._butler is not None:
            log_capture = LogCapture.from_full(self._butler)
        else:
            log_capture = LogCapture.from_limited(limited_butler)
        with log_capture.capture_logging(task_node, quantum, records=log_records) as captureLog:
            # Save detailed resource usage before task start to metadata.
            quantumMetadata = _TASK_METADATA_TYPE()
            logInfo(None, "prep", metadata=quantumMetadata)  # type: ignore[arg-type]

            _LOG.info(
                "Preparing execution of quantum for label=%s dataId=%s.", task_node.label, quantum.dataId
            )

            # Check whether to skip or delete old outputs; if the check
            # returns True or raises an exception, do not try to store logs,
            # as they may already be in the butler.
            captureLog.store = False
            if self._check_existing_outputs(quantum, task_node, limited_butler, captureLog.extra):
                _LOG.info(
                    "Skipping already-successful quantum for label=%s dataId=%s.",
                    task_node.label,
                    quantum.dataId,
                )
                return QuantumExecutionResult(quantum, report, skipped_existing=True, adjusted_no_work=False)
            captureLog.store = True

            captureLog.extra.previous_process_quanta.extend(self._previous_process_quanta)
            if quantum_id is not None:
                self._previous_process_quanta.append(quantum_id)
            try:
                quantum = self._updated_quantum_inputs(quantum, task_node, limited_butler)
            except NoWorkFound as exc:
                _LOG.info(
                    "Nothing to do for task '%s' on quantum %s; saving metadata and skipping: %s",
                    task_node.label,
                    quantum.dataId,
                    str(exc),
                )
                quantumMetadata["caveats"] = QuantumSuccessCaveats.from_adjust_quantum_no_work().value
                quantumMetadata["outputs"] = []
                # Make empty metadata that looks something like what a
                # do-nothing task would write (but we don't bother with empty
                # nested PropertySets for subtasks). This is slightly
                # duplicative with logic in pipe_base that we can't easily
                # call from here; we'll fix this on DM-29761.
                logInfo(None, "end", metadata=quantumMetadata)  # type: ignore[arg-type]
                fullMetadata = _TASK_FULL_METADATA_TYPE()
                fullMetadata[task_node.label] = _TASK_METADATA_TYPE()
                fullMetadata["quantum"] = quantumMetadata
                if self._job_metadata is not None:
                    fullMetadata["job"] = self._job_metadata
                self._write_metadata(quantum, fullMetadata, task_node, limited_butler)
                return QuantumExecutionResult(
                    quantum,
                    report,
                    skipped_existing=False,
                    adjusted_no_work=True,
                    task_metadata=fullMetadata,
                )

            # Enable lsstDebug debugging.
            if self._enable_lsst_debug:
                try:
                    _LOG.debug("Will try to import debug.py")
                    import debug  # type: ignore # noqa:F401
                except ImportError:
                    _LOG.warning("No 'debug' module found.")

            # Ensure that we are executing a frozen config.
            task_node.config.freeze()
            logInfo(None, "init", metadata=quantumMetadata)  # type: ignore[arg-type]
            init_input_refs = list(quantum.initInputs.values())

            _LOG.info(
                "Constructing task and executing quantum for label=%s dataId=%s.",
                task_node.label,
                quantum.dataId,
            )
            try:
                task = self._task_factory.makeTask(task_node, limited_butler, init_input_refs)
                logInfo(None, "start", metadata=quantumMetadata)  # type: ignore[arg-type]
                outputs_put: list[uuid.UUID] = []
                with limited_butler.record_metrics() as butler_metrics:
                    caveats = self._run_quantum(
                        task, quantum, task_node, limited_butler, quantum_id=quantum_id, ids_put=outputs_put
                    )
            except Exception as e:
                _LOG.error(
                    "Execution of task '%s' on quantum %s failed. Exception %s: %s",
                    task_node.label,
                    quantum.dataId,
                    e.__class__.__name__,
                    str(e),
                )
                captureLog.extra.exception = ExceptionInfo(
                    type_name=get_full_type_name(e),
                    message=str(e),
                    metadata={},
                )
                raise
            else:
                quantumMetadata["butler_metrics"] = butler_metrics.model_dump()
                quantumMetadata["caveats"] = caveats.value
            finally:
                logInfo(None, "end", metadata=quantumMetadata)  # type: ignore[arg-type]
                fullMetadata = task.getFullMetadata()
                # Stringify the UUIDs for easier compatibility with
                # PropertyList.
                quantumMetadata["outputs"] = [str(output) for output in outputs_put]
                fullMetadata["quantum"] = quantumMetadata
                if self._job_metadata is not None:
                    fullMetadata["job"] = self._job_metadata
                captureLog.extra.metadata = fullMetadata
                self._write_metadata(quantum, fullMetadata, task_node, limited_butler)
            stopTime = time.time()
            _LOG.info(
                "Execution of task '%s' on quantum %s took %.3f seconds",
                task_node.label,
                quantum.dataId,
                stopTime - startTime,
            )
        return QuantumExecutionResult(
            quantum,
            report,
            skipped_existing=False,
            adjusted_no_work=False,
            task_metadata=fullMetadata,
        )

    def _check_existing_outputs(
        self,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        log_extra: _ExecutionLogRecordsExtra,
    ) -> bool:
        """Decide whether this quantum needs to be executed.

        If only partial outputs exist then they are removed if
        ``clobber_outputs`` is `True`; otherwise an exception is raised.

        The ``LimitedButler`` is used for everything, and should be set to
        ``self._butler`` if no separate ``LimitedButler`` is available.

        Parameters
        ----------
        quantum : `~lsst.daf.butler.Quantum`
            Quantum to check for existing outputs.
        task_node : `~.pipeline_graph.TaskNode`
            Task definition structure.
        limited_butler : `~lsst.daf.butler.LimitedButler`
            Butler to use for querying and clobbering.
        log_extra : `.log_capture._ExecutionLogRecordsExtra`
            Extra information to attach to log records.

        Returns
        -------
        exist : `bool`
            `True` if ``skip_existing`` is enabled and a previous execution
            of this quantum appears to have completed successfully (either
            because metadata was written or all datasets were written).
            `False` otherwise.

        Raises
        ------
        RuntimeError
            Raised if some outputs exist and some do not.
        """
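        # Decision summary of the code below: assume_no_existing_outputs
        # short-circuits to "execute"; stored metadata with skip_existing
        # enabled means "skip"; complete outputs mean "skip" (skip_existing)
        # or are clobbered (clobber_outputs), otherwise an error; partial
        # outputs are clobbered if clobber_outputs is set, otherwise an error.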
        if self._assume_no_existing_outputs:
            return False

        if self._skip_existing:
            _LOG.debug(
                "Checking existence of metadata from previous execution of label=%s dataId=%s.",
                task_node.label,
                quantum.dataId,
            )
            # If the metadata output exists, that is sufficient to assume the
            # previous run was successful and this quantum should be skipped.
            [metadata_ref] = quantum.outputs[task_node.metadata_output.dataset_type_name]
            if metadata_ref is not None:
                if limited_butler.stored(metadata_ref):
                    return True

        # Find and prune (partial) outputs if ``clobber_outputs`` is set.
        _LOG.debug(
            "Looking for existing outputs in the way for label=%s dataId=%s.", task_node.label, quantum.dataId
        )
        ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values()))
        if task_node.log_output is not None:
            (log_ref,) = quantum.outputs[task_node.log_output.dataset_type_name]
            if ref_dict[log_ref]:
                _LOG.debug(
                    "Attaching logs from previous attempt on label=%s dataId=%s.",
                    task_node.label,
                    quantum.dataId,
                )
                log_extra.attach_previous_attempt(limited_butler.get(log_ref))
        existingRefs = [ref for ref, exists in ref_dict.items() if exists]
        missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
        if existingRefs:
            if not missingRefs:
                # Full outputs exist.
                if self._skip_existing:
                    return True
                elif self._clobber_outputs:
                    _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs)
                    limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
                else:
                    raise RuntimeError(
                        f"Complete outputs exist for quantum {quantum} "
                        "and neither clobber_outputs nor skip_existing is set: "
                        f"existingRefs={existingRefs}"
                    )
            else:
                # Partial outputs from a failed quantum.
                _LOG.debug(
                    "Partial outputs exist for quantum %s existingRefs=%s missingRefs=%s",
                    quantum,
                    existingRefs,
                    missingRefs,
                )
                if self._clobber_outputs:
                    # Only prune the outputs that already exist.
                    _LOG.info("Removing partial outputs for task %s: %s", task_node.label, existingRefs)
                    limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
                    return False
                else:
                    raise RuntimeError(
                        "Registry inconsistency while checking for existing quantum outputs:"
                        f" quantum={quantum} existingRefs={existingRefs}"
                        f" missingRefs={missingRefs}"
                    )

        # By default always execute.
        return False

    def _updated_quantum_inputs(
        self, quantum: Quantum, task_node: TaskNode, /, limited_butler: LimitedButler
    ) -> Quantum:
        """Update the quantum with extra information and return a new,
        updated Quantum.

        Some methods may require input DatasetRefs to have a non-None
        ``dataset_id``, but for intermediate datasets it may not be filled in
        during QuantumGraph construction. This method retrieves the missing
        information from the registry.

        Parameters
        ----------
        quantum : `~lsst.daf.butler.Quantum`
            Single Quantum instance.
        task_node : `~.pipeline_graph.TaskNode`
            Task definition structure.
        limited_butler : `~lsst.daf.butler.LimitedButler`
            Butler to use for querying.

        Returns
        -------
        updated : `~lsst.daf.butler.Quantum`
            Updated Quantum instance.
        """
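        # Rough flow of the implementation below: inputs whose existence can
        # be assumed (datastore records were attached at graph-generation
        # time) are kept as-is; the remainder are checked against the
        # datastore and missing ones are dropped; if anything was dropped,
        # ``adjustQuantum`` gets a chance to veto the quantum (NoWorkFound)
        # or prune further.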
        anyChanges = False
        updatedInputs: defaultdict[DatasetType, list] = defaultdict(list)
        for key, refsForDatasetType in quantum.inputs.items():
            _LOG.debug(
                "Checking existence of input '%s' for label=%s dataId=%s.",
                key.name,
                task_node.label,
                quantum.dataId,
            )
            toCheck = []
            newRefsForDatasetType = updatedInputs[key]
            for ref in refsForDatasetType:
                if self._should_assume_exists(quantum, ref):
                    newRefsForDatasetType.append(ref)
                else:
                    toCheck.append(ref)
            if not toCheck:
                _LOG.debug(
                    "Assuming overall input '%s' is present without checks for label=%s dataId=%s.",
                    key.name,
                    task_node.label,
                    quantum.dataId,
                )
                continue
            stored = limited_butler.stored_many(toCheck)
            for ref in toCheck:
                if stored[ref]:
                    newRefsForDatasetType.append(ref)
                else:
                    # This should only happen if a predicted intermediate was
                    # not actually produced upstream, but datastore
                    # misconfigurations can unfortunately also land us here.
                    _LOG.info("No dataset artifact found for %s", ref)
                    continue
            if len(newRefsForDatasetType) != len(refsForDatasetType):
                anyChanges = True
        # If we removed any input datasets, let the task check if it has
        # enough to proceed and/or prune related datasets that it also
        # doesn't need/produce anymore. It will raise NoWorkFound if it can't
        # run, which we'll let propagate up. This is exactly what we run
        # during QG generation, because a task shouldn't care whether an
        # input is missing because some previous task didn't produce it, or
        # because it just wasn't there during QG generation.
        namedUpdatedInputs = NamedKeyDict[DatasetType, list[DatasetRef]](updatedInputs.items())
        helper = AdjustQuantumHelper(namedUpdatedInputs, quantum.outputs)
        if anyChanges:
            _LOG.debug("Running adjustQuantum for label=%s dataId=%s.", task_node.label, quantum.dataId)
            assert quantum.dataId is not None, "Quantum DataId cannot be None"
            helper.adjust_in_place(task_node.get_connections(), label=task_node.label, data_id=quantum.dataId)
        return Quantum(
            taskName=quantum.taskName,
            taskClass=quantum.taskClass,
            dataId=quantum.dataId,
            initInputs=quantum.initInputs,
            inputs=helper.inputs,
            outputs=helper.outputs,
        )

    def _run_quantum(
        self,
        task: PipelineTask,
        quantum: Quantum,
        task_node: TaskNode,
        /,
        limited_butler: LimitedButler,
        quantum_id: uuid.UUID | None,
        ids_put: list[uuid.UUID],
    ) -> QuantumSuccessCaveats:
        """Execute task on a single quantum.

        Parameters
        ----------
        task : `PipelineTask`
            Task object.
        quantum : `~lsst.daf.butler.Quantum`
            Single Quantum instance.
        task_node : `~.pipeline_graph.TaskNode`
            Task definition structure.
        limited_butler : `~lsst.daf.butler.LimitedButler`
            Butler to use for dataset I/O.
        quantum_id : `uuid.UUID` or `None`
            ID of the quantum being executed.
        ids_put : `list` [ `uuid.UUID` ]
            List to be populated with the dataset IDs that were written by
            this quantum. This is an output parameter in order to allow it to
            be populated even when an exception is raised.

        Returns
        -------
        flags : `QuantumSuccessCaveats`
            Flags that describe qualified successes.
        """
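        # Outcome summary of the code below: a clean run returns NO_CAVEATS;
        # NoWorkFound adds the flags carried by the exception;
        # AnnotatedPartialOutputsError either re-raises the underlying error
        # (raise_on_partial_outputs) or adds its flags; finally,
        # ALL_OUTPUTS_MISSING / ANY_OUTPUTS_MISSING are set based on what was
        # actually written.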
        flags = QuantumSuccessCaveats.NO_CAVEATS

        # Create a butler that operates in the context of a quantum.
        butlerQC = QuantumContext(limited_butler, quantum, resources=self._resources, quantum_id=quantum_id)

        # Get the input and output references for the task.
        inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum)

        # Call the task's runQuantum() method.
        try:
            task.runQuantum(butlerQC, inputRefs, outputRefs)
        except NoWorkFound as err:
            # Not an error, just an early exit.
            _LOG.info(
                "Task '%s' on quantum %s exited early with no work found: %s.",
                task_node.label,
                quantum.dataId,
                str(err),
            )
            flags |= err.FLAGS
        except AnnotatedPartialOutputsError as caught:
            error: BaseException
            if caught.__cause__ is None:
                _LOG.error(
                    "Incorrect use of AnnotatedPartialOutputsError by task '%s' on quantum %s: "
                    "no chained exception found.",
                    task_node.label,
                    quantum.dataId,
                )
                error = caught
            else:
                error = caught.__cause__
            if self._raise_on_partial_outputs:
                # Note: this is a real edge case that required some
                # experimentation: without 'from None' below, this raise
                # would produce a "while one exception was being handled,
                # another was raised" traceback involving
                # AnnotatedPartialOutputsError.  With the 'from None', we get
                # just the error chained to it, as desired.
                raise error from None
            else:
                _LOG.error(
                    "Task '%s' on quantum %s exited with partial outputs; "
                    "considering this a qualified success and proceeding.",
                    task_node.label,
                    quantum.dataId,
                )
                _LOG.error(error, exc_info=error)
                flags |= caught.FLAGS
        finally:
            ids_put.extend(output[2] for output in butlerQC.outputsPut)
            if not butlerQC.outputsPut:
                flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            if butlerQC.outputsPut != butlerQC.allOutputs:
                flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
        return flags

    def _write_metadata(
        self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
    ) -> None:
        # The DatasetRef has to be in the Quantum outputs, so we can look it
        # up by dataset type name.
        try:
            [ref] = quantum.outputs[task_node.metadata_output.dataset_type_name]
        except LookupError as exc:
            raise InvalidQuantumError(
                "Quantum outputs are missing the metadata dataset type "
                f"{task_node.metadata_output.dataset_type_name};"
                " this could happen due to inconsistent options between QuantumGraph generation"
                " and execution"
            ) from exc
        limited_butler.put(metadata, ref)

    def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None:
        """Report whether the given dataset can be assumed to exist because
        some previous check reported that it did.

        If this is `True` for a dataset that does not in fact exist anymore,
        that is an unexpected problem that we want to raise as an exception,
        and definitely not a case where some predicted output just wasn't
        produced. We can't always tell the difference, but in this case we
        can.

        Parameters
        ----------
        quantum : `Quantum`
            Quantum being processed.
        ref : `lsst.daf.butler.DatasetRef`
            Reference to the input dataset.

        Returns
        -------
        exists : `bool` or `None`
            `True` if this dataset is definitely an overall input, `False` if
            some other quantum in the graph is expected to produce it, and
            `None` if the answer could not be determined.
        """
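        # Datastore records are (in practice) attached to a quantum at
        # graph-generation time only for datasets that already existed then
        # (overall inputs), so finding the ref's ID there means it was known
        # to exist; a ref absent from a non-empty record set is taken to be a
        # predicted intermediate, and an empty record set means we cannot
        # tell either way.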
        if quantum.datastore_records:
            for datastore_record_data in quantum.datastore_records.values():
                if ref.id in datastore_record_data.records:
                    return True
            return False
        return None