Coverage for python / lsst / pipe / base / _status.py: 54%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:49 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
# Names exported as this module's public API (kept alphabetical).
__all__ = (
    "AlgorithmError",
    "AnnotatedPartialOutputsError",
    "ExceptionInfo",
    "InvalidQuantumError",
    "NoWorkFound",
    "QuantumAttemptStatus",
    "QuantumSuccessCaveats",
    "RepeatableQuantumError",
    "UnprocessableDataError",
    "UpstreamFailureNoWorkFound",
)
43import abc
44import enum
45import logging
46import sys
47from typing import TYPE_CHECKING, Any, ClassVar, Protocol
49import pydantic
51from lsst.utils import introspection
52from lsst.utils.logging import LsstLogAdapter, getLogger
54from ._task_metadata import GetSetDictMetadata, NestedMetadataDict
56if TYPE_CHECKING:
57 from ._task_metadata import TaskMetadata
# Module-level logger; used to report exception metadata that cannot be
# propagated (see ExceptionInfo._from_metadata).
_LOG = getLogger(__name__)
class QuantumSuccessCaveats(enum.Flag):
    """Flags that add caveats to a "successful" quantum.

    Quanta can be considered successful even if they do not produce some of
    their expected outputs (and even if they do not produce all of their
    expected outputs), as long as the condition is sufficiently well understood
    that downstream processing should succeed.
    """

    NO_CAVEATS = 0
    """All outputs were produced and no exceptions were raised."""

    ANY_OUTPUTS_MISSING = enum.auto()
    """At least one predicted output was not produced."""

    ALL_OUTPUTS_MISSING = enum.auto()
    """No predicted outputs (except logs and metadata) were produced.

    `ANY_OUTPUTS_MISSING` is also set whenever this flag is set.
    """

    NO_WORK = enum.auto()
    """A subclass of `NoWorkFound` was raised.

    This does not necessarily imply that `ANY_OUTPUTS_MISSING` is set,
    since a `PipelineTask.runQuantum` implementation could raise it after
    directly writing all of its predicted outputs.
    """

    ADJUST_QUANTUM_RAISED = enum.auto()
    """`NoWorkFound` was raised by `PipelineTaskConnections.adjustQuantum`.

    This indicates that if a new `QuantumGraph` had been generated immediately
    before running this quantum, that quantum would not have even been
    included, because required inputs that were expected to exist by the time
    it was run (in the original `QuantumGraph`) were not actually produced.

    `NO_WORK` and `ALL_OUTPUTS_MISSING` are also set whenever this flag is set.
    """

    UPSTREAM_FAILURE_NO_WORK = enum.auto()
    """`UpstreamFailureNoWorkFound` was raised by `PipelineTask.runQuantum`.

    This exception is raised by downstream tasks when an upstream task's
    outputs were incomplete in a way that blocks it from running, often
    because the upstream task raised `AnnotatedPartialOutputsError`.

    `NO_WORK` is also set whenever this flag is set.
    """

    UNPROCESSABLE_DATA = enum.auto()
    """`UnprocessableDataError` was raised by `PipelineTask.runQuantum`.

    `NO_WORK` is also set whenever this flag is set.
    """

    PARTIAL_OUTPUTS_ERROR = enum.auto()
    """`AnnotatedPartialOutputsError` was raised by `PipelineTask.runQuantum`
    and the execution system was instructed to consider this a qualified
    success.
    """

    @classmethod
    def from_adjust_quantum_no_work(cls) -> QuantumSuccessCaveats:
        """Return the set of flags appropriate for a quantum for which
        `PipelineTaskConnections.adjustQuantum` raised `NoWorkFound`.
        """
        return cls.NO_WORK | cls.ADJUST_QUANTUM_RAISED | cls.ANY_OUTPUTS_MISSING | cls.ALL_OUTPUTS_MISSING

    def concise(self) -> str:
        """Return a concise string representation of the flags.

        Returns
        -------
        s : `str`
            Two-character string representation, with the first character
            indicating whether any predicted outputs were missing and the
            second representing any exceptions raised. This representation is
            not always complete; some rare combinations of flags are displayed
            as if only one of the flags was set.

        Notes
        -----
        The `legend` method returns a description of the returned codes.
        """
        # First character: output completeness.  ALL_OUTPUTS_MISSING wins
        # over ANY_OUTPUTS_MISSING because it is the stronger statement.
        char1 = ""
        if self & QuantumSuccessCaveats.ALL_OUTPUTS_MISSING:
            char1 = "*"
        elif self & QuantumSuccessCaveats.ANY_OUTPUTS_MISSING:
            char1 = "+"
        # Second character: the exception raised, checked from most specific
        # to least specific (plain NO_WORK last).
        char2 = ""
        if self & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED:
            char2 = "A"
        elif self & QuantumSuccessCaveats.UNPROCESSABLE_DATA:
            char2 = "D"
        elif self & QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK:
            char2 = "U"
        elif self & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR:
            char2 = "P"
        elif self & QuantumSuccessCaveats.NO_WORK:
            char2 = "N"
        return char1 + char2

    @staticmethod
    def legend() -> dict[str, str]:
        """Return a `dict` with human-readable descriptions of the characters
        used in `concise`.

        Returns
        -------
        legend : `dict` [ `str`, `str` ]
            Mapping from character code to description.
        """
        return {
            "+": "at least one predicted output was missing, but not all were",
            "*": "all predicted outputs were missing (besides logs and metadata)",
            "A": "adjustQuantum raised NoWorkFound; a regenerated QG would not include this quantum",
            "D": "algorithm considers data too bad to be processable",
            "U": "one or more input dataset was incomplete due to an upstream failure",
            "P": "task failed but wrote partial outputs; considered a partial success",
            "N": "runQuantum raised NoWorkFound",
        }
