Coverage for python / lsst / ctrl / bps / bps_reports.py: 12%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:03 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Classes and functions used in reporting run status.""" 

29 

30__all__ = [ 

31 "BaseRunReport", 

32 "DetailedRunReport", 

33 "ExitCodesReport", 

34 "SummaryRunReport", 

35 "compile_code_summary", 

36 "compile_job_summary", 

37] 

38 

39import abc 

40import logging 

41 

42from astropy.table import Table 

43 

44from .wms_service import WmsRunReport, WmsStates 

45 

46_LOG = logging.getLogger(__name__) 

47 

48 

class BaseRunReport(abc.ABC):
    """The base class representing a run report.

    Parameters
    ----------
    fields : `list` [ `tuple` [ `str`, `str`]]
        The list of column specification, fields, to include in the report.
        Each field has a name and a type.
    """

    def __init__(self, fields):
        self._table = Table(dtype=fields)
        self._msg = None

    def __eq__(self, other):
        """Check if two reports render to the same table.

        Parameters
        ----------
        other : `object`
            The object to compare with.

        Returns
        -------
        result : `bool` or `NotImplemented`
            Result of comparing the fully formatted tables if ``other`` is
            a report, ``NotImplemented`` otherwise so Python can fall back
            to its default handling (which yields `False` for ``==``).
        """
        if not isinstance(other, BaseRunReport):
            return NotImplemented
        # Request unlimited output explicitly so the comparison never
        # depends on any line/width limits applied by default formatting.
        mine = self._table.pformat(max_lines=-1, max_width=-1)
        theirs = other._table.pformat(max_lines=-1, max_width=-1)
        return mine == theirs

    def __len__(self):
        """Return the number of runs in the report."""
        return len(self._table)

    def __str__(self):
        lines = list(self._table.pformat(max_lines=-1, max_width=-1))
        return "\n".join(lines)

    @property
    def message(self):
        """Extra information a method needs to pass to its caller (`str`)."""
        return self._msg

    def clear(self):
        """Remove all entries from the report."""
        self._msg = None
        self._table.remove_rows(slice(len(self)))

    def sort(self, columns, ascending=True):
        """Sort the report entries according to one or more keys.

        Parameters
        ----------
        columns : `str` | `list` [ `str` ]
            The column(s) to order the report by.
        ascending : `bool`, optional
            Sort report entries in ascending order, default.

        Raises
        ------
        AttributeError
            Raised if supplied with non-existent column(s).
        """
        if isinstance(columns, str):
            columns = [columns]
        # Sort the offending names so the error message is reproducible
        # (iteration order of a set of strings varies between runs).
        unknown_keys = sorted(set(columns) - set(self._table.colnames))
        if unknown_keys:
            raise AttributeError(
                f"cannot sort the report entries: column(s) {', '.join(unknown_keys)} not found"
            )
        self._table.sort(keys=columns, reverse=not ascending)

    @classmethod
    def from_table(cls, table):
        """Create a report from a table.

        Parameters
        ----------
        table : `astropy.table.Table`
            Information about a run in a tabular form.

        Returns
        -------
        inst : `lsst.ctrl.bps.bps_reports.BaseRunReport`
            A report created based on the information in the provided table.
        """
        inst = cls(table.dtype.descr)
        # Keep an independent copy so the report does not share the
        # caller's table object.
        inst._table = table.copy()
        return inst

    @abc.abstractmethod
    def add(self, run_report, use_global_id=False):
        """Add a single run info to the report.

        Parameters
        ----------
        run_report : `lsst.ctrl.bps.WmsRunReport`
            Information for single run.
        use_global_id : `bool`, optional
            If set, use global run id. Defaults to False which means that
            the local id will be used instead.

            Only applicable in the context of a WMS using distributed job
            queues (e.g., HTCondor).
        """

143 

144 

class SummaryRunReport(BaseRunReport):
    """A summary run report.

    Each added run contributes one row made of: an attention flag, the run
    state name, the percentage of succeeded jobs, the run id, operator,
    project, campaign, payload, and run name.
    """

    def add(self, run_report, use_global_id=False):
        # Docstring inherited from the base class.

        # Flag any running workflow that might need human attention.
        # Only the first matching condition is reported, checked in the
        # order: held, deleted, failed.
        run_flag = " "
        if run_report.state == WmsStates.RUNNING:
            if run_report.job_state_counts.get(WmsStates.HELD, 0):
                run_flag = "H"
            elif run_report.job_state_counts.get(WmsStates.DELETED, 0):
                run_flag = "D"
            elif run_report.job_state_counts.get(WmsStates.FAILED, 0):
                run_flag = "F"

        # Estimate success rate ("UNK" when the total number of jobs is
        # zero or not set).
        percent_succeeded = "UNK"
        _LOG.debug("total_number_jobs = %s", run_report.total_number_jobs)
        _LOG.debug("run_report.job_state_counts = %s", run_report.job_state_counts)
        if run_report.total_number_jobs:
            succeeded = run_report.job_state_counts.get(WmsStates.SUCCEEDED, 0)
            _LOG.debug("succeeded = %s", succeeded)
            # Multiply before dividing and use floor division so the result
            # is not affected by floating-point rounding: for example,
            # int(29 / 100 * 100) evaluates to 28, not 29.
            percent_succeeded = f"{int(succeeded * 100 // run_report.total_number_jobs)}"

        row = (
            run_flag,
            run_report.state.name,
            percent_succeeded,
            run_report.global_wms_id if use_global_id else run_report.wms_id,
            run_report.operator,
            run_report.project,
            run_report.campaign,
            run_report.payload,
            run_report.run,
        )
        self._table.add_row(row)

182 

183 

class DetailedRunReport(BaseRunReport):
    """A detailed run report.

    The report starts with a ``TOTAL`` row holding the per-state job counts
    for the whole run, followed by one row per job label.
    """

    def add(self, run_report, use_global_id=False):
        # Docstring inherited from the base class.

        # If run summary exists, use it to get the reference job counts.
        # It is parsed as ';'-separated 'label:count' pairs.
        by_label_expected = {}
        if run_report.run_summary:
            for part in run_report.run_summary.split(";"):
                label, count = part.split(":")
                by_label_expected[label] = int(count)

        # States absent from the counts are reported as 0.
        total = ["TOTAL"]
        total.extend([run_report.job_state_counts.get(state, 0) for state in WmsStates])
        total.append(sum(by_label_expected.values()) if by_label_expected else run_report.total_number_jobs)
        self._table.add_row(total)

        job_summary = run_report.job_summary
        if job_summary is None:
            id_ = run_report.global_wms_id if use_global_id else run_report.wms_id
            self._msg = f"WARNING: Job summary for run '{id_}' not available, report may be incomplete."
            return

        if by_label_expected:
            job_order = list(by_label_expected)
        else:
            job_order = sorted(job_summary)
            self._msg = "WARNING: Could not determine order of pipeline, instead sorted alphabetically."
        for label in job_order:
            try:
                # Work on a copy: the adjustment below must not alter the
                # job summary stored in the caller's run report.
                counts = dict(job_summary[label])
            except KeyError:
                # No information for this label; -1 marks every count as
                # unknown.
                counts = dict.fromkeys(WmsStates, -1)
            else:
                if label in by_label_expected:
                    # Jobs expected by the run summary but not accounted
                    # for in the job summary are attributed to UNREADY.
                    already_counted = sum(counts.values())
                    if already_counted != by_label_expected[label]:
                        counts[WmsStates.UNREADY] = (
                            counts.get(WmsStates.UNREADY, 0) + by_label_expected[label] - already_counted
                        )

            run = [label]
            run.extend([counts.get(state, 0) for state in WmsStates])
            run.append(by_label_expected[label] if by_label_expected else -1)
            self._table.add_row(run)

    def __str__(self):
        # Left-align the first column, right-align all remaining ones.
        alignments = ["<"] + [">"] * (len(self._table.colnames) - 1)
        lines = list(self._table.pformat(max_lines=-1, max_width=-1, align=alignments))
        # Repeat the header separator (second line of the output) after the
        # first data row, i.e. the TOTAL row, to set it apart visually.
        lines.insert(3, lines[1])
        return "\n".join(lines)

234 

235 

class ExitCodesReport(BaseRunReport):
    """An extension of run report to give information about
    error handling from the wms service.
    """

    @staticmethod
    def _format_codes(codes):
        """Render distinct exit codes as a comma-separated string.

        Parameters
        ----------
        codes : `list` [`int`]
            Exit codes to summarize, duplicates allowed.

        Returns
        -------
        summary : `str`
            Distinct codes in ascending numeric order, or "None" if there
            are no codes.
        """
        if not codes:
            return "None"
        # Sort the values before converting to text so that, e.g., 9 is
        # listed before 137 (sorting the strings would put "137" first).
        return ", ".join(str(code) for code in sorted(set(codes)))

    def add(self, run_report: WmsRunReport, use_global_id: bool = False) -> None:
        # Docstring inherited from the base class.

        exit_code_summary = run_report.exit_code_summary
        if not exit_code_summary:
            id_ = run_report.global_wms_id if use_global_id else run_report.wms_id
            self._msg = f"WARNING: Exit code summary for run '{id_}' not available, report may be incomplete."
            return

        warnings = []

        # If available, use label ordering from the run summary as it should
        # reflect the ordering of the pipetasks in the pipeline.
        labels = []
        if run_report.run_summary:
            for part in run_report.run_summary.split(";"):
                label, _ = part.split(":")
                labels.append(label)
        if not labels:
            labels = sorted(exit_code_summary)
            warnings.append("WARNING: Could not determine order of pipeline, instead sorted alphabetically.")

        # Payload (e.g. pipetask) error codes:
        # * 1: general failure,
        # * 2: command line error (e.g. unknown command and/or option).
        # Any other code is counted as an infrastructure error.
        pyld_error_codes = {1, 2}

        # A list (not a set) keeps the labels in the order they were
        # encountered, making the warning message reproducible.
        missing_labels = []
        for label in labels:
            try:
                exit_codes = exit_code_summary[label]
            except KeyError:
                if label not in missing_labels:
                    missing_labels.append(label)
                continue

            pyld_errors = [code for code in exit_codes if code in pyld_error_codes]
            infra_errors = [code for code in exit_codes if code not in pyld_error_codes]
            run = [
                label,
                len(pyld_errors),
                self._format_codes(pyld_errors),
                len(infra_errors),
                self._format_codes(infra_errors),
            ]
            self._table.add_row(run)
        if missing_labels:
            warnings.append(
                f"WARNING: Exit code summary was not available for job labels: {', '.join(missing_labels)}"
            )
        if warnings:
            self._msg = "\n".join(warnings)

    def __str__(self):
        # Left-align the first column, right-align all remaining ones.
        alignments = ["<"] + [">"] * (len(self._table.colnames) - 1)
        lines = list(self._table.pformat(max_lines=-1, max_width=-1, align=alignments))
        return "\n".join(lines)

300 

301 

def compile_job_summary(report: WmsRunReport) -> list[str]:
    """Make sure the run report carries a job summary.

    When the report has no job summary, one is built from the individual
    job records (if there are any) and stored on the report. A report that
    already has a job summary is left untouched.

    Parameters
    ----------
    report : `lsst.ctrl.bps.WmsRunReport`
        Information about a single run.

    Returns
    -------
    warnings : `list` [`str`]
        List of messages describing any non-critical issues encountered during
        processing. Empty if none.
    """
    # Nothing to do if the summary is already there.
    if report.job_summary:
        return []

    # Without per-job records there is nothing to build the summary from.
    if not report.jobs:
        return ["information about individual jobs not available"]

    summary = {}
    for label, members in group_jobs_by_label(report.jobs).items():
        grouped = group_jobs_by_state(members)
        _LOG.debug("by_label_state = %s", grouped)
        # Reduce each per-state job list to its size.
        summary[label] = {state: len(group) for state, group in grouped.items()}
    report.job_summary = summary
    return []

340 

341 

def compile_code_summary(report: WmsRunReport) -> list[str]:
    """Add missing entries to the exit code summary if necessary.

    A WMS plugin may exclude job labels for which there are no failures from
    the exit code summary. The function will attempt to use the job summary,
    if available, to add missing entries for these labels.

    Parameters
    ----------
    report : `lsst.ctrl.bps.WmsRunReport`
        Information about a single run.

    Returns
    -------
    warnings : `list` [`str`]
        List of messages describing any non-critical issues encountered during
        processing. Empty if none.
    """
    warnings: list[str] = []

    # If the job summary is not available, exit early.
    if not report.job_summary:
        return warnings

    # A shallow copy is enough here because we won't be modifying the existing
    # entries, only adding new ones if necessary.
    exit_code_summary = dict(report.exit_code_summary) if report.exit_code_summary else {}

    # Use the job summary to add the entries for labels with no failures
    # *without* modifying already existing entries. A label whose counts
    # have no entry for the FAILED state is treated as having no failures.
    failure_summary = {
        label: states.get(WmsStates.FAILED, 0) for label, states in report.job_summary.items()
    }
    for label, count in failure_summary.items():
        if count == 0:
            exit_code_summary.setdefault(label, [])

    # Check if there are any discrepancies between the data in the exit code
    # summary and the job summary. Labels are sorted so that the messages
    # are reproducible.
    mismatches = sorted(
        label
        for label, count in failure_summary.items()
        if label in exit_code_summary and len(exit_code_summary[label]) != count
    )
    if mismatches:
        warnings.append(
            f"number of exit codes differs from number of failures for job labels: {', '.join(mismatches)}"
        )
    missing = sorted(set(failure_summary) - set(exit_code_summary))
    if missing:
        warnings.append(f"exit codes not available for job labels: {', '.join(missing)}")

    # Only replace the summary on the report when there is something in it.
    if exit_code_summary:
        report.exit_code_summary = exit_code_summary

    return warnings

398 

399 

def group_jobs_by_state(jobs):
    """Split jobs into per-state groups.

    Parameters
    ----------
    jobs : `list` [`lsst.ctrl.bps.WmsJobReport`]
        Jobs to divide into groups based on state.

    Returns
    -------
    by_state : `dict`
        Mapping of job state to a list of jobs. Every member of `WmsStates`
        is present as a key; states with no jobs map to an empty list.
    """
    _LOG.debug("group_jobs_by_state: jobs=%s", jobs)

    # Start with an empty, independent list for each known state.
    grouped = {}
    for state in WmsStates:
        grouped[state] = []

    for entry in jobs:
        grouped[entry.state].append(entry)
    return grouped

418 

419 

def group_jobs_by_label(jobs):
    """Divide given jobs into groups based on their label value.

    Parameters
    ----------
    jobs : `list` [`lsst.ctrl.bps.WmsJobReport`]
        Jobs to divide into groups based on label.

    Returns
    -------
    by_label : `dict` [`str`, `list` [`lsst.ctrl.bps.WmsJobReport`]]
        Mapping of job label to a list of jobs having that label. Labels
        appear in the order they are first encountered and jobs keep their
        relative input order within each group.
    """
    by_label = {}
    for job in jobs:
        # A plain dict is returned on purpose so that looking up an unknown
        # label still raises KeyError for the caller.
        by_label.setdefault(job.label, []).append(job)
    return by_label