Coverage for python / lsst / pipe / base / quantum_reports.py: 79%
69 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ["ExceptionInfo", "ExecutionStatus", "QuantumReport", "Report"]
32import enum
33import sys
34import uuid
35from typing import Any
37import pydantic
39from lsst.daf.butler import DataCoordinate, DataId, DataIdValue
40from lsst.utils.introspection import get_full_type_name
42from .graph import QgraphSummary
def _serializeDataId(dataId: DataId) -> dict[str, DataIdValue]:
    """Convert a data ID into the plain mapping stored in reports.

    Parameters
    ----------
    dataId : `~lsst.daf.butler.DataId`
        Data ID to convert.

    Returns
    -------
    serialized : `dict` [`str`, `~lsst.daf.butler.DataIdValue`]
        A new `dict` built from ``dataId.required`` when ``dataId`` is a
        `~lsst.daf.butler.DataCoordinate`; otherwise ``dataId`` itself,
        returned unchanged (no copy is made).
    """
    if not isinstance(dataId, DataCoordinate):
        # Anything that is not a DataCoordinate is passed through as-is.
        return dataId  # type: ignore
    return dict(dataId.required)
class ExecutionStatus(enum.Enum):
    """Possible values for job execution status.

    Status `FAILURE` is set if one or more tasks failed. Status `TIMEOUT` is
    set if there are no failures but one or more tasks timed out. Timeouts can
    only be detected in multi-process mode; the child task is killed on
    timeout and usually should have a non-zero exit code.
    """

    # Default status used by the report models in this module.
    SUCCESS = "success"
    # One or more tasks failed.
    FAILURE = "failure"
    # No failures, but one or more tasks timed out.
    TIMEOUT = "timeout"
    # NOTE(review): not described in the class docstring; presumably the
    # quantum was not executed at all -- confirm against the executors.
    SKIPPED = "skipped"
class ExceptionInfo(pydantic.BaseModel):
    """Information about an exception.

    Instances are normally created with `from_exception`.
    """

    className: str
    """Name of the exception class; `from_exception` fills this with the
    result of `lsst.utils.introspection.get_full_type_name`.
    """

    message: str
    """Exception message; `from_exception` fills this with
    ``str(exception)``.
    """

    @classmethod
    def from_exception(cls, exception: Exception) -> ExceptionInfo:
        """Construct instance from an exception.

        Parameters
        ----------
        exception : `Exception`
            Exception to wrap.

        Returns
        -------
        info : `ExceptionInfo`
            Information about the exception.
        """
        return cls(className=get_full_type_name(exception), message=str(exception))

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Must delegate to the JSON variant; delegating to ``model_dump``
            # would return a dict instead of a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class QuantumReport(pydantic.BaseModel):
    """Task execution report for a single Quantum.

    Parameters
    ----------
    dataId : `~lsst.daf.butler.DataId`
        Quantum data ID.
    taskLabel : `str`
        Label for task executing this Quantum.
    status : `ExecutionStatus`
        Status of this quantum execution.
    exitCode : `int` or `None`, optional
        Exit code for sub-process executing this Quantum. `None` for
        in-process execution. Negative if process was killed by a signal.
    exceptionInfo : `ExceptionInfo` or `None`, optional
        Exception information if an exception was raised.
    quantumId : `uuid.UUID`, optional
        Unique identifier for the quantum.
    """

    status: ExecutionStatus = ExecutionStatus.SUCCESS
    """Execution status, one of the values in `ExecutionStatus` enum."""

    dataId: dict[str, DataIdValue]
    """Quantum DataId."""

    taskLabel: str | None
    """Label for a task executing this Quantum."""

    exitCode: int | None = None
    """Exit code for a sub-process executing Quantum, None for in-process
    Quantum execution. Negative if process was killed by a signal.
    """

    exceptionInfo: ExceptionInfo | None = None
    """Exception information if exception was raised."""

    quantumId: uuid.UUID | None = None
    """Unique identifier for the quantum."""

    def __init__(
        self,
        dataId: DataId,
        taskLabel: str,
        status: ExecutionStatus = ExecutionStatus.SUCCESS,
        exitCode: int | None = None,
        exceptionInfo: ExceptionInfo | None = None,
        quantumId: uuid.UUID | None = None,
    ):
        # The data ID is converted with ``_serializeDataId`` before it is
        # handed to pydantic, so the stored field is a plain mapping.
        super().__init__(
            quantumId=quantumId,
            status=status,
            dataId=_serializeDataId(dataId),
            taskLabel=taskLabel,
            exitCode=exitCode,
            exceptionInfo=exceptionInfo,
        )

    @classmethod
    def from_exception(
        cls,
        exception: Exception,
        dataId: DataId,
        taskLabel: str,
        *,
        exitCode: int | None = None,
        quantumId: uuid.UUID | None = None,
    ) -> QuantumReport:
        """Construct report instance from an exception and other pieces of
        data.

        Parameters
        ----------
        exception : `Exception`
            Exception caught from processing quantum.
        dataId : `~lsst.daf.butler.DataId`
            Data ID of quantum.
        taskLabel : `str`
            Label of task.
        exitCode : `int`, optional
            Exit code for the process, used when it is known that the process
            will exit with that exit code.
        quantumId : `uuid.UUID`, optional
            Unique identifier for the quantum.

        Returns
        -------
        report : `QuantumReport`
            Report with status `ExecutionStatus.FAILURE` and exception
            information taken from ``exception``.
        """
        return cls(
            status=ExecutionStatus.FAILURE,
            dataId=dataId,
            taskLabel=taskLabel,
            exitCode=exitCode,
            exceptionInfo=ExceptionInfo.from_exception(exception),
            quantumId=quantumId,
        )

    @classmethod
    def from_exit_code(
        cls,
        exitCode: int,
        dataId: DataId,
        taskLabel: str,
        quantumId: uuid.UUID | None = None,
    ) -> QuantumReport:
        """Construct report instance from an exit code and other pieces of
        data.

        Parameters
        ----------
        exitCode : `int`
            The exit code of the subprocess.
        dataId : `~lsst.daf.butler.DataId`
            The quantum Data ID.
        taskLabel : `str`
            The task label.
        quantumId : `uuid.UUID`, optional
            Unique identifier for the quantum.

        Returns
        -------
        report : `QuantumReport`
            Report with status `ExecutionStatus.SUCCESS` when ``exitCode`` is
            zero and `ExecutionStatus.FAILURE` otherwise.
        """
        return cls(
            status=ExecutionStatus.SUCCESS if exitCode == 0 else ExecutionStatus.FAILURE,
            dataId=dataId,
            taskLabel=taskLabel,
            exitCode=exitCode,
            quantumId=quantumId,
        )

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Must delegate to the JSON variant; delegating to ``model_dump``
            # would return a dict instead of a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
