Coverage for python / lsst / pipe / base / quantum_reports.py: 79%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:59 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ["ExceptionInfo", "ExecutionStatus", "QuantumReport", "Report"] 

31 

32import enum 

33import sys 

34import uuid 

35from typing import Any 

36 

37import pydantic 

38 

39from lsst.daf.butler import DataCoordinate, DataId, DataIdValue 

40from lsst.utils.introspection import get_full_type_name 

41 

42from .graph import QgraphSummary 

43 

44 

45def _serializeDataId(dataId: DataId) -> dict[str, DataIdValue]: 

46 if isinstance(dataId, DataCoordinate): 

47 return dict(dataId.required) 

48 else: 

49 return dataId # type: ignore 

50 

51 

52class ExecutionStatus(enum.Enum): 

53 """Possible values for job execution status. 

54 

55 Status `FAILURE` is set if one or more tasks failed. Status `TIMEOUT` is 

56 set if there are no failures but one or more tasks timed out. Timeouts can 

57 only be detected in multi-process mode, child task is killed on timeout 

58 and usually should have non-zero exit code. 

59 """ 

60 

61 SUCCESS = "success" 

62 FAILURE = "failure" 

63 TIMEOUT = "timeout" 

64 SKIPPED = "skipped" 

65 

66 

67class ExceptionInfo(pydantic.BaseModel): 

68 """Information about exception.""" 

69 

70 className: str 

71 """Name of the exception class if exception was raised.""" 

72 

73 message: str 

74 """Exception message for in-process quantum execution, None if 

75 quantum was executed in sub-process. 

76 """ 

77 

78 @classmethod 

79 def from_exception(cls, exception: Exception) -> ExceptionInfo: 

80 """Construct instance from an exception. 

81 

82 Parameters 

83 ---------- 

84 exception : `Exception` 

85 Exception to wrap. 

86 

87 Returns 

88 ------- 

89 info : `ExceptionInfo` 

90 Information about the exception. 

91 """ 

92 return cls(className=get_full_type_name(exception), message=str(exception)) 

93 

94 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

95 # when we inherit those docstrings in our public classes. 

96 if "sphinx" in sys.modules: 

97 

98 def copy(self, *args: Any, **kwargs: Any) -> Any: 

99 """See `pydantic.BaseModel.copy`.""" 

100 return super().copy(*args, **kwargs) 

101 

102 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

103 """See `pydantic.BaseModel.model_dump`.""" 

104 return super().model_dump(*args, **kwargs) 

105 

106 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

107 """See `pydantic.BaseModel.model_dump_json`.""" 

108 return super().model_dump(*args, **kwargs) 

109 

110 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

111 """See `pydantic.BaseModel.model_copy`.""" 

112 return super().model_copy(*args, **kwargs) 

113 

114 @classmethod 

115 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

116 """See `pydantic.BaseModel.model_construct`.""" 

117 return super().model_construct(*args, **kwargs) 

118 

119 @classmethod 

120 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

121 """See `pydantic.BaseModel.model_json_schema`.""" 

122 return super().model_json_schema(*args, **kwargs) 

123 

124 @classmethod 

125 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

126 """See `pydantic.BaseModel.model_validate`.""" 

127 return super().model_validate(*args, **kwargs) 

128 

129 @classmethod 

130 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

131 """See `pydantic.BaseModel.model_validate_json`.""" 

132 return super().model_validate_json(*args, **kwargs) 

133 

134 @classmethod 

135 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

136 """See `pydantic.BaseModel.model_validate_strings`.""" 

137 return super().model_validate_strings(*args, **kwargs) 

138 

139 

140class QuantumReport(pydantic.BaseModel): 

141 """Task execution report for a single Quantum. 

142 

143 Parameters 

144 ---------- 

145 dataId : `~lsst.daf.butler.DataId` 

146 Quantum data ID. 

147 taskLabel : `str` 

148 Label for task executing this Quantum. 

149 status : `ExecutionStatus` 

150 Status of this quantum execution. 

151 exitCode : `int` or `None`, optional 

152 Exit code for sub-process executing this Quantum. `None` for 

153 in-process execution. Negative if process was killed by a signal. 

154 exceptionInfo : `ExceptionInfo` or `None`, optional 

155 Exception information if an exception was raised. 

156 quantumId : `uuid.UUID`, optional 

157 Unique identifier for the quantum. 

158 """ 

