# This file is part of ctrl_bps_htcondor.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Interface between a generic workflow and the HTCondor workflow system."""

__all__ = ["HTCondorService"]


import logging
import os
from pathlib import Path

import htcondor
from packaging import version

from lsst.ctrl.bps import (
    BaseWmsService,
    WmsStates,
)
from lsst.ctrl.bps.bps_utils import chdir
from lsst.daf.butler import Config
from lsst.utils.timer import time_this

from .common_utils import WmsIdType, _wms_id_to_cluster, _wms_id_to_dir, _wms_id_type
from .dagman_configurator import DagmanConfigurator
from .htcondor_config import HTC_DEFAULTS_URI
from .htcondor_workflow import HTCondorWorkflow
from .lssthtc import (
    _locate_schedds,
    _update_rescue_file,
    condor_q,
    htc_backup_files,
    htc_create_submit_from_cmd,
    htc_create_submit_from_dag,
    htc_create_submit_from_file,
    htc_submit_dag,
    htc_version,
    read_dag_status,
    write_dag_info,
)
from .provisioner import Provisioner
from .report_utils import (
    _get_status_from_id,
    _get_status_from_path,
    _report_from_id,
    _report_from_path,
    _summary_report,
)

_LOG = logging.getLogger(__name__)


class HTCondorService(BaseWmsService):
    """HTCondor version of WMS service."""

    @property
    def defaults(self):
        """Default HTCondor-specific BPS settings (`lsst.daf.butler.Config`)."""
        return Config(HTC_DEFAULTS_URI)

    @property
    def defaults_uri(self):
        """URI of the file containing the default HTCondor-specific BPS
        settings.
        """
        return HTC_DEFAULTS_URI

    def prepare(self, config, generic_workflow, out_prefix=None):
        """Convert generic workflow to an HTCondor DAG ready for submission.

        Parameters
        ----------
        config : `lsst.ctrl.bps.BpsConfig`
            BPS configuration that includes necessary submit/runtime
            information.
        generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
            The generic workflow (e.g., has executable name and arguments).
        out_prefix : `str`, optional
            The root directory into which all WMS-specific files are written.

        Returns
        -------
        workflow : `lsst.ctrl.bps.htcondor.HTCondorWorkflow`
            HTCondor workflow ready to be run.
        """
        _LOG.debug("out_prefix = '%s'", out_prefix)
        with time_this(log=_LOG, level=logging.INFO, prefix=None, msg="Completed HTCondor workflow creation"):
            _, enable_provisioning = config.search("provisionResources")

            # If bps is doing provisioning, force a unique nodeset to reduce
            # complications if the user also does provisioning manually.
            if enable_provisioning:
                config[".bps_defined.nodeset"] = config[".bps_defined.timestamp"]

            workflow = HTCondorWorkflow.from_generic_workflow(
                config,
                generic_workflow,
                out_prefix,
                f"{self.__class__.__module__}.{self.__class__.__name__}",
            )

            if enable_provisioning:
                provisioner = Provisioner(config)
                provisioner.configure()
                provisioner.prepare("provisioningJob.bash", prefix=out_prefix)
                provisioner.provision(workflow.dag)

            try:
                configurator = DagmanConfigurator(config)
            except KeyError:
                _LOG.debug(
                    "No DAGMan-specific settings were found in BPS config; "
                    "skipping writing DAG-specific configuration file."
                )
            else:
                configurator.prepare("dagman.conf", prefix=out_prefix)
                configurator.configure(workflow.dag)

        with time_this(
            log=_LOG, level=logging.INFO, prefix=None, msg="Completed writing out HTCondor workflow"
        ):
            workflow.write(out_prefix)
        return workflow
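
    # Illustrative sketch (not part of the original module): how a BPS
    # driver might call prepare(). The names `config` and `gw` are assumed
    # to be a populated `BpsConfig` and a `GenericWorkflow`, respectively:
    #
    #   service = HTCondorService(config)
    #   wf = service.prepare(config, gw, out_prefix="submit/my_run")
    #   # The DAG and per-job submit files now exist under `out_prefix`,
    #   # but nothing has been handed to HTCondor yet.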

    def submit(self, workflow, **kwargs):
        """Submit a single HTCondor workflow.

        Parameters
        ----------
        workflow : `lsst.ctrl.bps.htcondor.HTCondorWorkflow`
            A single HTCondor workflow to submit. run_id is updated after
            successful submission to WMS.
        **kwargs : `~typing.Any`
            Keyword arguments for the options.
        """
        dag = workflow.dag
        ver = version.parse(htc_version())

        # For workflow portability, internal paths are all relative. Hence
        # the DAG needs to be submitted to HTCondor from inside the submit
        # directory.
        with chdir(workflow.submit_path):
            try:
                if ver >= version.parse("8.9.3"):
                    sub = htc_create_submit_from_dag(dag.graph["dag_filename"], dag.graph["submit_options"])
                else:
                    sub = htc_create_submit_from_cmd(dag.graph["dag_filename"], dag.graph["submit_options"])
            except Exception:
                _LOG.error(
                    "Problems creating HTCondor submit object from filename: %s", dag.graph["dag_filename"]
                )
                raise

            _LOG.info("Submitting from directory: %s", os.getcwd())
            schedd_dag_info = htc_submit_dag(sub)
            if schedd_dag_info:
                write_dag_info(f"{dag.name}.info.json", schedd_dag_info)

                _, dag_info = schedd_dag_info.popitem()
                _, dag_ad = dag_info.popitem()

                dag.run_id = f"{dag_ad['ClusterId']}.{dag_ad['ProcId']}"
                workflow.run_id = dag.run_id
            else:
                raise RuntimeError("Submission failed: unable to retrieve DAGMan job information")
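
    # Illustrative sketch (assumes `wf` came from prepare() above):
    #
    #   service.submit(wf)
    #   print(wf.run_id)   # e.g. "1234.0" (ClusterId.ProcId)
    #
    # Note the version gate: with HTCondor >= 8.9.3 the Submit object is
    # built directly from the DAG file via htc_create_submit_from_dag;
    # older versions fall back to htc_create_submit_from_cmd.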

    def restart(self, wms_workflow_id):
        """Restart a failed DAGMan workflow.

        Parameters
        ----------
        wms_workflow_id : `str`
            The directory with HTCondor files.

        Returns
        -------
        run_id : `str`
            HTCondor id of the restarted DAGMan job. If restart failed, it
            will be set to None.
        run_name : `str`
            Name of the restarted workflow. If restart failed, it will be set
            to None.
        message : `str`
            A message describing any issues encountered during the restart.
            If there were no issues, an empty string is returned.
        """
        wms_path, id_type = _wms_id_to_dir(wms_workflow_id)
        if wms_path is None:
            return (
                None,
                None,
                (
                    f"workflow with run id '{wms_workflow_id}' not found. "
                    "Hint: use run's submit directory as the id instead"
                ),
            )

        if id_type in {WmsIdType.GLOBAL, WmsIdType.LOCAL}:
            if not wms_path.is_dir():
                return None, None, f"submit directory '{wms_path}' for run id '{wms_workflow_id}' not found."

        _LOG.info("Restarting workflow from directory '%s'", wms_path)
        rescue_dags = list(wms_path.glob("*.dag.rescue*"))
        if not rescue_dags:
            return None, None, f"HTCondor rescue DAG(s) not found in '{wms_path}'"

        _LOG.info("Verifying that the workflow is not already in the job queue")
        schedd_dag_info = condor_q(constraint=f'regexp("dagman$", Cmd) && Iwd == "{wms_path}"')
        if schedd_dag_info:
            _, dag_info = schedd_dag_info.popitem()
            _, dag_ad = dag_info.popitem()
            id_ = dag_ad["GlobalJobId"]
            return None, None, f"Workflow already in the job queue (global job id: '{id_}')"

        _LOG.info("Checking execution status of the workflow")
        warn = False
        dag_ad = read_dag_status(str(wms_path))
        if dag_ad:
            nodes_total = dag_ad.get("NodesTotal", 0)
            if nodes_total != 0:
                nodes_done = dag_ad.get("NodesDone", 0)
                if nodes_total == nodes_done:
                    return None, None, "All jobs in the workflow finished successfully"
            else:
                warn = True
        else:
            warn = True
        if warn:
            _LOG.warning(
                "Cannot determine the execution status of the workflow, continuing with restart regardless"
            )

        _LOG.info("Backing up select HTCondor files from previous run attempt")
        rescue_file = htc_backup_files(wms_path, subdir="backups")
        if (wms_path / "subdags").exists():
            _update_rescue_file(rescue_file)

        # For workflow portability, internal paths are all relative. Hence
        # the DAG needs to be resubmitted to HTCondor from inside the submit
        # directory.
        _LOG.info("Adding workflow to the job queue")
        run_id, run_name, message = None, None, ""
        with chdir(wms_path):
            try:
                dag_path = next(Path.cwd().glob("*.dag.condor.sub"))
            except StopIteration:
                message = f"DAGMan submit description file not found in '{wms_path}'"
            else:
                sub = htc_create_submit_from_file(dag_path.name)
                schedd_dag_info = htc_submit_dag(sub)

                # Save select information about the DAGMan job to a file. Use
                # the run name (available in the ClassAd) as the filename.
                if schedd_dag_info:
                    dag_info = next(iter(schedd_dag_info.values()))
                    dag_ad = next(iter(dag_info.values()))
                    write_dag_info(f"{dag_ad['bps_run']}.info.json", schedd_dag_info)
                    run_id = f"{dag_ad['ClusterId']}.{dag_ad['ProcId']}"
                    run_name = dag_ad["bps_run"]
                else:
                    message = "DAGMan job information unavailable"

        return run_id, run_name, message
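
    # Illustrative sketch: restarting from a submit directory containing
    # rescue DAG file(s):
    #
    #   run_id, run_name, message = service.restart("submit/my_run")
    #   if run_id is None:
    #       print(f"restart failed: {message}")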

    def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
        """Query WMS for list of submitted WMS workflows/jobs.

        This should be a quick lookup function to create list of jobs for
        other functions.

        Parameters
        ----------
        wms_id : `int` or `str`, optional
            Id or path that can be used by WMS service to look up job.
        user : `str`, optional
            User whose submitted jobs should be listed.
        require_bps : `bool`, optional
            Whether to require jobs returned in list to be bps-submitted jobs.
        pass_thru : `str`, optional
            Information to pass through to WMS.
        is_global : `bool`, optional
            If set, all job queues (and their histories) will be queried for
            job information. Defaults to False which means that only the
            local job queue will be queried.

        Returns
        -------
        job_ids : `list` [`~typing.Any`]
            Only job ids to be used by cancel and other functions. Typically
            this means top-level jobs (i.e., not children jobs).
        """
        _LOG.debug(
            "list_submitted_jobs params: wms_id=%s, user=%s, require_bps=%s, pass_thru=%s, is_global=%s",
            wms_id,
            user,
            require_bps,
            pass_thru,
            is_global,
        )

        # Determine which Schedds will be queried for job information.
        coll = htcondor.Collector()

        schedd_ads = []
        if is_global:
            schedd_ads.extend(coll.locateAll(htcondor.DaemonTypes.Schedd))
        else:
            schedd_ads.append(coll.locate(htcondor.DaemonTypes.Schedd))

        # Construct appropriate constraint expression using provided arguments.
        constraint = "False"
        if wms_id is None:
            if user is not None:
                constraint = f'(Owner == "{user}")'
        else:
            schedd_ad, cluster_id, id_type = _wms_id_to_cluster(wms_id)
            if cluster_id is not None:
                constraint = f"(DAGManJobId == {cluster_id} || ClusterId == {cluster_id})"

                # If provided id is either a submission path or a global id,
                # make sure the right Schedd will be queried regardless of
                # 'is_global' value.
                if id_type in {WmsIdType.GLOBAL, WmsIdType.PATH}:
                    schedd_ads = [schedd_ad]
        if require_bps:
            constraint += ' && (bps_isjob == "True")'
        if pass_thru:
            if "-forcex" in pass_thru:
                pass_thru_2 = pass_thru.replace("-forcex", "")
                if pass_thru_2 and not pass_thru_2.isspace():
                    constraint += f" && ({pass_thru_2})"
            else:
                constraint += f" && ({pass_thru})"

        # Create a list of scheduler daemons which need to be queried.
        schedds = {ad["Name"]: htcondor.Schedd(ad) for ad in schedd_ads}

        _LOG.debug("constraint = %s, schedds = %s", constraint, ", ".join(schedds))
        results = condor_q(constraint=constraint, schedds=schedds)

        # Prune child jobs whose parent DAG job is still in the queue
        # (i.e., jobs that are not orphans).
        job_ids = []
        for job_info in results.values():
            for job_id, job_ad in job_info.items():
                _LOG.debug("job_id=%s DAGManJobId=%s", job_id, job_ad.get("DAGManJobId", "None"))
                if "DAGManJobId" not in job_ad:
                    job_ids.append(job_ad.get("GlobalJobId", job_id))
                else:
                    _LOG.debug("Looking for %s", f"{job_ad['DAGManJobId']}.0")
                    _LOG.debug("\tin jobs.keys() = %s", job_info.keys())
                    if f"{job_ad['DAGManJobId']}.0" not in job_info:  # orphaned job
                        job_ids.append(job_ad.get("GlobalJobId", job_id))

        _LOG.debug("job_ids = %s", job_ids)
        return job_ids
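
    # Illustrative example of the constraint built above for wms_id=1234
    # with require_bps=True and pass_thru='JobStatus == 5':
    #
    #   (DAGManJobId == 1234 || ClusterId == 1234)
    #       && (bps_isjob == "True") && (JobStatus == 5)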

    def get_status(
        self,
        wms_workflow_id: str,
        hist: float = 1,
        is_global: bool = False,
    ) -> tuple[WmsStates, str]:
        """Return status of run based upon given constraints.

        Parameters
        ----------
        wms_workflow_id : `str`
            Limit to specific run based on id (queue id or path).
        hist : `float`, optional
            Limit history search to this many days. Defaults to 1.
        is_global : `bool`, optional
            If set, all job queues (and their histories) will be queried for
            job information. Defaults to False which means that only the
            local job queue will be queried.

        Returns
        -------
        state : `lsst.ctrl.bps.WmsStates`
            Status of single run from given information.
        message : `str`
            Extra message for status command to print. This could be pointers
            to documentation or to WMS specific commands.
        """
        _LOG.debug("get_status: id=%s, hist=%s, is_global=%s", wms_workflow_id, hist, is_global)

        id_type = _wms_id_type(wms_workflow_id)
        _LOG.debug("id_type = %s", id_type.name)

        if id_type == WmsIdType.LOCAL:
            schedulers = _locate_schedds(locate_all=is_global)
            _LOG.debug("schedulers = %s", schedulers)
            state, message = _get_status_from_id(wms_workflow_id, hist, schedds=schedulers)
        elif id_type == WmsIdType.GLOBAL:
            schedulers = _locate_schedds(locate_all=True)
            _LOG.debug("schedulers = %s", schedulers)
            state, message = _get_status_from_id(wms_workflow_id, hist, schedds=schedulers)
        elif id_type == WmsIdType.PATH:
            state, message = _get_status_from_path(wms_workflow_id)
        else:
            state, message = WmsStates.UNKNOWN, "Invalid job id"
        _LOG.debug("state: %s, %s", state, message)

        return state, message
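
    # Illustrative sketch: the id may be a local queue id, a global job id,
    # or a submit directory path; the dispatch above picks the lookup:
    #
    #   state, message = service.get_status("submit/my_run", hist=2)
    #   assert isinstance(state, WmsStates)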

    def report(
        self,
        wms_workflow_id=None,
        user=None,
        hist=0,
        pass_thru=None,
        is_global=False,
        return_exit_codes=False,
    ):
        """Return run information based upon given constraints.

        Parameters
        ----------
        wms_workflow_id : `str`, optional
            Limit to specific run based on id.
        user : `str`, optional
            Limit results to runs for this user.
        hist : `float`, optional
            Limit history search to this many days. Defaults to 0.
        pass_thru : `str`, optional
            Constraints to pass through to HTCondor.
        is_global : `bool`, optional
            If set, all job queues (and their histories) will be queried for
            job information. Defaults to False which means that only the
            local job queue will be queried.
        return_exit_codes : `bool`, optional
            If set, return exit codes related to jobs with a non-success
            status. Defaults to False, which means that only the summary
            state is returned.

            Only applicable in the context of a WMS with associated handlers
            to return exit codes from jobs.

        Returns
        -------
        runs : `list` [`lsst.ctrl.bps.WmsRunReport`]
            Information about runs from given job information.
        message : `str`
            Extra message for report command to print. This could be pointers
            to documentation or to WMS specific commands.
        """
        if wms_workflow_id:
            id_type = _wms_id_type(wms_workflow_id)
            if id_type == WmsIdType.LOCAL:
                schedulers = _locate_schedds(locate_all=is_global)
                run_reports, message = _report_from_id(wms_workflow_id, hist, schedds=schedulers)
            elif id_type == WmsIdType.GLOBAL:
                schedulers = _locate_schedds(locate_all=True)
                run_reports, message = _report_from_id(wms_workflow_id, hist, schedds=schedulers)
            elif id_type == WmsIdType.PATH:
                run_reports, message = _report_from_path(wms_workflow_id)
            else:
                run_reports, message = {}, "Invalid job id"
        else:
            schedulers = _locate_schedds(locate_all=is_global)
            run_reports, message = _summary_report(user, hist, pass_thru, schedds=schedulers)
        _LOG.debug("report: %s, %s", run_reports, message)

        return list(run_reports.values()), message
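
    # Illustrative sketch (the run id "1234.0" is hypothetical):
    #
    #   runs, message = service.report(wms_workflow_id="1234.0", hist=1)
    #   for run in runs:   # each is a WmsRunReport
    #       ...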

    def cancel(self, wms_id, pass_thru=None):
        """Cancel submitted workflows/jobs.

        Parameters
        ----------
        wms_id : `str`
            Id or path of job that should be canceled.
        pass_thru : `str`, optional
            Information to pass through to WMS.

        Returns
        -------
        deleted : `bool`
            Whether successful deletion or not. Currently, if any doubt or
            any individual jobs not deleted, return False.
        message : `str`
            Any message from WMS (e.g., error details).
        """
        _LOG.debug("Canceling wms_id = %s", wms_id)

        schedd_ad, cluster_id, _ = _wms_id_to_cluster(wms_id)

        if cluster_id is None:
            deleted = False
            message = "invalid id"
        else:
            _LOG.debug(
                "Canceling job managed by schedd_name = %s with cluster_id = %s",
                schedd_ad["Name"],
                cluster_id,
            )
            schedd = htcondor.Schedd(schedd_ad)

            constraint = f"ClusterId == {cluster_id}"
            if pass_thru is not None and "-forcex" in pass_thru:
                pass_thru_2 = pass_thru.replace("-forcex", "")
                if pass_thru_2 and not pass_thru_2.isspace():
                    constraint += f" && ({pass_thru_2})"
                _LOG.debug("JobAction.RemoveX constraint = %s", constraint)
                results = schedd.act(htcondor.JobAction.RemoveX, constraint)
            else:
                if pass_thru:
                    constraint += f" && ({pass_thru})"
                _LOG.debug("JobAction.Remove constraint = %s", constraint)
                results = schedd.act(htcondor.JobAction.Remove, constraint)
            _LOG.debug("Remove results: %s", results)

            if results["TotalSuccess"] > 0 and results["TotalError"] == 0:
                deleted = True
                message = ""
            else:
                deleted = False
                if results["TotalSuccess"] == 0 and results["TotalError"] == 0:
                    message = "no such bps job in batch queue"
                else:
                    message = f"unknown problems deleting: {results}"

        _LOG.debug("deleted: %s; message = %s", deleted, message)
        return deleted, message
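
    # Illustrative sketch: a plain cancel uses JobAction.Remove; passing
    # "-forcex" through switches to JobAction.RemoveX (forced removal,
    # mirroring `condor_rm -forcex`):
    #
    #   deleted, message = service.cancel("1234")
    #   deleted, message = service.cancel("1234", pass_thru="-forcex")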

    def ping(self, pass_thru):
        """Check whether WMS services are up, reachable, and can authenticate
        if authentication is required.

        The services checked are those needed for submit, report, cancel, and
        restart, but a successful ping does not guarantee that jobs will
        actually run successfully.

        Parameters
        ----------
        pass_thru : `str`, optional
            Information to pass through to WMS.

        Returns
        -------
        status : `int`
            0 for success, non-zero for failure.
        message : `str`
            Any message from WMS (e.g., error details).
        """
        coll = htcondor.Collector()
        secman = htcondor.SecMan()
        status = 0
        message = ""
        _LOG.info("Not verifying that compute resources exist.")
        try:
            for daemon_type in [htcondor.DaemonTypes.Schedd, htcondor.DaemonTypes.Collector]:
                _ = secman.ping(coll.locate(daemon_type))
        except htcondor.HTCondorLocateError:
            status = 1
            message = f"Could not locate {daemon_type} service."
        except htcondor.HTCondorIOError:
            status = 1
            message = f"Permission problem with {daemon_type} service."
        return status, message
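
    # Illustrative sketch: a quick availability check before submitting
    # (assumes `config` is a populated BpsConfig):
    #
    #   status, message = HTCondorService(config).ping(None)
    #   if status != 0:
    #       print(message)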