Coverage for python / lsst / pipe / base / _status.py: 54%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

# Public API of this module (kept alphabetically sorted).
__all__ = (
    "AlgorithmError",
    "AnnotatedPartialOutputsError",
    "ExceptionInfo",
    "InvalidQuantumError",
    "NoWorkFound",
    "QuantumAttemptStatus",
    "QuantumSuccessCaveats",
    "RepeatableQuantumError",
    "UnprocessableDataError",
    "UpstreamFailureNoWorkFound",
)

import abc
import enum
import logging
import sys
from typing import TYPE_CHECKING, Any, ClassVar, Protocol

import pydantic

from lsst.utils import introspection
from lsst.utils.logging import LsstLogAdapter, getLogger

from ._task_metadata import GetSetDictMetadata, NestedMetadataDict

if TYPE_CHECKING:
    # Needed only for annotations; kept under TYPE_CHECKING, presumably to
    # avoid a runtime import cycle with ._task_metadata.
    from ._task_metadata import TaskMetadata


# Module-level logger used for non-fatal diagnostics (e.g. dropped exception
# metadata in ExceptionInfo._from_metadata).
_LOG = getLogger(__name__)

61 

62 

class QuantumSuccessCaveats(enum.Flag):
    """Flags that add caveats to a "successful" quantum.

    Quanta can be considered successful even if they do not produce some of
    their expected outputs (and even if they do not produce all of their
    expected outputs), as long as the condition is sufficiently well understood
    that downstream processing should succeed.
    """

    NO_CAVEATS = 0
    """All outputs were produced and no exceptions were raised."""

    ANY_OUTPUTS_MISSING = enum.auto()
    """At least one predicted output was not produced."""

    ALL_OUTPUTS_MISSING = enum.auto()
    """No predicted outputs (except logs and metadata) were produced.

    `ANY_OUTPUTS_MISSING` is also set whenever this flag is set.
    """

    NO_WORK = enum.auto()
    """A subclass of `NoWorkFound` was raised.

    This does not necessarily imply that `ANY_OUTPUTS_MISSING` is not set,
    since a `PipelineTask.runQuantum` implementation could raise it after
    directly writing all of its predicted outputs.
    """

    ADJUST_QUANTUM_RAISED = enum.auto()
    """`NoWorkFound` was raised by `PipelineTaskConnections.adjustQuantum`.

    This indicates that if a new `QuantumGraph` had been generated immediately
    before running this quantum, that quantum would not have even been
    included, because required inputs that were expected to exist by the time
    it was run (in the original `QuantumGraph`) were not actually produced.

    `NO_WORK` and `ALL_OUTPUTS_MISSING` are also set whenever this flag is set.
    """

    UPSTREAM_FAILURE_NO_WORK = enum.auto()
    """`UpstreamFailureNoWorkFound` was raised by `PipelineTask.runQuantum`.

    This exception is raised by downstream tasks when an upstream task's
    outputs were incomplete in a way that blocks it from running, often
    because the upstream task raised `AnnotatedPartialOutputsError`.

    `NO_WORK` is also set whenever this flag is set.
    """

    UNPROCESSABLE_DATA = enum.auto()
    """`UnprocessableDataError` was raised by `PipelineTask.runQuantum`.

    `NO_WORK` is also set whenever this flag is set.
    """

    PARTIAL_OUTPUTS_ERROR = enum.auto()
    """`AnnotatedPartialOutputsError` was raised by `PipelineTask.runQuantum`
    and the execution system was instructed to consider this a qualified
    success.
    """

    @classmethod
    def from_adjust_quantum_no_work(cls) -> QuantumSuccessCaveats:
        """Return the set of flags appropriate for a quantum for which
        `PipelineTaskConnections.adjustQuantum` raised `NoWorkFound`.
        """
        # adjustQuantum runs before any outputs can be written, so all output
        # flags are implied in addition to the NO_WORK pair.
        return cls.NO_WORK | cls.ADJUST_QUANTUM_RAISED | cls.ANY_OUTPUTS_MISSING | cls.ALL_OUTPUTS_MISSING

    def concise(self) -> str:
        """Return a concise string representation of the flags.

        Returns
        -------
        s : `str`
            Two-character string representation, with the first character
            indicating whether any predicted outputs were missing and the
            second representing any exceptions raised. This representation is
            not always complete; some rare combinations of flags are displayed
            as if only one of the flags was set.

        Notes
        -----
        The `legend` method returns a description of the returned codes.
        """
        # First character: missing-output summary ("*" dominates "+").
        char1 = ""
        if self & QuantumSuccessCaveats.ALL_OUTPUTS_MISSING:
            char1 = "*"
        elif self & QuantumSuccessCaveats.ANY_OUTPUTS_MISSING:
            char1 = "+"
        # Second character: most-specific exception flag wins; plain NO_WORK
        # is reported only when no more-specific flag is set.
        char2 = ""
        if self & QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED:
            char2 = "A"
        elif self & QuantumSuccessCaveats.UNPROCESSABLE_DATA:
            char2 = "D"
        elif self & QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK:
            char2 = "U"
        elif self & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR:
            char2 = "P"
        elif self & QuantumSuccessCaveats.NO_WORK:
            char2 = "N"
        return char1 + char2

    @staticmethod
    def legend() -> dict[str, str]:
        """Return a `dict` with human-readable descriptions of the characters
        used in `concise`.

        Returns
        -------
        legend : `dict` [ `str`, `str` ]
            Mapping from character code to description.
        """
        return {
            "+": "at least one predicted output was missing, but not all were",
            "*": "all predicted outputs were missing (besides logs and metadata)",
            "A": "adjustQuantum raised NoWorkFound; a regenerated QG would not include this quantum",
            "D": "algorithm considers data too bad to be processable",
            "U": "one or more input dataset was incomplete due to an upstream failure",
            "P": "task failed but wrote partial outputs; considered a partial success",
            "N": "runQuantum raised NoWorkFound",
        }

