Coverage for python / lsst / ctrl / bps / panda / panda_service.py: 9%

239 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:55 +0000

1# This file is part of ctrl_bps_panda. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27"""Interface between generic workflow to PanDA/iDDS workflow system.""" 

28 

# Public API of this module: the WMS service plugin and its workflow class.
__all__ = ["PanDAService", "PandaBpsWmsWorkflow"]

30 

31 

32import json 

33import logging 

34import os 

35import pickle 

36import re 

37 

38from idds.workflowv2.workflow import Workflow as IDDS_client_workflow 

39 

40from lsst.ctrl.bps import ( 

41 DEFAULT_MEM_FMT, 

42 DEFAULT_MEM_UNIT, 

43 BaseWmsService, 

44 BaseWmsWorkflow, 

45 WmsRunReport, 

46 WmsStates, 

47) 

48from lsst.ctrl.bps.panda.constants import ( 

49 PANDA_DEFAULT_MAX_COPY_WORKERS, 

50 PANDA_DEFAULT_MAX_REQUEST_LENGTH, 

51) 

52from lsst.ctrl.bps.panda.utils import ( 

53 add_final_idds_work, 

54 add_idds_work, 

55 aggregate_by_basename, 

56 copy_files_for_distribution, 

57 create_idds_build_workflow, 

58 extract_taskname, 

59 get_idds_client, 

60 get_idds_result, 

61 idds_call_with_check, 

62) 

63from lsst.resources import ResourcePath 

64from lsst.utils.timer import time_this 

65 

# Module-level logger, named after this module, used by every method below.
_LOG = logging.getLogger(__name__)

67 

68 