class ExceptionInfo(pydantic.BaseModel):
    """Information about an exception that was raised."""

    type_name: str
    """Fully-qualified Python type name for the exception raised."""

    message: str
    """String message included in the exception."""

    metadata: dict[str, float | int | str | bool | None]
    """Additional metadata included in the exception."""

    @classmethod
    def _from_metadata(cls, md: TaskMetadata) -> ExceptionInfo:
        """Construct from task metadata.

        Parameters
        ----------
        md : `TaskMetadata`
            Metadata about the error, as written by
            `AnnotatedPartialOutputsError`.

        Returns
        -------
        info : `ExceptionInfo`
            Information about the exception.
        """
        result = cls(type_name=md["type"], message=md["message"], metadata={})
        if "metadata" in md:
            raw_err_metadata = md["metadata"].to_dict()
            for k, v in raw_err_metadata.items():
                # Guard against error metadata we wouldn't be able to serialize
                # later via Pydantic; don't want one weird value bringing down
                # our ability to report on an entire run.
                if isinstance(v, float | int | str | bool):
                    result.metadata[k] = v
                else:
                    _LOG.debug(
                        "Not propagating nested or JSON-incompatible exception metadata key %s=%r.", k, v
                    )
        return result

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Bug fix: this previously delegated to model_dump(), which
            # returns a dict rather than the expected JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class QuantumAttemptStatus(enum.Enum):
    """Enum summarizing an attempt to run a quantum."""

    ABORTED = -4
    """The quantum failed with a hard error that prevented both logs and
    metadata from being written.

    This state is only set if information from higher-level tooling (e.g. BPS)
    is available to distinguish it from ``UNKNOWN``.
    """

    UNKNOWN = -3
    """The status of this attempt is unknown.

    This means no logs or metadata were written, and it at least could not be
    determined whether the quantum was blocked by an upstream failure (if it
    was definitely blocked, `BLOCKED` is set instead).
    """

    ABORTED_SUCCESS = -2
    """Task metadata was written for this attempt but logs were not.

    This is a rare condition that requires a hard failure (i.e. the kind that
    can prevent a ``finally`` block from running or I/O from being durable) at
    a very precise time.
    """

    FAILED = -1
    """Execution of the quantum failed gracefully.

    This is always set if the task metadata dataset was not written but logs
    were, as is the case when a Python exception is caught and handled by the
    execution system.

    This status guarantees that the task log dataset was produced but the
    metadata dataset was not.
    """

    BLOCKED = 0
    """This quantum was not executed because an upstream quantum failed.

    Upstream quanta with status `UNKNOWN`, `FAILED`, or `ABORTED` are
    considered blockers; `ABORTED_SUCCESS` is not.
    """

    SUCCESSFUL = 1
    """This quantum was successfully executed.

    Quanta may be considered successful even if they do not write any outputs
    or shortcut early by raising `NoWorkFound` or one of its variants. They
    may even be considered successful if they raise
    `AnnotatedPartialOutputsError` if the executor is configured to treat that
    exception as a non-failure. See `QuantumSuccessCaveats` for details on how
    these "successes with caveats" are reported.
    """

    @property
    def has_metadata(self) -> bool:
        """Whether the task metadata dataset was produced."""
        # Metadata exists for normal successes and for the rare case where
        # metadata was written but logs were lost (ABORTED_SUCCESS).
        return self in (QuantumAttemptStatus.SUCCESSFUL, QuantumAttemptStatus.ABORTED_SUCCESS)

    @property
    def has_log(self) -> bool:
        """Whether the log dataset was produced."""
        # Logs exist for successes and for graceful failures.
        return self in (QuantumAttemptStatus.FAILED, QuantumAttemptStatus.SUCCESSFUL)

    @property
    def title(self) -> str:
        """A version of this status' name suitable for use as a title in a plot
        or table.
        """
        # E.g. "ABORTED_SUCCESS" -> "Aborted success".  Replacing underscores
        # first and capitalizing second gives the same result as the reverse
        # order, since replace() does not touch letter case.
        return self.name.replace("_", " ").capitalize()

    @property
    def is_rare(self) -> bool:
        """Whether this status is rare enough that it should only be listed
        when it actually occurs.
        """
        rare_statuses = (
            QuantumAttemptStatus.ABORTED,
            QuantumAttemptStatus.ABORTED_SUCCESS,
            QuantumAttemptStatus.UNKNOWN,
        )
        return self in rare_statuses