185 

186 

class ExceptionInfo(pydantic.BaseModel):
    """Information about an exception that was raised."""

    type_name: str
    """Fully-qualified Python type name for the exception raised."""

    message: str
    """String message included in the exception."""

    metadata: dict[str, float | int | str | bool | None]
    """Additional metadata included in the exception."""

    @classmethod
    def _from_metadata(cls, md: TaskMetadata) -> ExceptionInfo:
        """Construct from task metadata.

        Parameters
        ----------
        md : `TaskMetadata`
            Metadata about the error, as written by
            `AnnotatedPartialOutputsError`.

        Returns
        -------
        info : `ExceptionInfo`
            Information about the exception.
        """
        result = cls(type_name=md["type"], message=md["message"], metadata={})
        if "metadata" in md:
            raw_err_metadata = md["metadata"].to_dict()
            for k, v in raw_err_metadata.items():
                # Guard against error metadata we wouldn't be able to serialize
                # later via Pydantic; don't want one weird value bringing down
                # our ability to report on an entire run.
                if isinstance(v, float | int | str | bool):
                    result.metadata[k] = v
                else:
                    _LOG.debug(
                        "Not propagating nested or JSON-incompatible exception metadata key %s=%r.", k, v
                    )
        return result

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.  These overrides
    # exist only when building docs (and never for type checkers); they simply
    # delegate to the base class.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Bug fix: this previously delegated to model_dump(), which
            # returns a dict rather than the expected JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

273 

274 

class QuantumAttemptStatus(enum.Enum):
    """Enum summarizing an attempt to run a quantum."""

    ABORTED = -4
    """A hard error prevented both the log and the metadata datasets from
    being written.

    This is only distinguished from ``UNKNOWN`` when higher-level tooling
    (e.g. BPS) supplies the information needed to tell them apart.
    """

    UNKNOWN = -3
    """Nothing is known about the outcome of this attempt.

    Neither logs nor metadata were written, and at minimum it could not be
    determined whether an upstream failure blocked the quantum (when it was
    definitely blocked, `BLOCKED` is used instead).
    """

    ABORTED_SUCCESS = -2
    """Task metadata was written for this attempt, but logs were not.

    This rare condition requires a hard failure (the kind that can keep a
    ``finally`` block from running or I/O from being durable) at a very
    precise moment.
    """

    FAILED = -1
    """Execution of the quantum failed gracefully.

    Always used when the task metadata dataset was not written but logs
    were, as happens when the execution system catches and handles a Python
    exception.

    This status guarantees that the task log dataset was produced and the
    metadata dataset was not.
    """

    BLOCKED = 0
    """The quantum was never executed because an upstream quantum failed.

    Upstream quanta with status `UNKNOWN`, `FAILED`, or `ABORTED` count as
    blockers; `ABORTED_SUCCESS` does not.
    """

    SUCCESSFUL = 1
    """The quantum was successfully executed.

    A quantum may count as successful even when it writes no outputs or
    shortcuts early by raising `NoWorkFound` (or one of its variants), and
    even when it raises `AnnotatedPartialOutputsError`, provided the
    executor is configured to treat that exception as a non-failure.  See
    `QuantumSuccessCaveats` for how such "successes with caveats" are
    reported.
    """

    @property
    def has_metadata(self) -> bool:
        """Whether the task metadata dataset was produced."""
        return self in (
            QuantumAttemptStatus.SUCCESSFUL,
            QuantumAttemptStatus.ABORTED_SUCCESS,
        )

    @property
    def has_log(self) -> bool:
        """Whether the log dataset was produced."""
        return self in (
            QuantumAttemptStatus.SUCCESSFUL,
            QuantumAttemptStatus.FAILED,
        )

    @property
    def title(self) -> str:
        """A version of this status' name suitable for use as a title in a
        plot or table.
        """
        # Equivalent to capitalize-then-replace: first letter upper, the rest
        # lower, with underscores shown as spaces.
        return self.name.replace("_", " ").capitalize()

    @property
    def is_rare(self) -> bool:
        """Whether this status is rare enough that it should only be listed
        when it actually occurs.
        """
        rare_statuses = (
            QuantumAttemptStatus.ABORTED,
            QuantumAttemptStatus.ABORTED_SUCCESS,
            QuantumAttemptStatus.UNKNOWN,
        )
        return self in rare_statuses