class Report(pydantic.BaseModel):
    """Execution report for the whole job with one or a few quanta."""

    qgraphSummary: QgraphSummary
    """Summary report about QuantumGraph."""

    status: ExecutionStatus = ExecutionStatus.SUCCESS
    """Job status."""

    cmdLine: list[str] | None = None
    """Command line for the whole job. When not provided (or `None`) in
    mapping input, it is filled from `sys.argv` during validation.
    """

    exitCode: int | None = None
    """Job exit code, this obviously cannot be set in pipetask."""

    exceptionInfo: ExceptionInfo | None = None
    """Exception information if exception was raised."""

    quantaReports: list[QuantumReport] = []
    """List of per-quantum reports, ordering is not specified. Some or all
    quanta may not produce a report.
    """

    # Always want to validate the default value for cmdLine so
    # use a model_validator.
    @pydantic.model_validator(mode="before")
    @classmethod
    def _set_cmdLine(cls, data: Any) -> Any:
        """Fill in ``cmdLine`` from `sys.argv` when it is missing or `None`.

        Parameters
        ----------
        data : `~typing.Any`
            Raw input passed to the model before field validation.

        Returns
        -------
        data : `~typing.Any`
            A new `dict` with ``cmdLine`` set when the input is a mapping
            lacking a command line; otherwise the input, unchanged.
        """
        from collections.abc import Mapping

        # A "before" validator sees the raw input, which is not guaranteed to
        # be a mapping (e.g. an existing model instance); leave such input
        # for pydantic itself to accept or reject.
        if isinstance(data, Mapping) and data.get("cmdLine") is None:
            # Build a new dict rather than writing into the caller's mapping,
            # and copy the argument list so it is not aliased.
            data = {**data, "cmdLine": list(sys.argv)}
        return data

    def set_exception(self, exception: Exception) -> None:
        """Update exception information from an exception object.

        Parameters
        ----------
        exception : `Exception`
            Exception to use to extract information from.
        """
        self.exceptionInfo = ExceptionInfo.from_exception(exception)

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Must delegate to the JSON variant; delegating to ``model_dump``
            # would return a dict instead of a JSON string.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)