class GetSetDictMetadataHolder(Protocol):
    """Protocol for objects that carry a ``metadata`` attribute satisfying
    `GetSetDictMetadata`.
    """

    @property
    def metadata(self) -> GetSetDictMetadata | None:
        """Metadata object on which failure information can be recorded, or
        `None` if the holder has no metadata.
        """
        ...
class NoWorkFound(BaseException):
    """Exception raised when a Quantum should not exist because there is no
    work for it to do.

    The usual causes are a non-optional input dataset that is not present, or
    a spatiotemporal overlap that was conservatively predicted but does not
    actually exist.

    This inherits from `BaseException` because it signals a case we do not
    consider a real error, even though try/except logic is often used to trap
    it.
    """

    # Caveat flags recorded when a quantum that raised this exception is
    # reported as a qualified success.
    FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK
class UpstreamFailureNoWorkFound(NoWorkFound):
    """A `NoWorkFound` specialization indicating that an upstream task had a
    problem that was ignored (e.g. so a single-detector failure does not bring
    down an entire visit).
    """

    # NO_WORK plus the flag identifying the upstream-failure special case.
    FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK
class RepeatableQuantumError(RuntimeError):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable problem that will not be addressed
    by retries.

    This usually indicates that the algorithm and the data it has been given
    are somehow incompatible, and the task should run fine on most other data.

    This exception may be used as a base class for more specific exceptions,
    or used directly while chaining another exception, e.g.::

        try:
            run_code()
        except SomeOtherError as err:
            raise RepeatableQuantumError() from err

    This may be used for missing input data when the desired behavior is to
    cause all downstream tasks being run to be blocked, forcing the user to
    address the problem. When the desired behavior is to skip all of this
    quantum and attempt downstream tasks (or skip them) without its
    outputs, raise `NoWorkFound` or return without raising instead.
    """

    # Exit code conventionally associated with this exception type (compare
    # InvalidQuantumError.EXIT_CODE = 21).
    EXIT_CODE = 20
class AlgorithmError(RepeatableQuantumError, abc.ABC):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable algorithmic failure that will not be
    addressed by retries.

    Subclass this exception to define the metadata associated with the error
    (for example: number of data points in a fit vs. degrees of freedom).
    """

    def __new__(cls, *args: Any, **kwargs: Any) -> AlgorithmError:
        # Builtin-derived exception classes are not checked for abstract
        # methods by the usual machinery, so enforce the check here; see
        # https://github.com/python/cpython/issues/50246
        if remaining := cls.__abstractmethods__:
            raise TypeError(
                f"Can't instantiate abstract class {cls.__name__} with "
                f"abstract methods: {','.join(sorted(remaining))}"
            )
        return super().__new__(cls, *args, **kwargs)

    @property
    @abc.abstractmethod
    def metadata(self) -> NestedMetadataDict | None:
        """Metadata from the raising `~lsst.pipe.base.Task` with more
        information about the failure. The contents of the dict are
        `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
        `int`, `float`, `bool`, or nested-dictionary (with the same key and
        value types) values.
        """
        raise NotImplementedError