354 

355 

class GetSetDictMetadataHolder(Protocol):
    """Protocol for objects that have a ``metadata`` attribute that satisfies
    `GetSetDictMetadata`.
    """

    @property
    def metadata(self) -> GetSetDictMetadata | None:
        # Structural (duck-typed) requirement only; `None` is allowed so that
        # outputs whose metadata is absent can still satisfy the protocol
        # (callers such as AnnotatedPartialOutputsError.annotate skip None).
        pass

364 

365 

class NoWorkFound(BaseException):
    """An exception raised when a Quantum should not exist because there is no
    work for it to do.

    This usually occurs because a non-optional input dataset is not present, or
    a spatiotemporal overlap that was conservatively predicted does not
    actually exist.

    This inherits from BaseException because it is used to signal a case that
    we don't consider a real error, even though we often want to use try/except
    logic to trap it.
    """

    # Caveat flags recorded for a quantum that "succeeds" by raising this
    # exception; subclasses override with more specific flag combinations.
    FLAGS: ClassVar[QuantumSuccessCaveats] = QuantumSuccessCaveats.NO_WORK

380 

381 

class UpstreamFailureNoWorkFound(NoWorkFound):
    """A specialization of `NoWorkFound` that indicates that an upstream task
    had a problem that was ignored (e.g. to prevent a single-detector failure
    from bringing down an entire visit).
    """

    # Adds UPSTREAM_FAILURE_NO_WORK to the base class's NO_WORK flag.
    FLAGS: ClassVar[QuantumSuccessCaveats] = (
        QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UPSTREAM_FAILURE_NO_WORK
    )

389 

390 

class RepeatableQuantumError(RuntimeError):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable problem that will not be
    addressed by retries.

    This usually indicates that the algorithm and the data it has been given
    are somehow incompatible, and the task should run fine on most other data.

    This exception may be used as a base class for more specific exceptions, or
    used directly while chaining another exception, e.g.::

        try:
            run_code()
        except SomeOtherError as err:
            raise RepeatableQuantumError() from err

    This may be used for missing input data when the desired behavior is to
    cause all downstream tasks being run to be blocked, forcing the user to
    address the problem.  When the desired behavior is to skip all of this
    quantum and attempt downstream tasks (or skip them) without its
    outputs, raise `NoWorkFound` or return without raising instead.
    """

    # Exit code associated with this error class; presumably consumed by the
    # execution system (not visible in this file) to classify the failure.
    EXIT_CODE: ClassVar[int] = 20

415 

416 

class AlgorithmError(RepeatableQuantumError, abc.ABC):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable algorithmic failure that will not be
    addressed by retries.

    Subclass this exception to define the metadata associated with the error
    (for example: number of data points in a fit vs. degrees of freedom).
    """

    def __new__(cls, *args: Any, **kwargs: Any) -> AlgorithmError:
        # Have to override __new__ because builtin subclasses aren't checked
        # for abstract methods; see https://github.com/python/cpython/issues/50246
        if cls.__abstractmethods__:
            # Mirror the TypeError that ABCMeta would normally raise on
            # instantiation of an abstract class.
            raise TypeError(
                f"Can't instantiate abstract class {cls.__name__} with "
                f"abstract methods: {','.join(sorted(cls.__abstractmethods__))}"
            )
        return super().__new__(cls, *args, **kwargs)

    @property
    @abc.abstractmethod
    def metadata(self) -> NestedMetadataDict | None:
        """Metadata from the raising `~lsst.pipe.base.Task` with more
        information about the failure. The contents of the dict are
        `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
        `int`, `float`, `bool`, or nested-dictionary (with the same key and
        value types) values.
        """
        raise NotImplementedError