159 

160 status: ExecutionStatus = ExecutionStatus.SUCCESS 

161 """Execution status, one of the values in `ExecutionStatus` enum.""" 

162 

163 dataId: dict[str, DataIdValue] 

164 """Quantum DataId.""" 

165 

166 taskLabel: str | None 

167 """Label for a task executing this Quantum.""" 

168 

169 exitCode: int | None = None 

170 """Exit code for a sub-process executing Quantum, None for in-process 

171 Quantum execution. Negative if process was killed by a signal. 

172 """ 

173 

174 exceptionInfo: ExceptionInfo | None = None 

175 """Exception information if exception was raised.""" 

176 

177 quantumId: uuid.UUID | None = None 

178 """Unique identifier for the quantum.""" 

179 

180 def __init__( 

181 self, 

182 dataId: DataId, 

183 taskLabel: str, 

184 status: ExecutionStatus = ExecutionStatus.SUCCESS, 

185 exitCode: int | None = None, 

186 exceptionInfo: ExceptionInfo | None = None, 

187 quantumId: uuid.UUID | None = None, 

188 ): 

189 super().__init__( 

190 quantumId=quantumId, 

191 status=status, 

192 dataId=_serializeDataId(dataId), 

193 taskLabel=taskLabel, 

194 exitCode=exitCode, 

195 exceptionInfo=exceptionInfo, 

196 ) 

197 

198 @classmethod 

199 def from_exception( 

200 cls, 

201 exception: Exception, 

202 dataId: DataId, 

203 taskLabel: str, 

204 *, 

205 exitCode: int | None = None, 

206 quantumId: uuid.UUID | None = None, 

207 ) -> QuantumReport: 

208 """Construct report instance from an exception and other pieces of 

209 data. 

210 

211 Parameters 

212 ---------- 

213 exception : `Exception` 

214 Exception caught from processing quantum. 

215 dataId : `~lsst.daf.butler.DataId` 

216 Data ID of quantum. 

217 taskLabel : `str` 

218 Label of task. 

219 exitCode : `int`, optional 

220 Exit code for the process, used when it is known that the process 

221 will exit with that exit code. 

222 quantumId : `uuid.UUID`, optional 

223 Unique identifier for the quantum. 

224 """ 

225 return cls( 

226 status=ExecutionStatus.FAILURE, 

227 dataId=dataId, 

228 taskLabel=taskLabel, 

229 exitCode=exitCode, 

230 exceptionInfo=ExceptionInfo.from_exception(exception), 

231 quantumId=quantumId, 

232 ) 

233 

234 @classmethod 

235 def from_exit_code( 

236 cls, 

237 exitCode: int, 

238 dataId: DataId, 

239 taskLabel: str, 

240 quantumId: uuid.UUID | None = None, 

241 ) -> QuantumReport: 

242 """Construct report instance from an exit code and other pieces of 

243 data. 

244 

245 Parameters 

246 ---------- 

247 exitCode : `int` 

248 The exit code of the subprocess. 

249 dataId : `~lsst.daf.butler.DataId` 

250 The quantum Data ID. 

251 taskLabel : `str` 

252 The task label. 

253 quantumId : `uuid.UUID`, optional 

254 Unique identifier for the quantum. 

255 """ 

256 return cls( 

257 status=ExecutionStatus.SUCCESS if exitCode == 0 else ExecutionStatus.FAILURE, 

258 dataId=dataId, 

259 taskLabel=taskLabel, 

260 exitCode=exitCode, 

261 quantumId=quantumId, 

262 ) 

263 

264 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

265 # when we inherit those docstrings in our public classes. 

266 if "sphinx" in sys.modules: 

267 

268 def copy(self, *args: Any, **kwargs: Any) -> Any: 

269 """See `pydantic.BaseModel.copy`.""" 

270 return super().copy(*args, **kwargs) 

271 

272 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

273 """See `pydantic.BaseModel.model_dump`.""" 

274 return super().model_dump(*args, **kwargs) 