class UnprocessableDataError(NoWorkFound):
    """A `NoWorkFound` specialization that will be [subclassed and] raised by
    Tasks to report that their inputs cannot be processed for some
    non-recoverable reason.

    Notes
    -----
    One example is a known bright star that causes PSF measurement to fail,
    making that detector entirely non-recoverable.  Another is an image with
    an oddly shaped PSF (e.g. due to a failure to achieve focus) that warrants
    flagging the image as "poor quality", so no further processing should be
    attempted.

    Inheriting from `NoWorkFound` ensures the job will not be considered a
    failure (i.e. such that no human time will inadvertently be spent chasing
    it down).

    Do not raise this unless we are convinced that the data cannot (or should
    not) be processed, even by a better algorithm.  Most instances where this
    error would be raised likely require an RFC to explicitly define the
    situation.
    """

    # NO_WORK plus the flag identifying data judged unprocessable.
    FLAGS: ClassVar = QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UNPROCESSABLE_DATA
class AnnotatedPartialOutputsError(RepeatableQuantumError):
    """Exception that runQuantum raises when the (partial) outputs it has
    written contain information about their own incompleteness or degraded
    quality.

    Clients should construct this exception by calling `annotate` instead of
    calling the constructor directly. However, `annotate` does not chain the
    exception; this must still be done by the client.

    This exception should always chain the original error. When the
    executor catches this exception, it will report the original exception. In
    contrast, other exceptions raised from ``runQuantum`` are considered to
    invalidate any outputs that are already written.
    """

    FLAGS: ClassVar = QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR

    @classmethod
    def annotate(
        cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger | LsstLogAdapter
    ) -> AnnotatedPartialOutputsError:
        """Set metadata on outputs to explain the nature of the failure.

        Parameters
        ----------
        error : `Exception`
            Exception that caused the task to fail.
        *args : `GetSetDictMetadataHolder`
            Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
            failure information. They must have a `metadata` property.
        log : `logging.Logger`
            Log to send error message to.

        Returns
        -------
        error : `AnnotatedPartialOutputsError`
            Exception that the failing task can ``raise from`` with the
            passed-in exception.

        Notes
        -----
        This should be called from within an except block that has caught an
        exception. Here is an example of handling a failure in
        ``PipelineTask.runQuantum`` that annotates and writes partial outputs:

        .. code-block:: py
           :name: annotate-error-example

            def runQuantum(self, butlerQC, inputRefs, outputRefs):
                inputs = butlerQC.get(inputRefs)
                exposure = inputs.pop("exposure")
                assert not inputs, "runQuantum got more inputs than expected"

                result = pipeBase.Struct(catalog=None)
                try:
                    self.run(exposure)
                except pipeBase.AlgorithmError as e:
                    error = pipeBase.AnnotatedPartialOutputsError.annotate(
                        e, self, result.catalog, log=self.log
                    )
                    raise error from e
                finally:
                    butlerQC.put(result, outputRefs)
        """
        # Summarize the original error in a form that can be stored in task
        # metadata and later reconstructed by ExceptionInfo._from_metadata.
        # Annotated dict[str, Any] because "metadata" (below) holds a dict,
        # not a str.
        failure_info: dict[str, Any] = {
            "message": str(error),
            "type": introspection.get_full_type_name(error),
        }
        if other := getattr(error, "metadata", None):
            # Deliberately skips empty/absent metadata (falsy values).
            failure_info["metadata"] = other

        # NOTE: Can't fully test this in pipe_base because afw is not a
        # dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
        for item in args:
            # Some outputs may not exist, so we cannot set metadata on them.
            if item is None:
                continue
            # NOTE(review): item.metadata may be None per the protocol; the
            # type: ignore assumes callers pass holders with metadata set.
            item.metadata.set_dict("failure", failure_info)  # type: ignore

        log.debug(
            "Task failed with only partial outputs; see exception message for details.",
            exc_info=error,
        )

        return cls("Task failed and wrote partial outputs: see chained exception for details.")
class InvalidQuantumError(Exception):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a logic bug or configuration problem.

    This usually indicates that the configured algorithm itself is invalid and
    will not run on a significant fraction of quanta (often all of them).

    This exception may be used as a base class for more specific exceptions,
    or used directly while chaining another exception, e.g.::

        try:
            run_code()
        except SomeOtherError as err:
            raise InvalidQuantumError() from err

    Raising this exception in `PipelineTask.runQuantum` or something it calls
    is a last resort - whenever possible, such problems should cause exceptions
    in ``__init__`` or in QuantumGraph generation. It should never be used
    for missing data.
    """

    # Exit code conventionally associated with this exception type (compare
    # RepeatableQuantumError.EXIT_CODE = 20).
    EXIT_CODE = 21