446 

447 

class UnprocessableDataError(NoWorkFound):
    """A specialization of `NoWorkFound` that will be [subclassed and] raised
    by Tasks to indicate a failure to process their inputs for some reason that
    is non-recoverable.

    Notes
    -----
    An example is a known bright star that causes PSF measurement to fail, and
    that makes that detector entirely non-recoverable. Another example is an
    image with an oddly shaped PSF (e.g. due to a failure to achieve focus)
    that warrants the image being flagged as "poor quality" which should not
    have further processing attempted.

    The `NoWorkFound` inheritance ensures the job will not be considered a
    failure (i.e. such that no human time will inadvertently be spent chasing
    it down).

    Do not raise this unless we are convinced that the data cannot (or should
    not) be processed, even by a better algorithm. Most instances where this
    error would be raised likely require an RFC to explicitly define the
    situation.
    """

    # Adds UNPROCESSABLE_DATA to the base class's NO_WORK flag.
    FLAGS: ClassVar[QuantumSuccessCaveats] = (
        QuantumSuccessCaveats.NO_WORK | QuantumSuccessCaveats.UNPROCESSABLE_DATA
    )

472 

473 

class AnnotatedPartialOutputsError(RepeatableQuantumError):
    """Exception that ``runQuantum`` raises when the (partial) outputs it has
    written contain information about their own incompleteness or degraded
    quality.

    Construct instances via `annotate` rather than by calling the constructor
    directly.  Note that `annotate` does not chain the original exception;
    the caller must still do that itself.

    The original error should always be chained: when the executor catches
    this exception, it reports that original exception.  In contrast, any
    other exception raised from ``runQuantum`` is considered to invalidate
    outputs that are already written.
    """

    FLAGS: ClassVar = QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR

    @classmethod
    def annotate(
        cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger | LsstLogAdapter
    ) -> AnnotatedPartialOutputsError:
        """Set metadata on outputs to explain the nature of the failure.

        Parameters
        ----------
        error : `Exception`
            Exception that caused the task to fail.
        *args : `GetSetDictMetadataHolder`
            Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
            failure information. They must have a `metadata` property.
        log : `logging.Logger`
            Log to send error message to.

        Returns
        -------
        error : `AnnotatedPartialOutputsError`
            Exception that the failing task can ``raise from`` with the
            passed-in exception.

        Notes
        -----
        Call this from inside an except block that has caught an exception.
        Example of handling a failure in ``PipelineTask.runQuantum`` that
        annotates and writes partial outputs:

        .. code-block:: py
            :name: annotate-error-example

            def runQuantum(self, butlerQC, inputRefs, outputRefs):
                inputs = butlerQC.get(inputRefs)
                exposures = inputs.pop("exposures")
                assert not inputs, "runQuantum got more inputs than expected"

                result = pipeBase.Struct(catalog=None)
                try:
                    self.run(exposure)
                except pipeBase.AlgorithmError as e:
                    error = pipeBase.AnnotatedPartialOutputsError.annotate(
                        e, self, result.catalog, log=self.log
                    )
                    raise error from e
                finally:
                    butlerQC.put(result, outputRefs)
        """
        info: dict[str, Any] = {
            "message": str(error),
            "type": introspection.get_full_type_name(error),
        }
        extra = getattr(error, "metadata", None)
        if extra:
            info["metadata"] = extra

        # NOTE: Can't fully test this in pipe_base because afw is not a
        # dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
        for target in args:
            if target is None:
                # Outputs that were never produced cannot carry metadata.
                continue
            target.metadata.set_dict("failure", info)  # type: ignore

        log.debug(
            "Task failed with only partial outputs; see exception message for details.",
            exc_info=error,
        )
        return cls("Task failed and wrote partial outputs: see chained exception for details.")

559 

560 

class InvalidQuantumError(Exception):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate logic bug or configuration problem.

    This usually indicates that the configured algorithm itself is invalid and
    will not run on a significant fraction of quanta (often all of them).

    This exception may be used as a base class for more specific exceptions, or
    used directly while chaining another exception, e.g.::

        try:
            run_code()
        except SomeOtherError as err:
            raise InvalidQuantumError() from err

    Raising this exception in `PipelineTask.runQuantum` or something it calls
    is a last resort - whenever possible, such problems should cause exceptions
    in ``__init__`` or in QuantumGraph generation.  It should never be used
    for missing data.
    """

    # Exit code associated with this error class; presumably consumed by the
    # execution system (not visible in this file), distinct from
    # RepeatableQuantumError's 20.
    EXIT_CODE: ClassVar[int] = 21