# This file is part of ctrl_bps_htcondor.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Utility functions used for reporting."""

import logging
import os
import re
from pathlib import Path
from typing import Any

import htcondor

from lsst.ctrl.bps import (
    WmsJobReport,
    WmsRunReport,
    WmsSpecificInfo,
    WmsStates,
)

from .common_utils import _htc_status_to_wms_state
from .lssthtc import (
    MISSING_ID,
    WmsNodeType,
    condor_search,
    htc_check_dagman_output,
    htc_tweak_log_info,
    pegasus_name_to_label,
    read_dag_info,
    read_dag_log,
    read_dag_status,
    read_node_status,
    summarize_dag,
)

_LOG = logging.getLogger(__name__)


def _get_status_from_id(
    wms_workflow_id: str, hist: float, schedds: dict[str, htcondor.Schedd]
) -> tuple[WmsStates, str]:
    """Gather run information using workflow id.

    Parameters
    ----------
    wms_workflow_id : `str`
        Limit to specific run based on id.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [ `str`, `htcondor.Schedd` ]
        HTCondor schedulers to query for job information. If the dictionary
        is empty, all queries will be run against the local scheduler only.

    Returns
    -------
    state : `lsst.ctrl.bps.WmsStates`
        Status for the corresponding run.
    message : `str`
        Message with extra error information.
    """
    _LOG.debug("_get_status_from_id: id=%s, hist=%s, schedds=%s", wms_workflow_id, hist, schedds)

    message = ""

    # Collect information about the job by querying HTCondor schedd and
    # HTCondor history.
    schedd_dag_info = _get_info_from_schedd(wms_workflow_id, hist, schedds)
    if len(schedd_dag_info) == 1:
        schedd_name = next(iter(schedd_dag_info))
        dag_id = next(iter(schedd_dag_info[schedd_name]))
        dag_ad = schedd_dag_info[schedd_name][dag_id]
        state = _htc_status_to_wms_state(dag_ad)
    else:
        state = WmsStates.UNKNOWN
        message = f"DAGMan job {wms_workflow_id} not found in queue or history. Check id or try path."
    return state, message


def _get_status_from_path(wms_path: str | os.PathLike) -> tuple[WmsStates, str]:
    """Gather run status from a given run directory.

    Parameters
    ----------
    wms_path : `str` | `os.PathLike`
        The directory containing the submit side files (e.g., HTCondor files).

    Returns
    -------
    state : `lsst.ctrl.bps.WmsStates`
        Status for the run.
    message : `str`
        Message to be printed.
    """
    wms_path = Path(wms_path).resolve()
    message = ""
    try:
        wms_workflow_id, dag_ad = read_dag_log(wms_path)
    except FileNotFoundError:
        wms_workflow_id = MISSING_ID
        message = f"DAGMan log not found in {wms_path}. Check path."

    if wms_workflow_id == MISSING_ID:
        state = WmsStates.UNKNOWN
    else:
        htc_tweak_log_info(wms_path, dag_ad[wms_workflow_id])
        state = _htc_status_to_wms_state(dag_ad[wms_workflow_id])

    return state, message


def _report_from_path(wms_path):
    """Gather run information from a given run directory.

    Parameters
    ----------
    wms_path : `str`
        The directory containing the submit side files (e.g., HTCondor files).

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the detailed report. The key is the HTCondor id
        and the value is a collection of report information for that run.
    message : `str`
        Message to be printed with the summary report.
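
    Examples
    --------
    A usage sketch; the path below is hypothetical, and because the call
    reads submit-side files, the example is skipped by doctest:

    >>> reports, message = _report_from_path("/scratch/submit/u/jdoe/my_run")  # doctest: +SKIP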

    """
    wms_workflow_id, jobs, message = _get_info_from_path(wms_path)
    if wms_workflow_id == MISSING_ID:
        run_reports = {}
    else:
        run_reports = _create_detailed_report_from_jobs(wms_workflow_id, jobs)
    return run_reports, message


def _report_from_id(wms_workflow_id, hist, schedds=None):
    """Gather run information using workflow id.

    Parameters
    ----------
    wms_workflow_id : `str`
        Limit to specific run based on id.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [ `str`, `htcondor.Schedd` ], optional
        HTCondor schedulers to query for job information. If None
        (default), all queries will be run against the local scheduler only.

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the detailed report. The key is the HTCondor id
        and the value is a collection of report information for that run.
    message : `str`
        Message to be printed with the summary report.
    """
    messages = []

    # Collect information about the job by querying HTCondor schedd and
    # HTCondor history.
    schedd_dag_info = _get_info_from_schedd(wms_workflow_id, hist, schedds)
    if len(schedd_dag_info) == 1:
        # Extract the DAG info without altering the results of the query.
        schedd_name = next(iter(schedd_dag_info))
        dag_id = next(iter(schedd_dag_info[schedd_name]))
        dag_ad = schedd_dag_info[schedd_name][dag_id]

        # If the provided workflow id does not correspond to the one extracted
        # from the DAGMan log file in the submit directory, rerun the query
        # with the id found in the file.
        #
        # This is to cover the situation in which the user provided the old
        # job id of a restarted run.
        try:
            path_dag_id, _ = read_dag_log(dag_ad["Iwd"])
        except FileNotFoundError as exc:
            # At the moment, a missing DAGMan log is pretty much a fatal
            # error. So empty the DAG info to finish early (see the if
            # statement below).
            schedd_dag_info.clear()
            messages.append(f"Cannot create the report for '{dag_id}': {exc}")
        else:
            if path_dag_id != dag_id:
                schedd_dag_info = _get_info_from_schedd(path_dag_id, hist, schedds)
                messages.append(
                    f"WARNING: Found newer workflow executions in same submit directory as id '{dag_id}'. "
                    "This normally occurs when a run is restarted. The report shown is for the most "
                    f"recent status with run id '{path_dag_id}'"
                )

    if len(schedd_dag_info) == 0:
        run_reports = {}
    elif len(schedd_dag_info) == 1:
        _, dag_info = schedd_dag_info.popitem()
        dag_id, dag_ad = dag_info.popitem()

        # Create a mapping between jobs and their classads. The keys will
        # be of the format 'ClusterId.ProcId'.
        job_info = {dag_id: dag_ad}

        # Find jobs (nodes) belonging to that DAGMan job.
        job_constraint = f"DAGManJobId == {int(float(dag_id))}"
        schedd_job_info = condor_search(constraint=job_constraint, hist=hist, schedds=schedds)
        if schedd_job_info:
            _, node_info = schedd_job_info.popitem()
            job_info.update(node_info)

        # Collect additional pieces of information about jobs using HTCondor
        # files in the submission directory.
        _, path_jobs, message = _get_info_from_path(dag_ad["Iwd"])
        _update_jobs(job_info, path_jobs)
        if message:
            messages.append(message)
        run_reports = _create_detailed_report_from_jobs(dag_id, job_info)
    else:
        ids = [ad["GlobalJobId"] for dag_info in schedd_dag_info.values() for ad in dag_info.values()]
        message = (
            f"More than one job matches id '{wms_workflow_id}', "
            f"their global ids are: {', '.join(ids)}. Rerun with one of the global ids"
        )
        messages.append(message)
        run_reports = {}

    message = "\n".join(messages)
    return run_reports, message

def _get_info_from_schedd(
    wms_workflow_id: str, hist: float, schedds: dict[str, htcondor.Schedd]
) -> dict[str, dict[str, dict[str, Any]]]:
    """Gather run information from HTCondor.

    Parameters
    ----------
    wms_workflow_id : `str`
        Limit to specific run based on id.
    hist : `float`
        Limit history search to this many days.
    schedds : `dict` [ `str`, `htcondor.Schedd` ]
        HTCondor schedulers to query for job information. If the dictionary
        is empty, all queries will be run against the local scheduler only.

    Returns
    -------
    schedd_dag_info : `dict` [`str`, `dict` [`str`, `dict` [`str`, `~typing.Any`]]]
        Information about jobs satisfying the search criteria where, for each
        Scheduler, local HTCondor job ids are mapped to their respective
        classads.
    """
    _LOG.debug("_get_info_from_schedd: id=%s, hist=%s, schedds=%s", wms_workflow_id, hist, schedds)

    dag_constraint = 'regexp("dagman$", Cmd)'
    try:
        cluster_id = int(float(wms_workflow_id))
    except ValueError:
        dag_constraint += f' && GlobalJobId == "{wms_workflow_id}"'
    else:
        dag_constraint += f" && ClusterId == {cluster_id}"

    # With the current implementation of the condor_* functions the query
    # will always return only one match per Scheduler.
    #
    # Even in the highly unlikely situation where HTCondor history (which
    # condor_search queries too) is long enough to have jobs from before
    # the cluster ids were rolled over (and as a result there is more than
    # one job with the same cluster id), they will not show up in
    # the results.
    schedd_dag_info = condor_search(constraint=dag_constraint, hist=hist, schedds=schedds)
    return schedd_dag_info

def _get_info_from_path(wms_path: str | os.PathLike) -> tuple[str, dict[str, dict[str, Any]], str]:
    """Gather run information from a given run directory.

    Parameters
    ----------
    wms_path : `str` or `os.PathLike`
        Directory containing HTCondor files.

    Returns
    -------
    wms_workflow_id : `str`
        The run id, which is a DAGMan job id.
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Information about jobs read from files in the given directory.
        The key is the HTCondor id and the value is a dictionary of HTCondor
        keys and values.
    message : `str`
        Message to be printed with the summary report.
    """
    # Ensure the path is absolute, in particular for folks helping to
    # debug failures who need to dig around the submit files.
    wms_path = Path(wms_path).resolve()

    messages = []
    try:
        wms_workflow_id, jobs = read_dag_log(wms_path)
        _LOG.debug("_get_info_from_path: from dag log %s = %s", wms_workflow_id, jobs)
        _update_jobs(jobs, read_node_status(wms_path))
        _LOG.debug("_get_info_from_path: after node status %s = %s", wms_workflow_id, jobs)

        # Add more info for the DAGMan job.
        job = jobs[wms_workflow_id]
        job.update(read_dag_status(wms_path))

        job["total_jobs"], job["state_counts"] = _get_state_counts_from_jobs(wms_workflow_id, jobs)
        if "bps_run" not in job:
            _add_run_info(wms_path, job)

        message = htc_check_dagman_output(wms_path)
        if message:
            messages.append(message)
        _LOG.debug(
            "_get_info: id = %s, total_jobs = %s", wms_workflow_id, jobs[wms_workflow_id]["total_jobs"]
        )

        # Add extra pieces of information which cannot be found in HTCondor
        # generated files, like 'GlobalJobId'.
        #
        # Do not treat the absence of this file as a serious error. Neither
        # runs submitted with earlier versions of the plugin nor runs
        # submitted with the Pegasus plugin will have it at the moment.
        # However, once the Pegasus plugin has its own report() method
        # (instead of sneakily using HTCondor's), the lack of this file
        # should be treated as seriously as the lack of any other file.
        try:
            job_info = read_dag_info(wms_path)
        except FileNotFoundError as exc:
            message = f"Warn: Some information may not be available: {exc}"
            messages.append(message)
        else:
            schedd_name = next(iter(job_info))
            job_ad = next(iter(job_info[schedd_name].values()))
            job.update(job_ad)
    except FileNotFoundError as err:
        message = f"Could not find HTCondor files in '{wms_path}' ({err})"
        _LOG.debug(message)
        messages.append(message)
        message = htc_check_dagman_output(wms_path)
        if message:
            messages.append(message)
        wms_workflow_id = MISSING_ID
        jobs = {}

    # Add more condor_q-like info.
    for job in jobs.values():
        htc_tweak_log_info(wms_path, job)

    message = "\n".join([msg for msg in messages if msg])
    _LOG.debug("wms_workflow_id = %s, jobs = %s", wms_workflow_id, jobs.keys())
    _LOG.debug("message = %s", message)
    return wms_workflow_id, jobs, message

def _create_detailed_report_from_jobs(
    wms_workflow_id: str, jobs: dict[str, dict[str, Any]]
) -> dict[str, WmsRunReport]:
    """Gather run information to be used in generating summary reports.

    Parameters
    ----------
    wms_workflow_id : `str`
        The run id to create the report for.
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Mapping HTCondor job id to job information.

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the detailed report. The key is the given HTCondor
        id and the value is a collection of report information for that run.
    """
    _LOG.debug("_create_detailed_report: id = %s, job = %s", wms_workflow_id, jobs[wms_workflow_id])

    dag_ad = jobs[wms_workflow_id]

    report = WmsRunReport(
        wms_id=f"{dag_ad['ClusterId']}.{dag_ad['ProcId']}",
        global_wms_id=dag_ad.get("GlobalJobId", "MISS"),
        path=dag_ad["Iwd"],
        label=dag_ad.get("bps_job_label", "MISS"),
        run=dag_ad.get("bps_run", "MISS"),
        project=dag_ad.get("bps_project", "MISS"),
        campaign=dag_ad.get("bps_campaign", "MISS"),
        payload=dag_ad.get("bps_payload", "MISS"),
        operator=_get_owner(dag_ad),
        run_summary=_get_run_summary(dag_ad),
        state=_htc_status_to_wms_state(dag_ad),
        total_number_jobs=0,
        jobs=[],
        job_state_counts=dict.fromkeys(WmsStates, 0),
        exit_code_summary={},
    )

    payload_jobs = {}  # Keep track for later processing.
    specific_info = WmsSpecificInfo()
    for job_id, job_ad in jobs.items():
        if job_ad.get("wms_node_type", WmsNodeType.UNKNOWN) in [WmsNodeType.PAYLOAD, WmsNodeType.FINAL]:
            try:
                name = job_ad.get("DAGNodeName", job_id)
                wms_state = _htc_status_to_wms_state(job_ad)
                job_report = WmsJobReport(
                    wms_id=job_id,
                    name=name,
                    label=job_ad.get("bps_job_label", pegasus_name_to_label(name)),
                    state=wms_state,
                )
                if job_report.label == "init":
                    job_report.label = "pipetaskInit"
                report.job_state_counts[wms_state] += 1
                report.jobs.append(job_report)
                payload_jobs[job_id] = job_ad
            except KeyError as ex:
                _LOG.error("Job missing key '%s': %s", str(ex), job_ad)
                raise
        elif is_service_job(job_ad):
            _LOG.debug(
                "Found service job: id='%s', name='%s', label='%s', NodeStatus='%s', JobStatus='%s'",
                job_id,
                job_ad["DAGNodeName"],
                job_ad.get("bps_job_label", "MISS"),
                job_ad.get("NodeStatus", "MISS"),
                job_ad.get("JobStatus", "MISS"),
            )
            _add_service_job_specific_info(job_ad, specific_info)

    report.total_number_jobs = len(payload_jobs)
    report.exit_code_summary = _get_exit_code_summary(payload_jobs)
    if specific_info:
        report.specific_info = specific_info

    # The workflow exits with a non-zero DAG_STATUS if there was a problem
    # with any of the WMS jobs, so change FAILED to SUCCEEDED if all
    # payload jobs SUCCEEDED.
    if report.total_number_jobs == report.job_state_counts[WmsStates.SUCCEEDED]:
        report.state = WmsStates.SUCCEEDED

    run_reports = {report.wms_id: report}
    _LOG.debug("_create_detailed_report: run_reports = %s", run_reports)
    return run_reports

def _add_service_job_specific_info(job_ad: dict[str, Any], specific_info: WmsSpecificInfo) -> None:
    """Generate report information for a service job.

    Parameters
    ----------
    job_ad : `dict` [`str`, `~typing.Any`]
        Provisioning job information.
    specific_info : `lsst.ctrl.bps.WmsSpecificInfo`
        Where to add the message.
    """
    status_details = ""
    job_status = _htc_status_to_wms_state(job_ad)

    # Service jobs in the queue are deleted when the DAG is done.
    # To get an accurate status, other info needs to be checked.
    if (
        job_status == WmsStates.DELETED
        and "Reason" in job_ad
        and (
            "Removed by DAGMan" in job_ad["Reason"]
            or "removed because <OtherJobRemoveRequirements = DAGManJobId =?=" in job_ad["Reason"]
            or "DAG is exiting and writing rescue file." in job_ad["Reason"]
        )
    ):
        if "HoldReason" in job_ad:
            # HoldReason exists even if released, so check.
            if "job_released_time" in job_ad and job_ad["job_held_time"] < job_ad["job_released_time"]:
                # If released, assume running until deleted.
                job_status = WmsStates.SUCCEEDED
                status_details = ""
            else:
                # If the job was held when deleted by DAGMan, still want to
                # report the hold reason.
                status_details = f"(Job was held for the following reason: {job_ad['HoldReason']})"

        else:
            job_status = WmsStates.SUCCEEDED
    elif job_status == WmsStates.SUCCEEDED:
        status_details = "(Note: Finished before workflow.)"
    elif job_status == WmsStates.HELD:
        status_details = f"({job_ad['HoldReason']})"

    template = "Status of {job_name}: {status} {status_details}"
    context = {
        "job_name": job_ad["DAGNodeName"],
        "status": job_status.name,
        "status_details": status_details,
    }
    specific_info.add_message(template=template, context=context)

def _summary_report(user, hist, pass_thru, schedds=None):
    """Gather run information to be used in generating summary reports.

    Parameters
    ----------
    user : `str`
        Run lookup restricted to given user.
    hist : `float`
        How many previous days to search for run information.
    pass_thru : `str`
        Advanced users can define the HTCondor constraint to be used
        when searching queue and history.
    schedds : `dict` [ `str`, `htcondor.Schedd` ], optional
        HTCondor schedulers to query for job information. If None
        (default), all queries will be run against the local scheduler only.

    Returns
    -------
    run_reports : `dict` [`str`, `lsst.ctrl.bps.WmsRunReport`]
        Run information for the summary report. The keys are HTCondor ids and
        the values are collections of report information for each run.
    message : `str`
        Message to be printed with the summary report.
    """
    # Only doing a summary report, so only look for DAGMan jobs.
    if pass_thru:
        constraint = pass_thru
    else:
        # Notes:
        # * bps_isjob == 'True' isn't getting set for DAG jobs that are
        #   manually restarted.
        # * Any job with DAGManJobID isn't a DAG job.
        constraint = 'bps_isjob == "True" && JobUniverse == 7'
        if user:
            constraint += f' && (Owner == "{user}" || bps_operator == "{user}")'

    job_info = condor_search(constraint=constraint, hist=hist, schedds=schedds)

    # Have a list of DAGMan jobs; need to get the run_report info.
    run_reports = {}
    msg = ""
    for jobs in job_info.values():
        for job_id, job in jobs.items():
            total_jobs, state_counts = _get_state_counts_from_dag_job(job)
            # If the counts were not in the queue information (e.g., Kerberos
            # bug), try reading them from a file.
            if total_jobs == 0:
                try:
                    job.update(read_dag_status(job["Iwd"]))
                    total_jobs, state_counts = _get_state_counts_from_dag_job(job)
                except StopIteration:
                    pass  # Do not kill the report if HTCondor files cannot be found.

            if "bps_run" not in job:
                _add_run_info(job["Iwd"], job)
            report = WmsRunReport(
                wms_id=job_id,
                global_wms_id=job["GlobalJobId"],
                path=job["Iwd"],
                label=job.get("bps_job_label", "MISS"),
                run=job.get("bps_run", "MISS"),
                project=job.get("bps_project", "MISS"),
                campaign=job.get("bps_campaign", "MISS"),
                payload=job.get("bps_payload", "MISS"),
                operator=_get_owner(job),
                run_summary=_get_run_summary(job),
                state=_htc_status_to_wms_state(job),
                jobs=[],
                total_number_jobs=total_jobs,
                job_state_counts=state_counts,
            )
            run_reports[report.global_wms_id] = report

    return run_reports, msg

def _add_run_info(wms_path, job):
    r"""Find BPS run information elsewhere for runs without bps attributes.

    Parameters
    ----------
    wms_path : `str`
        Path to submit files for the run.
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Raises
    ------
    StopIteration
        If the file it is looking for cannot be found. Permission errors
        are caught and the job's run is marked with an error.
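
    Examples
    --------
    A minimal sketch of the attribute parsing this function performs on
    submit files, shown on a hypothetical submit-file line (no directory
    is read here):

    >>> m = re.match(r"\+(bps_[^\s]+)\s*=\s*(.+)$", '+bps_run = "u/jdoe/test"')
    >>> m.group(1), m.group(2).replace('"', "")
    ('bps_run', 'u/jdoe/test')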

    """
    path = Path(wms_path) / "jobs"
    try:
        subfile = next(path.glob("**/*.sub"))
    except (StopIteration, PermissionError):
        job["bps_run"] = "Unavailable"
    else:
        _LOG.debug("_add_run_info: subfile = %s", subfile)
        try:
            with open(subfile, encoding="utf-8") as fh:
                for line in fh:
                    if line.startswith("+bps_"):
                        m = re.match(r"\+(bps_[^\s]+)\s*=\s*(.+)$", line)
                        if m:
                            _LOG.debug("Matching line: %s", line)
                            job[m.group(1)] = m.group(2).replace('"', "")
                        else:
                            _LOG.debug("Could not parse attribute: %s", line)
        except PermissionError:
            job["bps_run"] = "PermissionError"
    _LOG.debug("After adding job = %s", job)

def _get_owner(job):
    """Get the owner of a dag job.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Returns
    -------
    owner : `str`
        Owner of the dag job.
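
    Examples
    --------
    A minimal sketch with hypothetical classad values; the `bps_operator`
    attribute takes precedence over `Owner`:

    >>> _get_owner({"bps_operator": "jdoe", "Owner": "htcuser"})
    'jdoe'
    >>> _get_owner({"Owner": "htcuser"})
    'htcuser'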

    """
    owner = job.get("bps_operator", None)
    if not owner:
        owner = job.get("Owner", None)
        if not owner:
            _LOG.warning("Could not get Owner from htcondor job: %s", job)
            owner = "MISS"
    return owner

def _get_run_summary(job):
    """Get the run summary for a job.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Returns
    -------
    summary : `str`
        Number of jobs per PipelineTask label in approximate pipeline order.
        Format: <label>:<count>[;<label>:<count>]+
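
    Examples
    --------
    A minimal sketch with a hypothetical classad; when a summary attribute
    is present, no DAG file is read and the 'init' label is normalized:

    >>> _get_run_summary({"bps_run_summary": "init:1;measure:10"})
    'pipetaskInit:1;measure:10'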

    """
    summary = job.get("bps_job_summary", job.get("bps_run_summary", None))
    if not summary:
        summary, _, _ = summarize_dag(job["Iwd"])
        if not summary:
            _LOG.warning("Could not get run summary for htcondor job: %s", job)
    _LOG.debug("_get_run_summary: summary=%s", summary)

    # Work around runs that sometimes use 'init' instead of 'pipetaskInit'.
    summary = summary.replace("init:", "pipetaskInit:")

    if "pegasus_version" in job and "pegasus" not in summary:
        summary += ";pegasus:0"

    return summary

def _get_exit_code_summary(jobs):
    """Get the exit code summary for a run.

    Parameters
    ----------
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Mapping HTCondor job id to job information.

    Returns
    -------
    summary : `dict` [`str`, `list` [`int`]]
        Jobs' exit codes per job label.
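
    Examples
    --------
    A minimal sketch with hypothetical, pared-down classads (JobStatus 4
    is ``htcondor.JobStatus.COMPLETED``); only non-zero exit codes are
    recorded:

    >>> _get_exit_code_summary(
    ...     {
    ...         "1.0": {"bps_job_label": "measure", "JobStatus": 4,
    ...                 "ExitBySignal": False, "ExitCode": 1},
    ...         "2.0": {"bps_job_label": "measure", "JobStatus": 4,
    ...                 "ExitBySignal": False, "ExitCode": 0},
    ...     }
    ... )
    {'measure': [1]}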

    """
    summary = {}
    for job_id, job_ad in jobs.items():
        job_label = job_ad["bps_job_label"]
        summary.setdefault(job_label, [])
        try:
            exit_code = 0
            job_status = job_ad["JobStatus"]
            match job_status:
                case htcondor.JobStatus.COMPLETED | htcondor.JobStatus.HELD | htcondor.JobStatus.REMOVED:
                    exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["ExitCode"]
                case (
                    htcondor.JobStatus.IDLE
                    | htcondor.JobStatus.RUNNING
                    | htcondor.JobStatus.TRANSFERRING_OUTPUT
                    | htcondor.JobStatus.SUSPENDED
                ):
                    pass
                case _:
                    _LOG.debug("Unknown 'JobStatus' value ('%d') in classad for job '%s'", job_status, job_id)
            if exit_code != 0:
                summary[job_label].append(exit_code)
        except KeyError as ex:
            _LOG.debug("Attribute '%s' not found in the classad for job '%s'", ex, job_id)
    return summary

def _get_state_counts_from_jobs(
    wms_workflow_id: str, jobs: dict[str, dict[str, Any]]
) -> tuple[int, dict[WmsStates, int]]:
    """Count number of jobs per WMS state.

    The workflow job and the service jobs are excluded from the count.

    Parameters
    ----------
    wms_workflow_id : `str`
        HTCondor job id.
    jobs : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor dag job information.

    Returns
    -------
    total_count : `int`
        Total number of dag nodes.
    state_counts : `dict` [`lsst.ctrl.bps.WmsStates`, `int`]
        Keys are the different WMS states and values are counts of jobs
        that are in that WMS state.
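
    Examples
    --------
    A minimal sketch with hypothetical, pared-down classads; the DAGMan
    job itself ("9999.0") and any non-payload node are excluded from the
    count:

    >>> jobs = {
    ...     "9999.0": {},
    ...     "1.0": {"wms_node_type": WmsNodeType.PAYLOAD, "JobStatus": 2},
    ... }
    >>> total, _ = _get_state_counts_from_jobs("9999.0", jobs)
    >>> total
    1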

    """
    state_counts = dict.fromkeys(WmsStates, 0)
    for job_id, job_ad in jobs.items():
        if job_id != wms_workflow_id and job_ad.get("wms_node_type", WmsNodeType.UNKNOWN) in [
            WmsNodeType.PAYLOAD,
            WmsNodeType.FINAL,
        ]:
            state_counts[_htc_status_to_wms_state(job_ad)] += 1
    total_count = sum(state_counts.values())

    return total_count, state_counts

def _get_state_counts_from_dag_job(job):
    """Count number of jobs per WMS state.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor dag job information.

    Returns
    -------
    total_count : `int`
        Total number of dag nodes.
    state_counts : `dict` [`lsst.ctrl.bps.WmsStates`, `int`]
        Keys are the different WMS states and values are counts of jobs
        that are in that WMS state.
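
    Examples
    --------
    A minimal sketch with a hypothetical classad carrying the ``DAG_*``
    node-count attributes:

    >>> job = {"DAG_NodesReady": 0, "DAG_NodesDone": 5, "DAG_NodesTotal": 5}
    >>> total, counts = _get_state_counts_from_dag_job(job)
    >>> total
    5
    >>> counts[WmsStates.SUCCEEDED]
    5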

    """
    _LOG.debug("_get_state_counts_from_dag_job: job = %s %s", type(job), len(job))
    state_counts = dict.fromkeys(WmsStates, 0)
    if "DAG_NodesReady" in job:
        state_counts = {
            WmsStates.UNREADY: job.get("DAG_NodesUnready", 0),
            WmsStates.READY: job.get("DAG_NodesReady", 0),
            WmsStates.HELD: job.get("DAG_JobsHeld", 0),
            WmsStates.SUCCEEDED: job.get("DAG_NodesDone", 0),
            WmsStates.FAILED: job.get("DAG_NodesFailed", 0),
            WmsStates.PRUNED: job.get("DAG_NodesFutile", 0),
            WmsStates.MISFIT: job.get("DAG_NodesPre", 0) + job.get("DAG_NodesPost", 0),
        }
        total_jobs = job.get("DAG_NodesTotal")
        _LOG.debug("_get_state_counts_from_dag_job: from DAG_* keys, total_jobs = %s", total_jobs)
    elif "NodesFailed" in job:
        state_counts = {
            WmsStates.UNREADY: job.get("NodesUnready", 0),
            WmsStates.READY: job.get("NodesReady", 0),
            WmsStates.HELD: job.get("JobProcsHeld", 0),
            WmsStates.SUCCEEDED: job.get("NodesDone", 0),
            WmsStates.FAILED: job.get("NodesFailed", 0),
            WmsStates.PRUNED: job.get("NodesFutile", 0),
            WmsStates.MISFIT: job.get("NodesPre", 0) + job.get("NodesPost", 0),
        }
        try:
            total_jobs = job["NodesTotal"]
        except KeyError as ex:
            _LOG.error("Job missing %s. job = %s", str(ex), job)
            raise
        _LOG.debug("_get_state_counts_from_dag_job: from NODES* keys, total_jobs = %s", total_jobs)
    else:
        # With Kerberos job auth and the Kerberos bug, a warning here would
        # be printed for every DAG, so log at debug level instead.
        _LOG.debug("Can't get job state counts %s", job["Iwd"])
        total_jobs = 0

    _LOG.debug("total_jobs = %s, state_counts: %s", total_jobs, state_counts)
    return total_jobs, state_counts

def _update_jobs(jobs1, jobs2):
    """Update jobs1 with info in jobs2.

    (Basically an update for nested dictionaries.)

    Parameters
    ----------
    jobs1 : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        HTCondor job information to be updated.
    jobs2 : `dict` [`str`, `dict` [`str`, `~typing.Any`]]
        Additional HTCondor job information.
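
    Examples
    --------
    A minimal sketch with hypothetical classads; entries present in both
    mappings are merged, new ones are added:

    >>> jobs1 = {"1.0": {"JobStatus": 4}}
    >>> jobs2 = {"1.0": {"ExitCode": 0}, "2.0": {"JobStatus": 1}}
    >>> _update_jobs(jobs1, jobs2)
    >>> jobs1["1.0"] == {"JobStatus": 4, "ExitCode": 0}
    True
    >>> "2.0" in jobs1
    True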

    """
    for job_id, job_ad in jobs2.items():
        if job_id in jobs1:
            jobs1[job_id].update(job_ad)
        else:
            jobs1[job_id] = job_ad

def is_service_job(job_ad: dict[str, Any]) -> bool:
    """Determine if a job is a service one.

    Parameters
    ----------
    job_ad : `dict` [`str`, `~typing.Any`]
        Information about an HTCondor job.

    Returns
    -------
    is_service_job : `bool`
        `True` if the job is a service one, `False` otherwise.

    Notes
    -----
    At the moment, HTCondor does not provide a native way to distinguish
    between payload and service jobs in the workflow. This code depends
    on `read_node_status` adding `wms_node_type`.
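
    Examples
    --------
    A minimal sketch with hypothetical classads; only the `wms_node_type`
    value matters here:

    >>> is_service_job({"wms_node_type": WmsNodeType.SERVICE})
    True
    >>> is_service_job({})
    False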

    """
    return job_ad.get("wms_node_type", WmsNodeType.UNKNOWN) == WmsNodeType.SERVICE