275 

276 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

277 """See `pydantic.BaseModel.model_dump_json`.""" 

278 return super().model_dump(*args, **kwargs) 

279 

280 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

281 """See `pydantic.BaseModel.model_copy`.""" 

282 return super().model_copy(*args, **kwargs) 

283 

284 @classmethod 

285 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

286 """See `pydantic.BaseModel.model_construct`.""" 

287 return super().model_construct(*args, **kwargs) 

288 

289 @classmethod 

290 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

291 """See `pydantic.BaseModel.model_json_schema`.""" 

292 return super().model_json_schema(*args, **kwargs) 

293 

294 @classmethod 

295 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

296 """See `pydantic.BaseModel.model_validate`.""" 

297 return super().model_validate(*args, **kwargs) 

298 

299 @classmethod 

300 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

301 """See `pydantic.BaseModel.model_validate_json`.""" 

302 return super().model_validate_json(*args, **kwargs) 

303 

304 @classmethod 

305 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

306 """See `pydantic.BaseModel.model_validate_strings`.""" 

307 return super().model_validate_strings(*args, **kwargs) 

308 

309 

310class Report(pydantic.BaseModel): 

311 """Execution report for the whole job with one or few quanta.""" 

312 

313 qgraphSummary: QgraphSummary 

314 """Summary report about QuantumGraph.""" 

315 

316 status: ExecutionStatus = ExecutionStatus.SUCCESS 

317 """Job status.""" 

318 

319 cmdLine: list[str] | None = None 

320 """Command line for the whole job.""" 

321 

322 exitCode: int | None = None 

323 """Job exit code, this obviously cannot be set in pipetask.""" 

324 

325 exceptionInfo: ExceptionInfo | None = None 

326 """Exception information if exception was raised.""" 

327 

328 quantaReports: list[QuantumReport] = [] 

329 """List of per-quantum reports, ordering is not specified. Some or all 

330 quanta may not produce a report. 

331 """ 

332 

333 # Always want to validate the default value for cmdLine so 

334 # use a model_validator. 

335 @pydantic.model_validator(mode="before") 

336 @classmethod 

337 def _set_cmdLine(cls, data: Any) -> Any: 

338 if data.get("cmdLine") is None: 

339 data["cmdLine"] = sys.argv 

340 return data 

341 

342 def set_exception(self, exception: Exception) -> None: 

343 """Update exception information from an exception object. 

344 

345 Parameters 

346 ---------- 

347 exception : `Exception` 

348 Exception to use to extract information from. 

349 """ 

350 self.exceptionInfo = ExceptionInfo.from_exception(exception) 

351 

352 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

353 # when we inherit those docstrings in our public classes. 

354 if "sphinx" in sys.modules: 

355 

356 def copy(self, *args: Any, **kwargs: Any) -> Any: 

357 """See `pydantic.BaseModel.copy`.""" 

358 return super().copy(*args, **kwargs) 

359 

360 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

361 """See `pydantic.BaseModel.model_dump`.""" 

362 return super().model_dump(*args, **kwargs) 

363 

364 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

365 """See `pydantic.BaseModel.model_dump_json`.""" 

366 return super().model_dump(*args, **kwargs) 

367 

368 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

369 """See `pydantic.BaseModel.model_copy`.""" 

370 return super().model_copy(*args, **kwargs) 

371 

372 @classmethod 

373 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

374 """See `pydantic.BaseModel.model_construct`.""" 

375 return super().model_construct(*args, **kwargs) 

376 

377 @classmethod 

378 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

379 """See `pydantic.BaseModel.model_json_schema`.""" 

380 return super().model_json_schema(*args, **kwargs) 

381 

382 @classmethod 

383 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

384 """See `pydantic.BaseModel.model_validate`.""" 

385 return super().model_validate(*args, **kwargs) 

386 

387 @classmethod 

388 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

389 """See `pydantic.BaseModel.model_validate_json`.""" 

390 return super().model_validate_json(*args, **kwargs) 

391 

392 @classmethod 

393 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

394 """See `pydantic.BaseModel.model_validate_strings`.""" 

395 return super().model_validate_strings(*args, **kwargs)