class PanDAService(BaseWmsService):
    """PanDA version of WMS service."""

    def prepare(self, config, generic_workflow, out_prefix=None):
        # Docstring inherited from BaseWmsService.prepare.
        #
        # Builds a PandaBpsWmsWorkflow from the generic workflow, writes it
        # under ``out_prefix`` and returns it.  The conversion is wrapped in
        # ``time_this`` so its duration and memory usage are logged at INFO.
        _LOG.debug("out_prefix = '%s'", out_prefix)

        _LOG.info("Starting PanDA prepare stage (creating specific implementation of workflow)")

        with time_this(
            log=_LOG,
            level=logging.INFO,
            prefix=None,
            msg="PanDA prepare stage completed",
            mem_usage=True,
            mem_unit=DEFAULT_MEM_UNIT,
            mem_fmt=DEFAULT_MEM_FMT,
        ):
            workflow = PandaBpsWmsWorkflow.from_generic_workflow(
                config, generic_workflow, out_prefix, f"{self.__class__.__module__}.{self.__class__.__name__}"
            )
            workflow.write(out_prefix)
        return workflow

    @staticmethod
    def _request_id_from_submission(ret):
        # Convert the raw return value of an iDDS submit call into an integer
        # request id.  Raises RuntimeError when iDDS reports a failure.
        status, result, error = get_idds_result(ret)
        if not status:
            raise RuntimeError(f"Error submitting to PanDA service: {error}")
        return int(result)

    def submit(self, workflow, **kwargs):
        # Submit a workflow to iDDS.
        #
        # Two paths exist:
        #
        # * Remote build -- taken when both the ``config`` and
        #   ``remote_build`` keyword arguments are truthy.  A build workflow
        #   is created from ``kwargs``, submitted with ``submit_build`` and
        #   returned with its ``run_id`` set to the iDDS request id.
        # * Regular submission -- files are copied to the distribution end
        #   point (unless the run attribute ``bps_iscustom`` is truthy), every
        #   split workflow step is uploaded, then the workflow itself is
        #   submitted.  ``workflow.run_id`` is set; nothing is returned.
        #
        # Raises RuntimeError when iDDS reports a failed submission.
        config = kwargs.get("config")
        remote_build = kwargs.get("remote_build")

        if config and remote_build:
            _LOG.info("remote build")

            idds_build_workflow = create_idds_build_workflow(**kwargs)
            idds_client = get_idds_client(self.config)
            ret = idds_client.submit_build(idds_build_workflow, username=None, use_dataset_name=False)
            _LOG.debug("iDDS client manager submit returned = %s", ret)

            # Check submission success
            request_id = self._request_id_from_submission(ret)

            _LOG.info("Submitted into iDDs with request id=%s", request_id)
            idds_build_workflow.run_id = request_id
            return idds_build_workflow

        else:
            _, max_request_length = self.config.search(
                "maxRequestLength", opt={"default": PANDA_DEFAULT_MAX_REQUEST_LENGTH}
            )
            _, max_copy_workers = self.config.search(
                "maxCopyWorkers", opt={"default": PANDA_DEFAULT_MAX_COPY_WORKERS}
            )
            file_distribution_uri = self.config["fileDistributionEndPoint"]
            lsst_temp = "LSST_RUN_TEMP_SPACE"
            # Fall back to the default end point when the configured one
            # mentions LSST_RUN_TEMP_SPACE but that variable is not set in
            # the environment.
            if lsst_temp in file_distribution_uri and lsst_temp not in os.environ:
                file_distribution_uri = self.config["fileDistributionEndPointDefault"]
            # A URI without an explicit scheme is treated as a local path.
            protocol_pattern = re.compile(r"^[a-zA-Z][a-zA-Z\d+\-.]*://")
            if not protocol_pattern.match(file_distribution_uri):
                file_distribution_uri = "file://" + file_distribution_uri

            idds_client = get_idds_client(self.config)
            submit_cmd = workflow.run_attrs.get("bps_iscustom", False)
            if not submit_cmd:
                copy_files_for_distribution(
                    workflow.files_to_pre_stage,
                    ResourcePath(file_distribution_uri, forceDirectory=True),
                    max_copy_workers,
                )

            # Upload the individual workflow steps first; any failure aborts
            # the submission before the workflow itself is sent.
            idds_wf = workflow.idds_client_workflow
            workflow_steps = idds_wf.split_workflow_to_steps(
                request_cache=self.config["submitPath"], max_request_length=max_request_length
            )
            for wf_step in workflow_steps:
                ret_step = idds_client.submit(wf_step, username=None, use_dataset_name=False)
                status, result_step, error = get_idds_result(ret_step)
                if status and result_step == 0:
                    msg = f"iDDS client manager successfully uploaded workflow step: {wf_step.step_name}"
                    _LOG.info(msg)
                else:
                    msg = (
                        f"iDDS client manager failed to submit workflow step {wf_step.step_name}: "
                        f"{ret_step}"
                    )
                    raise RuntimeError(msg)

            ret = idds_client.submit(workflow.idds_client_workflow, username=None, use_dataset_name=False)
            _LOG.debug("iDDS client manager submit returned = %s", ret)

            # Check submission success
            request_id = self._request_id_from_submission(ret)

            _LOG.info("Submitted into iDDs with request id=%s", request_id)
            workflow.run_id = request_id

    def restart(self, wms_workflow_id):
        # Docstring inherited from BaseWmsService.restart.
        #
        # Asks iDDS to retry the request.  On success returns
        # ``(wms_workflow_id, None, <JSON-encoded result>)``; on failure
        # returns ``(None, None, <error message>)``.
        idds_client = get_idds_client(self.config)
        ret = idds_client.retry(request_id=wms_workflow_id)
        _LOG.debug("Restart PanDA workflow returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            _LOG.info("Restarting PanDA workflow %s", result)
            return wms_workflow_id, None, json.dumps(result)

        return None, None, f"Error retry PanDA workflow: {error}"

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        # Docstring inherited from BaseWmsService.report.
        #
        # Returns ``(run_reports, message)``.  ``run_reports`` holds at most
        # one WmsRunReport built from the iDDS records of ``wms_workflow_id``;
        # ``message`` is non-empty only when no report could be produced.
        # ``user``, ``hist``, ``pass_thru`` and ``is_global`` are not used by
        # this implementation.
        message = ""
        run_reports = []

        if not wms_workflow_id:
            message = "Run summary not implemented yet, use 'bps report --id <workflow_id>' instead"
            return run_reports, message

        idds_client = get_idds_client(self.config)
        ret = idds_call_with_check(
            idds_client.get_requests,
            func_name="get workflow status",
            request_id=wms_workflow_id,
            with_detail=True,
        )

        # NOTE(review): ret[1][1] is indexed as the list of per-task records;
        # the exact iDDS return layout is not visible here -- confirm.
        tasks = ret[1][1]
        if not tasks:
            message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id"
            return run_reports, message

        # Create initial WmsRunReport
        head = tasks[0]
        wms_report = WmsRunReport(
            wms_id=str(head["request_id"]),
            operator=head["username"],
            project="",
            campaign="",
            payload="",
            run=head["name"],
            state=WmsStates.UNKNOWN,
            total_number_jobs=0,
            job_state_counts=dict.fromkeys(WmsStates, 0),
            job_summary={},
            run_summary="",
            exit_code_summary={},
        )

        # Define workflow status mapping
        # (anything not listed explicitly is reported as RUNNING).
        workflow_status = head["status"]["attributes"]["_name_"]
        if workflow_status in ("Finished", "SubFinished"):
            wms_report.state = WmsStates.SUCCEEDED
        elif workflow_status in ("Failed", "Expired"):
            wms_report.state = WmsStates.FAILED
        elif workflow_status == "Cancelled":
            wms_report.state = WmsStates.DELETED
        elif workflow_status == "Suspended":
            wms_report.state = WmsStates.HELD
        else:
            wms_report.state = WmsStates.RUNNING

        # Define state mapping for job aggregation
        # The status of a task is taken from the first item of state_map.
        # The workflow is in status WmsStates.FAILED when:
        #      All tasks have failed.
        # SubFinished tasks has jobs in
        #      output_processed_files: Finished
        #      output_failed_files: Failed
        #      output_missing_files: Missing
        state_map = {
            "Finished": [WmsStates.SUCCEEDED],
            "SubFinished": [WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.PRUNED],
            "Transforming": [
                WmsStates.RUNNING,
                WmsStates.SUCCEEDED,
                WmsStates.FAILED,
                # WmsStates.READY,
                WmsStates.UNREADY,
                WmsStates.PRUNED,
            ],
            "Failed": [WmsStates.FAILED, WmsStates.PRUNED],
        }

        # Task-record key holding the job count for each WMS state.
        file_map = {
            WmsStates.SUCCEEDED: "output_processed_files",
            WmsStates.RUNNING: "output_processing_files",
            WmsStates.FAILED: "output_failed_files",
            # WmsStates.READY: "output_activated_files",
            WmsStates.UNREADY: "input_new_files",
            WmsStates.PRUNED: "output_missing_files",
        }

        # Sort tasks by workload_id or fallback
        try:
            tasks.sort(key=lambda x: x["transform_workload_id"])
        except (KeyError, TypeError):
            tasks.sort(key=lambda x: x["transform_id"])

        exit_codes_all = {}

        # --- Process each task sequentially ---
        for task in tasks:
            if task.get("transform_id") is None:
                # Not created task (It happens because of an outer join
                # between requests table and transforms table).
                continue

            task_name = task.get("transform_name", "")
            tasklabel = extract_taskname(task_name)
            status = task["transform_status"]["attributes"]["_name_"]
            # ``.get(key, 0)`` still yields None when the key exists with a
            # null value, so normalise with ``or 0`` before doing arithmetic.
            totaljobs = task.get("output_total_files") or 0
            wms_report.total_number_jobs += totaljobs

            # --- If task failed/subfinished, fetch exit codes ---
            if status in ("SubFinished", "Failed") and not task_name.startswith("build_task"):
                transform_workload_id = task.get("transform_workload_id")
                if transform_workload_id:
                    # When there are failed jobs, ctrl_bps check
                    # the number of exit codes
                    nfailed = task.get("output_failed_files") or 0
                    # Placeholder exit code 1 per failed job; replaced below
                    # by the real codes when they are requested.
                    exit_codes_all[tasklabel] = [1] * nfailed
                    if return_exit_codes:
                        new_ret = idds_call_with_check(
                            idds_client.get_contents_output_ext,
                            func_name=f"get task {transform_workload_id} detail",
                            request_id=wms_workflow_id,
                            workload_id=transform_workload_id,
                        )
                        # task_info is a dictionary of len 1 that contains
                        # a list of dicts containing panda job info
                        task_info = new_ret[1][1]
                        if len(task_info) == 1:
                            _, wmsjobs = next(iter(task_info.items()))
                            # Keep only non-zero, non-missing exit codes.
                            exit_codes_all[tasklabel] = [
                                j["trans_exit_code"]
                                for j in wmsjobs
                                if j.get("trans_exit_code") not in (None, 0, "0")
                            ]
                            if nfailed > 0 and len(exit_codes_all[tasklabel]) == 0:
                                _LOG.debug(
                                    "No exit codes in iDDS task info for workload %s",
                                    transform_workload_id,
                                )
                        else:
                            raise RuntimeError(
                                f"Unexpected iDDS task info for workload {transform_workload_id}: {task_info}"
                            )

            # --- Aggregate job states ---
            taskstatus = {}
            mapped_states = state_map.get(status, [])
            for state in WmsStates:
                njobs = 0
                if state in mapped_states and state in file_map:
                    val = task.get(file_map[state])
                    if val:
                        njobs = val
                    if state == WmsStates.RUNNING:
                        njobs += (task.get("output_new_files") or 0) - (task.get("input_new_files") or 0)
                # UNREADY is derived after the loop as the remainder.
                if state != WmsStates.UNREADY:
                    wms_report.job_state_counts[state] += njobs
                    taskstatus[state] = njobs

            # Count UNREADY
            # (total jobs minus the jobs accounted for in every other state).
            unready = WmsStates.UNREADY
            taskstatus[unready] = totaljobs - sum(
                taskstatus[state] for state in WmsStates if state != unready
            )
            wms_report.job_state_counts[unready] += taskstatus[unready]

            # Store task summary
            wms_report.job_summary[tasklabel] = taskstatus
            summary_part = f"{tasklabel}:{totaljobs}"
            if wms_report.run_summary:
                summary_part = f";{summary_part}"
            wms_report.run_summary += summary_part

        # Store all exit codes
        wms_report.exit_code_summary = exit_codes_all

        (
            wms_report.job_summary,
            wms_report.exit_code_summary,
            wms_report.run_summary,
        ) = aggregate_by_basename(
            wms_report.job_summary,
            wms_report.exit_code_summary,
            wms_report.run_summary,
        )

        run_reports.append(wms_report)
        return run_reports, message

    def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
        # Docstring inherited from BaseWmsService.list_submitted_jobs.
        #
        # Returns the list of iDDS request ids matching ``wms_id``.
        # Filtering by ``user`` alone is rejected with RuntimeError;
        # ``require_bps``, ``pass_thru`` and ``is_global`` are not used.
        if wms_id is None and user is not None:
            raise RuntimeError(
                "Error to get workflow status report: wms_id is required"
                " and filtering workflows with 'user' is not supported."
            )

        idds_client = get_idds_client(self.config)
        ret = idds_client.get_requests(request_id=wms_id)
        _LOG.debug("PanDA get workflows returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            req_ids = [req["request_id"] for req in result]
            return req_ids

        raise RuntimeError(f"Error list PanDA workflow requests: {error}")

    def cancel(self, wms_id, pass_thru=None):
        # Docstring inherited from BaseWmsService.cancel.
        #
        # Asks iDDS to abort the request.  Returns ``(True, <JSON result>)``
        # on success and ``(False, <error message>)`` otherwise.
        idds_client = get_idds_client(self.config)
        ret = idds_client.abort(request_id=wms_id)
        _LOG.debug("Abort PanDA workflow returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            _LOG.info("Aborting PanDA workflow %s", result)
            return True, json.dumps(result)

        return False, f"Error abort PanDA workflow: {error}"

    def ping(self, pass_thru=None):
        # Docstring inherited from BaseWmsService.ping.
        #
        # Returns ``(0, None)`` when the service answers with
        # ``{"Status": "OK"}`` and ``(-1, <message>)`` in every other case.
        idds_client = get_idds_client(self.config)
        ret = idds_client.ping()
        _LOG.debug("Ping PanDA service returned = %s", ret)

        status, result, error = get_idds_result(ret)
        if status:
            if "Status" in result and result["Status"] == "OK":
                return 0, None

            return -1, f"Error ping PanDA service: {result}"

        return -1, f"Error ping PanDA service: {error}"

    def run_submission_checks(self):
        # Docstring inherited from BaseWmsService.run_submission_checks.
        #
        # Raises OSError when a required environment variable is missing and
        # RuntimeError when the service does not answer a ping successfully.
        for key in ["PANDA_URL"]:
            if key not in os.environ:
                raise OSError(f"Missing environment variable {key}")

        status, message = self.ping()
        if status != 0:
            raise RuntimeError(message)

    def get_status(
        self,
        wms_workflow_id=None,
        hist=0,
        is_global=False,
    ):
        # Docstring inherited from BaseWmsService.get_status.
        #
        # Returns ``(state, message)``; ``message`` is empty unless the state
        # could not be determined.  ``hist`` and ``is_global`` are not used.

        idds_client = get_idds_client(self.config)
        ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=False)
        _LOG.debug("PanDA get workflow status returned = %s", str(ret))

        request_status = ret[0]
        if request_status != 0:
            state = WmsStates.UNKNOWN
            message = f"Error getting workflow status for id {wms_workflow_id}: ret = {ret}"
        else:
            tasks = ret[1][1]
            if not tasks:
                state = WmsStates.UNKNOWN
                message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id"
            elif not isinstance(tasks[0], dict):
                state = WmsStates.UNKNOWN
                message = f"Error getting workflow status for id {wms_workflow_id}: ret = {ret}"
            else:
                message = ""
                head = tasks[0]
                workflow_status = head["status"]["attributes"]["_name_"]
                # NOTE(review): "SubFinished" is mapped to FAILED here but to
                # SUCCEEDED in report(); left as-is -- confirm it is intended.
                if workflow_status in ["Finished"]:
                    state = WmsStates.SUCCEEDED
                elif workflow_status in ["Failed", "Expired", "SubFinished"]:
                    state = WmsStates.FAILED
                elif workflow_status in ["Cancelled"]:
                    state = WmsStates.DELETED
                elif workflow_status in ["Suspended"]:
                    state = WmsStates.HELD
                else:
                    state = WmsStates.RUNNING

        return state, message

471 

472 

class PandaBpsWmsWorkflow(BaseWmsWorkflow):
    """Workflow representation targeted at PanDA/iDDS.

    Parameters
    ----------
    name : `str`
        Unique name for Workflow.
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration that includes necessary submit/runtime information.
    """

    def __init__(self, name, config=None):
        super().__init__(name, config)
        # Files that must be copied before submission (src, dest).
        self.files_to_pre_stage = {}
        # iDDS client-side workflow that the work units are added to.
        self.idds_client_workflow = IDDS_client_workflow(name=name)
        # Run attributes taken over from the generic workflow.
        self.run_attrs = {}

    @classmethod
    def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_class):
        # Docstring inherited from BaseWmsWorkflow.from_generic_workflow.
        panda_wf = cls(generic_workflow.name, config)
        idds_wf = panda_wf.idds_client_workflow

        if generic_workflow.run_attrs:
            panda_wf.run_attrs.update(generic_workflow.run_attrs)

        # Regular work first; it reports the files to stage, the sink work
        # and how many tasks were created.
        staged, sink_work, n_tasks = add_idds_work(config, generic_workflow, idds_wf)
        panda_wf.files_to_pre_stage.update(staged)

        # The final work is attached after the sink work and numbered right
        # after the last regular task.
        final_staged = add_final_idds_work(config, generic_workflow, idds_wf, sink_work, n_tasks + 1, 1)
        panda_wf.files_to_pre_stage.update(final_staged)

        return panda_wf

    def write(self, out_prefix):
        # Docstring inherited from BaseWmsWorkflow.write.
        pickle_path = os.path.join(out_prefix, "panda_workflow.pickle")
        with open(pickle_path, "wb") as stream:
            pickle.dump(self, stream)