Coverage for python / lsst / ctrl / bps / htcondor / prepare_utils.py: 5%

387 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:01 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Utility functions for preparing the HTCondor workflow.""" 

29 

30import logging 

31import os 

32import re 

33from collections import defaultdict 

34from copy import deepcopy 

35from pathlib import Path 

36from typing import Any, cast 

37 

38from lsst.ctrl.bps import ( 

39 BpsConfig, 

40 GenericWorkflow, 

41 GenericWorkflowGroup, 

42 GenericWorkflowJob, 

43 GenericWorkflowNodeType, 

44 GenericWorkflowNoopJob, 

45) 

46from lsst.ctrl.bps.bps_utils import create_count_summary 

47 

48from .lssthtc import ( 

49 HTCDag, 

50 HTCJob, 

51 _update_dicts, 

52 condor_status, 

53 htc_escape, 

54) 

55 

56_LOG = logging.getLogger(__name__) 

57 

58DEFAULT_HTC_EXEC_PATT = ".*worker.*" 

59"""Default pattern for searching execute machines in an HTCondor pool. 

60""" 

61 

62 

def _create_job(subdir_template, cached_values, generic_workflow, gwjob, out_prefix):
    """Convert GenericWorkflow job nodes to DAG jobs.

    Parameters
    ----------
    subdir_template : `str`
        Template for making subdirs.
    cached_values : `dict`
        Site and label specific values.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that is being converted.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        The generic job to convert to a HTCondor job.
    out_prefix : `str`
        Directory prefix for HTCondor files.

    Returns
    -------
    htc_job : `lsst.ctrl.bps.wms.htcondor.HTCJob`
        The HTCondor job equivalent to the given generic job.
    """
    htc_job = HTCJob(gwjob.name, label=gwjob.label)

    # Values available for substitution in the subdir template; as a
    # defaultdict(str), any key missing from the template context renders
    # as an empty string instead of raising KeyError.
    curvals = defaultdict(str)
    curvals["label"] = gwjob.label
    if gwjob.tags:
        curvals.update(gwjob.tags)

    subdir = Path("jobs") / subdir_template.format_map(curvals)
    htc_job.subdir = subdir
    htc_job.subfile = f"{gwjob.name}.sub"
    htc_job.add_dag_cmds({"dir": subdir})

    # Baseline submit-description commands; later updates below may
    # override individual keys (e.g. transfer_output_files).
    htc_job_cmds = {
        "universe": "vanilla",
        "should_transfer_files": "YES",
        "when_to_transfer_output": "ON_EXIT_OR_EVICT",
        "transfer_output_files": '""',  # Set to empty string to disable
        "transfer_executable": "False",
        "getenv": "True",
        # Exceeding memory sometimes triggers SIGBUS or SIGSEGV error. Tell
        # htcondor to put on hold any jobs which exited by a signal. If
        # executed in a bash script, like finalJob, the signals will become
        # exit codes above 128 (exit code = 128 + signal number).
        "on_exit_hold": "ExitBySignal == true || ExitCode > 128",
        "on_exit_hold_reason": "ExitBySignal == true ? "
        'strcat("Job raised a signal ", string(ExitSignal), '
        '". Handling job as if it has gone over memory limit.") : '
        'strcat("Job exit code (", string(ExitCode), ") > 128. '
        'Handling job as if it has gone over memory limit.")',
        "on_exit_hold_subcode": "34",
    }

    htc_job_cmds.update(_translate_job_cmds(cached_values, generic_workflow, gwjob))

    # Combine stdout and stderr to reduce the number of files.
    for key in ("output", "error"):
        if cached_values["overwriteJobFiles"]:
            htc_job_cmds[key] = f"{gwjob.name}.$(Cluster).out"
        else:
            # Include the attempt number so each retry writes its own file.
            htc_job_cmds[key] = f"{gwjob.name}.$(Cluster).$$([NumJobStarts ?: 0]).out"
        _LOG.debug("HTCondor %s = %s", key, htc_job_cmds[key])

    key = "log"
    htc_job_cmds[key] = f"{gwjob.name}.$(Cluster).{key}"
    _LOG.debug("HTCondor %s = %s", key, htc_job_cmds[key])

    htc_job_cmds.update(
        _handle_job_inputs(generic_workflow, gwjob.name, cached_values["bpsUseShared"], out_prefix)
    )

    htc_job_cmds.update(
        _handle_job_outputs(generic_workflow, gwjob.name, cached_values["bpsUseShared"], out_prefix)
    )

    # If specified, add nodeset to the job
    if "nodeset" in cached_values:
        htc_job.add_job_attrs({"JobNodeset": cached_values["nodeset"]})
        clause = f'( Target.Nodeset == "{cached_values["nodeset"]}" )'
        if "requirements" in htc_job_cmds:
            # AND the nodeset clause with any pre-existing requirements.
            htc_job_cmds["requirements"] = f"({htc_job_cmds['requirements']}) && {clause}"
        else:
            htc_job_cmds["requirements"] = clause

    # Add the job cmds dict to the job object.
    htc_job.add_job_cmds(htc_job_cmds)

    # Add job-related cmds to the DAG (e.g., VARS)
    htc_job.add_dag_cmds(_translate_dag_cmds(gwjob))

    # Add job attributes to job.
    _LOG.debug("gwjob.attrs = %s", gwjob.attrs)
    htc_job.add_job_attrs(gwjob.attrs)
    htc_job.add_job_attrs(cached_values["attrs"])
    htc_job.add_job_attrs({"bps_job_quanta": create_count_summary(gwjob.quanta_counts)})
    htc_job.add_job_attrs({"bps_job_name": gwjob.name, "bps_job_label": gwjob.label})

    return htc_job

161 

162 

def _translate_job_cmds(cached_vals, generic_workflow, gwjob):
    """Translate the job data that are one to one mapping

    Parameters
    ----------
    cached_vals : `dict` [`str`, `~typing.Any`]
        Config values common to jobs with same site or label.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that contains job to being converted.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job to be converted.

    Returns
    -------
    htc_job_commands : `dict` [`str`, `~typing.Any`]
        Contains commands which can appear in the HTCondor submit description
        file.

    Raises
    ------
    ValueError
        Raised if ``gwjob.retry_unless_exit`` is neither an integer nor a
        list of integers.
    RuntimeError
        Raised if memory autoscaling is enabled but no memory limit could
        be determined.
    """
    # Values in the job script that just are name mappings.
    job_translation = {
        "mail_to": "notify_user",
        "when_to_mail": "notification",
        "request_cpus": "request_cpus",
        "priority": "priority",
        "category": "category",
        "accounting_group": "accounting_group",
        "accounting_user": "accounting_group_user",
    }

    jobcmds = {}
    for gwkey, htckey in job_translation.items():
        jobcmds[htckey] = getattr(gwjob, gwkey, None)

    # If accounting info was not set explicitly, use site settings if any.
    if not gwjob.accounting_group:
        jobcmds["accounting_group"] = cached_vals.get("accountingGroup")
    if not gwjob.accounting_user:
        jobcmds["accounting_group_user"] = cached_vals.get("accountingUser")

    # job commands that need modification
    if gwjob.retry_unless_exit:
        if isinstance(gwjob.retry_unless_exit, int):
            jobcmds["retry_until"] = f"{gwjob.retry_unless_exit}"
        elif isinstance(gwjob.retry_unless_exit, list):
            jobcmds["retry_until"] = (
                f"member(ExitCode, {{{','.join([str(x) for x in gwjob.retry_unless_exit])}}})"
            )
        else:
            raise ValueError("retryUnlessExit must be an integer or a list of integers.")

    if gwjob.request_disk:
        jobcmds["request_disk"] = f"{gwjob.request_disk}MB"

    if gwjob.request_memory:
        jobcmds["request_memory"] = f"{gwjob.request_memory}"

    memory_max = 0
    if gwjob.memory_multiplier:
        # Do not use try-except! At the moment, BpsConfig returns an empty
        # string if it does not contain the key.
        memory_limit = cached_vals["memoryLimit"]
        if not memory_limit:
            raise RuntimeError(
                "Memory autoscaling enabled, but automatic detection of the memory limit "
                "failed; setting it explicitly with 'memoryLimit' or changing worker node "
                "search pattern 'executeMachinesPattern' might help."
            )

        # Set maximal amount of memory job can ask for.
        #
        # The check below assumes that 'memory_limit' was set to a value which
        # realistically reflects actual physical limitations of a given compute
        # resource.
        memory_max = memory_limit
        if gwjob.request_memory_max and gwjob.request_memory_max < memory_limit:
            memory_max = gwjob.request_memory_max

        # Make job ask for more memory each time it failed due to insufficient
        # memory requirements.
        jobcmds["request_memory"] = _create_request_memory_expr(
            gwjob.request_memory, gwjob.memory_multiplier, memory_max
        )

    user_release_expr = cached_vals.get("releaseExpr", "")
    if gwjob.number_of_retries is not None and gwjob.number_of_retries >= 0:
        jobcmds["max_retries"] = gwjob.number_of_retries

        # No point in adding periodic_release if 0 retries
        if gwjob.number_of_retries > 0:
            periodic_release = _create_periodic_release_expr(
                gwjob.request_memory, gwjob.memory_multiplier, memory_max, user_release_expr
            )
            if periodic_release:
                jobcmds["periodic_release"] = periodic_release

        jobcmds["periodic_remove"] = _create_periodic_remove_expr(
            gwjob.request_memory, gwjob.memory_multiplier, memory_max
        )

    # Assume concurrency_limit implemented using HTCondor concurrency limits.
    # May need to move to special site-specific implementation if sites use
    # other mechanisms.
    if gwjob.concurrency_limit:
        jobcmds["concurrency_limit"] = gwjob.concurrency_limit

    # Handle command line
    if gwjob.executable.transfer_executable:
        jobcmds["transfer_executable"] = "True"
        jobcmds["executable"] = gwjob.executable.src_uri
    else:
        jobcmds["executable"] = _fix_env_var_syntax(gwjob.executable.src_uri)

    if gwjob.arguments:
        arguments = gwjob.arguments
        arguments = _replace_cmd_vars(arguments, gwjob)
        arguments = _replace_wms_vars(arguments)
        arguments = _replace_file_vars(cached_vals["bpsUseShared"], arguments, generic_workflow, gwjob)
        arguments = _fix_env_var_syntax(arguments)
        jobcmds["arguments"] = arguments

    if gwjob.environment:
        env_str = ""
        for name, value in gwjob.environment.items():
            if isinstance(value, str):
                value2 = _replace_cmd_vars(value, gwjob)
                # Bug fix: substitute WMS placeholders in the partially
                # processed value ('value2'), not the original 'value' --
                # the old code discarded the _replace_cmd_vars result here.
                value2 = _replace_wms_vars(value2)
                value2 = _fix_env_var_syntax(value2)
                value2 = htc_escape(value2)
                env_str += f"{name}='{value2}' "  # Add single quotes to allow internal spaces
            else:
                env_str += f"{name}={value} "

        # Process above added one trailing space
        jobcmds["environment"] = env_str.rstrip()

    # Add extra "pass-thru" job commands
    if gwjob.profile:
        for key, val in gwjob.profile.items():
            jobcmds[key] = val
    for key, val in cached_vals["profile"].items():
        jobcmds[key] = val

    return jobcmds

306 

307 

308def _translate_dag_cmds(gwjob): 

309 """Translate job values into DAGMan commands. 

310 

311 Parameters 

312 ---------- 

313 gwjob : `lsst.ctrl.bps.GenericWorkflowJob` 

314 Job containing values to be translated. 

315 

316 Returns 

317 ------- 

318 dagcmds : `dict` [`str`, `~typing.Any`] 

319 DAGMan commands for the job. 

320 """ 

321 # Values in the dag script that just are name mappings. 

322 dag_translation = { 

323 "abort_on_value": "abort_dag_on", 

324 "abort_return_value": "abort_exit", 

325 "priority": "priority", 

326 } 

327 

328 dagcmds = {} 

329 for gwkey, htckey in dag_translation.items(): 

330 dagcmds[htckey] = getattr(gwjob, gwkey, None) 

331 

332 # Still to be coded: vars "pre_cmdline", "post_cmdline" 

333 return dagcmds 

334 

335 

336def _fix_env_var_syntax(oldstr): 

337 """Change ENV place holders to HTCondor Env var syntax. 

338 

339 Parameters 

340 ---------- 

341 oldstr : `str` 

342 String in which environment variable syntax is to be fixed. 

343 

344 Returns 

345 ------- 

346 newstr : `str` 

347 Given string with environment variable syntax fixed. 

348 """ 

349 newstr = oldstr 

350 for key in re.findall(r"<ENV:([^>]+)>", oldstr): 

351 newstr = newstr.replace(rf"<ENV:{key}>", f"$ENV({key})") 

352 return newstr 

353 

354 

355def _replace_file_vars(use_shared, arguments, workflow, gwjob): 

356 """Replace file placeholders in command line arguments with correct 

357 physical file names. 

358 

359 Parameters 

360 ---------- 

361 use_shared : `bool` 

362 Whether HTCondor can assume shared filesystem. 

363 arguments : `str` 

364 Arguments string in which to replace file placeholders. 

365 workflow : `lsst.ctrl.bps.GenericWorkflow` 

366 Generic workflow that contains file information. 

367 gwjob : `lsst.ctrl.bps.GenericWorkflowJob` 

368 The job corresponding to the arguments. 

369 

370 Returns 

371 ------- 

372 arguments : `str` 

373 Given arguments string with file placeholders replaced. 

374 """ 

375 # Replace input file placeholders with paths. 

376 for gwfile in workflow.get_job_inputs(gwjob.name, data=True, transfer_only=False): 

377 if not gwfile.wms_transfer: 

378 # Must assume full URI if in command line and told WMS is not 

379 # responsible for transferring file. 

380 uri = gwfile.src_uri 

381 elif use_shared: 

382 if gwfile.job_shared: 

383 # Have shared filesystems and jobs can share file. 

384 uri = gwfile.src_uri 

385 else: 

386 uri = os.path.basename(gwfile.src_uri) 

387 else: # Using push transfer 

388 uri = os.path.basename(gwfile.src_uri) 

389 arguments = arguments.replace(f"<FILE:{gwfile.name}>", uri) 

390 

391 # Replace output file placeholders with paths. 

392 for gwfile in workflow.get_job_outputs(gwjob.name, data=True, transfer_only=False): 

393 if not gwfile.wms_transfer: 

394 # Must assume full URI if in command line and told WMS is not 

395 # responsible for transferring file. 

396 uri = gwfile.src_uri 

397 elif use_shared: 

398 if gwfile.job_shared: 

399 # Have shared filesystems and jobs can share file. 

400 uri = gwfile.src_uri 

401 else: 

402 uri = os.path.basename(gwfile.src_uri) 

403 else: # Using push transfer 

404 uri = os.path.basename(gwfile.src_uri) 

405 arguments = arguments.replace(f"<FILE:{gwfile.name}>", uri) 

406 return arguments 

407 

408 

409def _replace_cmd_vars(arguments, gwjob): 

410 """Replace format-style placeholders in arguments. 

411 

412 Parameters 

413 ---------- 

414 arguments : `str` 

415 Arguments string in which to replace placeholders. 

416 gwjob : `lsst.ctrl.bps.GenericWorkflowJob` 

417 Job containing values to be used to replace placeholders 

418 (in particular gwjob.cmdvals). 

419 

420 Returns 

421 ------- 

422 arguments : `str` 

423 Given arguments string with placeholders replaced. 

424 """ 

425 replacements = gwjob.cmdvals if gwjob.cmdvals is not None else {} 

426 try: 

427 arguments = arguments.format(**replacements) 

428 except (KeyError, TypeError) as exc: # TypeError in case None instead of {} 

429 _LOG.error("Could not replace command variables: replacement for %s not provided", str(exc)) 

430 _LOG.debug("arguments: %s\ncmdvals: %s", arguments, replacements) 

431 raise 

432 return arguments 

433 

434 

435def _replace_wms_vars(orig_string: str) -> str: 

436 """Replace special wms placeholders in given string. 

437 

438 Parameters 

439 ---------- 

440 orig_string : `str` 

441 String in which to replace wms placeholders. 

442 

443 Returns 

444 ------- 

445 updated_string : `str` 

446 Given string with wms placeholders replaced. 

447 """ 

448 values = {"attemptNum": "$$([NumJobStarts])"} 

449 updated_string = orig_string 

450 for key in re.findall(r"<WMS:([^>]+)>", orig_string): 

451 try: 

452 updated_string = updated_string.replace(rf"<WMS:{key}>", values[key]) 

453 except KeyError: 

454 _LOG.error("Unrecognized WMS placeholder: %s in %s", key, orig_string) 

455 raise 

456 return updated_string 

457 

458 

def _handle_job_inputs(
    generic_workflow: GenericWorkflow, job_name: str, use_shared: bool, out_prefix: str
) -> dict[str, str]:
    """Add job input files from generic workflow to job.

    Parameters
    ----------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The generic workflow (e.g., has executable name and arguments).
    job_name : `str`
        Unique name for the job.
    use_shared : `bool`
        Whether job has access to files via shared filesystem.
    out_prefix : `str`
        The root directory into which all WMS-specific files are written.

    Returns
    -------
    htc_commands : `dict` [`str`, `str`]
        HTCondor commands for the job submission script.
    """
    transfer_specs = []
    for gwf_file in generic_workflow.get_job_inputs(job_name, data=True, transfer_only=True):
        _LOG.debug("src_uri=%s", gwf_file.src_uri)
        uri = Path(gwf_file.src_uri)

        # Note if use_shared and job_shared, don't need to transfer file.
        if not use_shared:  # Copy file using push to job
            transfer_specs.append(str(uri))
        elif not gwf_file.job_shared:  # Jobs require own copy
            # if using shared filesystem, but still need copy in job. Use
            # HTCondor's curl plugin for a local copy.
            if uri.is_dir():
                raise RuntimeError(
                    f"HTCondor plugin cannot transfer directories locally within job {gwf_file.src_uri}"
                )
            transfer_specs.append(f"file://{uri}")

    if not transfer_specs:
        return {}

    htc_commands = {"transfer_input_files": ",".join(transfer_specs)}
    _LOG.debug("transfer_input_files=%s", htc_commands["transfer_input_files"])
    return htc_commands

504 

505 

def _handle_job_outputs(
    generic_workflow: GenericWorkflow, job_name: str, use_shared: bool, out_prefix: str
) -> dict[str, str]:
    """Add job output files from generic workflow to the job if any.

    Parameters
    ----------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The generic workflow (e.g., has executable name and arguments).
    job_name : `str`
        Unique name for the job.
    use_shared : `bool`
        Whether job has access to files via shared filesystem.
    out_prefix : `str`
        The root directory into which all WMS-specific files are written.

    Returns
    -------
    htc_commands : `dict` [`str`, `str`]
        HTCondor commands for the job submission script.
    """
    transfer_names = []
    remap_specs = []
    for gwf_file in generic_workflow.get_job_outputs(job_name, data=True, transfer_only=True):
        _LOG.debug("src_uri=%s", gwf_file.src_uri)
        # With a shared filesystem the job writes outputs in place; nothing
        # needs to be transferred back.
        if not use_shared:
            uri = Path(gwf_file.src_uri)
            transfer_names.append(uri.name)
            remap_specs.append(f"{uri.name}={str(uri)}")

    # Set to an empty string to disable and only update if there are output
    # files to transfer. Otherwise, HTCondor will transfer back all files in
    # the job’s temporary working directory that have been modified or created
    # by the job.
    htc_commands = {"transfer_output_files": '""'}
    if transfer_names:
        htc_commands["transfer_output_files"] = ",".join(transfer_names)
        _LOG.debug("transfer_output_files=%s", htc_commands["transfer_output_files"])

    htc_commands["transfer_output_remaps"] = f'"{";".join(remap_specs)}"'
    _LOG.debug("transfer_output_remaps=%s", htc_commands["transfer_output_remaps"])
    return htc_commands

549 

550 

def _create_periodic_release_expr(
    memory: int, multiplier: float | None, limit: int, additional_expr: str = ""
) -> str:
    """Construct an HTCondorAd expression for releasing held jobs.

    Parameters
    ----------
    memory : `int`
        Requested memory in MB.
    multiplier : `float` or None
        Memory growth rate between retries.
    limit : `int`
        Memory limit.
    additional_expr : `str`, optional
        Expression to add to periodic_release. Defaults to empty string.

    Returns
    -------
    expr : `str`
        A string representing an HTCondor ClassAd expression for releasing job.
    """
    _LOG.debug(
        "periodic_release: memory: %s, multiplier: %s, limit: %s, additional_expr: %s",
        memory,
        multiplier,
        limit,
        additional_expr,
    )

    # ctrl_bps sets multiplier to None in the GenericWorkflow if
    # memoryMultiplier <= 1, but checking value just in case.
    if (not multiplier or multiplier <= 1) and not additional_expr:
        return ""

    # Job ClassAds attributes 'HoldReasonCode' and 'HoldReasonSubCode' are
    # UNDEFINED if job is not HELD (i.e. when 'JobStatus' is not 5).
    # The special comparison operators ensure that all comparisons below will
    # evaluate to FALSE in this case.
    #
    # Note:
    # May not be strictly necessary. Operators '&&' and '||' are not strict so
    # the entire expression should evaluate to FALSE when the job is not HELD.
    # According to ClassAd evaluation semantics FALSE && UNDEFINED is FALSE,
    # but better safe than sorry.
    clauses = []
    if memory and multiplier and multiplier > 1 and limit:
        held_for_memory = (
            "(HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 "
            "|| HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34)"
        )
        still_below_limit = (
            f"min({{int({memory} * pow({multiplier}, NumJobStarts - 1)), {limit}}}) < {limit}"
        )
        clauses.append(f"{held_for_memory} && {still_below_limit}")
    if additional_expr:
        # Never auto release a job held by user.
        clauses.append(f"HoldReasonCode =!= 1 && {additional_expr}")

    expr = "JobStatus == 5 && NumJobStarts <= JobMaxRetries"
    if len(clauses) == 2:
        expr += f" && ({clauses[0]} || {clauses[1]})"
    elif clauses:
        expr += f" && {clauses[0]}"
    return expr

621 

622 

623def _create_periodic_remove_expr(memory, multiplier, limit): 

624 """Construct an HTCondorAd expression for removing jobs from the queue. 

625 

626 Parameters 

627 ---------- 

628 memory : `int` 

629 Requested memory in MB. 

630 multiplier : `float` 

631 Memory growth rate between retries. 

632 limit : `int` 

633 Memory limit. 

634 

635 Returns 

636 ------- 

637 expr : `str` 

638 A string representing an HTCondor ClassAd expression for removing jobs. 

639 """ 

640 # Job ClassAds attributes 'HoldReasonCode' and 'HoldReasonSubCode' 

641 # are UNDEFINED if job is not HELD (i.e. when 'JobStatus' is not 5). 

642 # The special comparison operators ensure that all comparisons below 

643 # will evaluate to FALSE in this case. 

644 # 

645 # Note: 

646 # May not be strictly necessary. Operators '&&' and '||' are not 

647 # strict so the entire expression should evaluate to FALSE when the 

648 # job is not HELD. According to ClassAd evaluation semantics 

649 # FALSE && UNDEFINED is FALSE, but better safe than sorry. 

650 is_held = "JobStatus == 5" 

651 is_retry_disallowed = "NumJobStarts > JobMaxRetries" 

652 

653 mem_expr = "" 

654 if memory and multiplier and multiplier > 1 and limit: 

655 mem_limit_expr = f"min({{int({memory} * pow({multiplier}, NumJobStarts - 1)), {limit}}}) == {limit}" 

656 

657 mem_expr = ( # Add || here so only added if adding memory expr 

658 " || ((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 " 

659 f"|| HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && {mem_limit_expr})" 

660 ) 

661 

662 expr = f"{is_held} && ({is_retry_disallowed}{mem_expr})" 

663 return expr 

664 

665 

666def _create_request_memory_expr(memory, multiplier, limit): 

667 """Construct an HTCondor ClassAd expression for safe memory scaling. 

668 

669 Parameters 

670 ---------- 

671 memory : `int` 

672 Requested memory in MB. 

673 multiplier : `float` 

674 Memory growth rate between retries. 

675 limit : `int` 

676 Memory limit. 

677 

678 Returns 

679 ------- 

680 expr : `str` 

681 A string representing an HTCondor ClassAd expression enabling safe 

682 memory scaling between job retries. 

683 """ 

684 # The check if the job was held due to exceeding memory requirements 

685 # will be made *after* job was released back to the job queue (is in 

686 # the IDLE state), hence the need to use `Last*` job ClassAds instead of 

687 # the ones describing job's current state. 

688 # 

689 # Also, 'Last*' job ClassAds attributes are UNDEFINED when a job is 

690 # initially put in the job queue. The special comparison operators ensure 

691 # that all comparisons below will evaluate to FALSE in this case. 

692 was_mem_exceeded = ( 

693 "LastJobStatus =?= 5 " 

694 "&& (LastHoldReasonCode =?= 34 && LastHoldReasonSubCode =?= 0 " 

695 "|| LastHoldReasonCode =?= 3 && LastHoldReasonSubCode =?= 34)" 

696 ) 

697 

698 # If job runs the first time or was held for reasons other than exceeding 

699 # the memory, set the required memory to the requested value or use 

700 # the memory value measured by HTCondor (MemoryUsage) depending on 

701 # whichever is greater. 

702 expr = ( 

703 f"({was_mem_exceeded}) " 

704 f"? min({{int({memory} * pow({multiplier}, NumJobStarts)), {limit}}}) " 

705 f": min({{max({{{memory}, MemoryUsage ?: 0}}), {limit}}})" 

706 ) 

707 return expr 

708 

709 

def _gather_site_values(config, compute_site):
    """Gather values specific to given site.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration that includes necessary submit/runtime
        information.
    compute_site : `str`
        Compute site name.

    Returns
    -------
    site_values : `dict` [`str`, `~typing.Any`]
        Values specific to the given site.
    """
    site_values = {"attrs": {}, "profile": {}}
    search_opts = {}
    if compute_site:
        search_opts["curvals"] = {"curr_site": compute_site}

    # Determine the hard limit for the memory requirement.
    found, limit = config.search("memoryLimit", opt=search_opts)
    if not found:
        # No explicit limit configured; probe the HTCondor pool for the
        # largest slot on machines matching the execute-machine pattern.
        search_opts["default"] = DEFAULT_HTC_EXEC_PATT
        _, patt = config.search("executeMachinesPattern", opt=search_opts)
        del search_opts["default"]

        # To reduce the amount of data, ignore dynamic slots (if any) as,
        # by definition, they cannot have more memory than
        # the partitionable slot they are the part of.
        constraint = f'SlotType != "Dynamic" && regexp("{patt}", Machine)'
        pool_info = condor_status(constraint=constraint)
        try:
            limit = max(int(info["TotalSlotMemory"]) for info in pool_info.values())
        except ValueError:
            # max() raises ValueError on an empty sequence, i.e., no machine
            # in the pool matched the pattern; 'limit' keeps its prior value.
            _LOG.debug("No execute machine in the pool matches %s", patt)
    if limit:
        # Record the detected limit back into the config for later lookups.
        config[".bps_defined.memory_limit"] = limit

    _, site_values["bpsUseShared"] = config.search("bpsUseShared", opt={"default": False})
    site_values["memoryLimit"] = limit

    found, value = config.search("accountingGroup", opt=search_opts)
    if found:
        site_values["accountingGroup"] = value
    found, value = config.search("accountingUser", opt=search_opts)
    if found:
        site_values["accountingUser"] = value

    found, nodeset = config.search("nodeset", opt=search_opts)
    if found:
        site_values["nodeset"] = nodeset

    # Pass through the site's HTCondor profile settings: keys starting with
    # "+" become job attributes, the rest submit-description commands.
    searchobj = config[f".site.{compute_site}.profile.condor"]
    if searchobj:
        search_opts["searchobj"] = searchobj
        search_opts["replaceVars"] = True
        for key in searchobj:
            if key.startswith("+"):
                _, val = config.search(key, opt=search_opts)
                site_values["attrs"][key[1:]] = val
            else:
                _, val = config.search(key, opt=search_opts)
                site_values["profile"][key] = val

    _LOG.debug("site_values = %s", site_values)
    return site_values

778 

779 

def _gather_label_values(config: BpsConfig, label: str) -> dict[str, Any]:
    """Gather values specific to given job label.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration that includes necessary submit/runtime
        information.
    label : `str`
        GenericWorkflowJob label.

    Returns
    -------
    values : `dict` [`str`, `~typing.Any`]
        Values specific to the given job label.
    """
    values: dict[str, Any] = {"attrs": {}, "profile": {}}

    # Decide where in the config this label lives (finalJob, a cluster, or
    # a pipetask) and set up the matching search scope and profile section.
    search_opts: dict[str, Any] = {}
    profile_key = ""
    if label == "finalJob" and "finalJob" in config:
        search_opts["searchobj"] = config["finalJob"]
        profile_key = ".finalJob.profile.condor"
    elif label in config["cluster"]:
        search_opts["curvals"] = {"curr_cluster": label}
        profile_key = f".cluster.{label}.profile.condor"
    elif label in config["pipetask"]:
        search_opts["curvals"] = {"curr_pipetask": label}
        profile_key = f".pipetask.{label}.profile.condor"

    found, release_expr = config.search("releaseExpr", opt=search_opts)
    if found:
        values["releaseExpr"] = release_expr

    found, overwrite = config.search("overwriteJobFiles", opt=search_opts)
    values["overwriteJobFiles"] = overwrite if found else True

    if profile_key and profile_key in config:
        # Keys starting with "+" become job attributes, the rest become
        # submit-description commands.
        for subkey, subval in config[profile_key].items():
            if subkey.startswith("+"):
                values["attrs"][subkey[1:]] = subval
            else:
                values["profile"][subkey] = subval

    return values

828 

829 

def _group_to_subdag(
    config: BpsConfig, generic_workflow_group: GenericWorkflowGroup, out_prefix: str
) -> HTCJob:
    """Convert a generic workflow group to an HTCondor dag.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Workflow configuration.
    generic_workflow_group : `lsst.ctrl.bps.GenericWorkflowGroup`
        The generic workflow group to convert.
    out_prefix : `str`
        Location prefix to be used when creating jobs.

    Returns
    -------
    htc_job : `lsst.ctrl.bps.htcondor.HTCJob`
        Job for running the HTCondor dag.
    """
    job_name = f"wms_{generic_workflow_group.name}"

    subdag_job = HTCJob(name=job_name, label=generic_workflow_group.label)
    subdag_job.add_dag_cmds({"dir": f"subdags/{job_name}"})
    subdag_job.subdag = _generic_workflow_to_htcondor_dag(config, generic_workflow_group, out_prefix)

    if not generic_workflow_group.blocking:
        # Non-blocking groups get a POST script that records the subdag's
        # return status instead of letting failure propagate to the parent.
        subdag_job.dagcmds["post"] = {
            "defer": "",
            "executable": f"{os.path.dirname(__file__)}/subdag_post.sh",
            "arguments": f"{job_name} $RETURN",
        }
    return subdag_job

860 

861 

def _create_check_job(group_job_name: str, job_label: str) -> HTCJob:
    """Create a job to check status of a group job.

    Parameters
    ----------
    group_job_name : `str`
        Name of the group job.
    job_label : `str`
        Label to use for the check status job.

    Returns
    -------
    htc_job : `lsst.ctrl.bps.htcondor.HTCJob`
        Job description for the job to check group job status.
    """
    check_job = HTCJob(name=f"wms_check_status_{group_job_name}", label=job_label)
    # Reuse the canned submit file shipped with the package instead of
    # generating one per job.
    check_job.subfile = "${CTRL_BPS_HTCONDOR_DIR}/python/lsst/ctrl/bps/htcondor/check_group_status.sub"
    check_job.add_dag_cmds(
        {"dir": f"subdags/{group_job_name}", "vars": {"group_job_name": group_job_name}}
    )
    return check_job

882 

883 

def _generic_workflow_to_htcondor_dag(
    config: BpsConfig, generic_workflow: GenericWorkflow, out_prefix: str
) -> HTCDag:
    """Convert a GenericWorkflow to a HTCDag.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Workflow configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The GenericWorkflow to convert.
    out_prefix : `str`
        Location prefix where the HTCondor files will be written.

    Returns
    -------
    dag : `lsst.ctrl.bps.htcondor.HTCDag`
        The HTCDag representation of the given GenericWorkflow.

    Raises
    ------
    RuntimeError
        Raised if a workflow node has an unsupported node type.
    NotImplementedError
        Raised if the workflow's final job is itself a workflow.
    TypeError
        Raised if ``generic_workflow.get_final()`` returns an unexpected type.
    """
    dag = HTCDag(name=generic_workflow.name)

    # Run-level attributes are attached to the DAG itself, plus summary
    # counts of quanta and jobs for reporting.
    _LOG.debug("htcondor dag attribs %s", generic_workflow.run_attrs)
    dag.add_attribs(generic_workflow.run_attrs)
    dag.add_attribs(
        {
            "bps_run_quanta": create_count_summary(generic_workflow.quanta_counts),
            "bps_job_summary": create_count_summary(generic_workflow.job_counts),
        }
    )

    # subDirTemplate may be a single string (same template for every job
    # label) or a per-label mapping; normalize to mapping-style access.
    _, tmp_template = config.search("subDirTemplate", opt={"replaceVars": False, "default": ""})
    if isinstance(tmp_template, str):
        subdir_template = defaultdict(lambda: tmp_template)
    else:
        subdir_template = tmp_template

    # Create all DAG jobs
    site_values = {}  # Cache compute site specific values to reduce config lookups.
    cached_values = {}  # Cache label-specific values to reduce config lookups.
    # Note: Can't use get_job_by_label because those only include payload jobs.
    for job_name in generic_workflow:
        gwjob = generic_workflow.get_job(job_name)
        if gwjob.node_type == GenericWorkflowNodeType.PAYLOAD:
            gwjob = cast(GenericWorkflowJob, gwjob)
            if gwjob.compute_site not in site_values:
                site_values[gwjob.compute_site] = _gather_site_values(config, gwjob.compute_site)
            if gwjob.label not in cached_values:
                # Label-specific values are layered on top of a copy of the
                # site values so site defaults are not mutated.
                cached_values[gwjob.label] = deepcopy(site_values[gwjob.compute_site])
                _update_dicts(cached_values[gwjob.label], _gather_label_values(config, gwjob.label))
                _LOG.debug("cached: %s= %s", gwjob.label, cached_values[gwjob.label])
            htc_job = _create_job(
                subdir_template[gwjob.label],
                cached_values[gwjob.label],
                generic_workflow,
                gwjob,
                out_prefix,
            )
        elif gwjob.node_type == GenericWorkflowNodeType.NOOP:
            # NOOP nodes become zero-work DAG jobs (shared noop.sub submit
            # file, NOOP dag command) used purely for dependency structure.
            gwjob = cast(GenericWorkflowNoopJob, gwjob)
            htc_job = HTCJob(f"wms_{gwjob.name}", label=gwjob.label)
            htc_job.subfile = "${CTRL_BPS_HTCONDOR_DIR}/python/lsst/ctrl/bps/htcondor/noop.sub"
            htc_job.add_job_attrs({"bps_job_name": gwjob.name, "bps_job_label": gwjob.label})
            htc_job.add_dag_cmds({"noop": True})
        elif gwjob.node_type == GenericWorkflowNodeType.GROUP:
            gwjob = cast(GenericWorkflowGroup, gwjob)
            htc_job = _group_to_subdag(config, gwjob, out_prefix)
        else:
            raise RuntimeError(f"Unsupported generic workflow node type {gwjob.node_type} ({gwjob.name})")
        _LOG.debug("Calling adding job %s %s", htc_job.name, htc_job.label)
        dag.add_job(htc_job)

    # Add job dependencies to the DAG (be careful with wms_ jobs)
    for job_name in generic_workflow:
        gwjob = generic_workflow.get_job(job_name)
        # Non-payload DAG jobs were created with a "wms_" name prefix above,
        # so dependency edges must use the prefixed name.
        parent_name = (
            gwjob.name if gwjob.node_type == GenericWorkflowNodeType.PAYLOAD else f"wms_{gwjob.name}"
        )
        successor_jobs = [generic_workflow.get_job(j) for j in generic_workflow.successors(job_name)]
        children_names = []
        if gwjob.node_type == GenericWorkflowNodeType.GROUP:
            gwjob = cast(GenericWorkflowGroup, gwjob)
            group_children = []  # Dependencies between same group jobs
            for sjob in successor_jobs:
                if sjob.node_type == GenericWorkflowNodeType.GROUP and sjob.label == gwjob.label:
                    group_children.append(f"wms_{sjob.name}")
                elif sjob.node_type == GenericWorkflowNodeType.PAYLOAD:
                    children_names.append(sjob.name)
                else:
                    children_names.append(f"wms_{sjob.name}")
            if group_children:
                # Same-label group jobs depend directly on the group job
                # itself, bypassing any status-check job inserted below.
                dag.add_job_relationships([parent_name], group_children)
            if not gwjob.blocking:
                # Since subdag will always succeed, need to add a special
                # job that fails if group failed to block payload children.
                check_job = _create_check_job(f"wms_{gwjob.name}", gwjob.label)
                dag.add_job(check_job)
                dag.add_job_relationships([f"wms_{gwjob.name}"], [check_job.name])
                # Remaining children hang off the check job so a failed
                # group actually blocks them.
                parent_name = check_job.name
        else:
            for sjob in successor_jobs:
                if sjob.node_type == GenericWorkflowNodeType.PAYLOAD:
                    children_names.append(sjob.name)
                else:
                    children_names.append(f"wms_{sjob.name}")

        dag.add_job_relationships([parent_name], children_names)

    # If final job exists in generic workflow, create DAG final job
    final = generic_workflow.get_final()
    if final and isinstance(final, GenericWorkflowJob):
        # NOTE(review): this guard skips caching when compute_site is falsy,
        # yet site_values[final.compute_site] is indexed unconditionally
        # below — confirm a final job always has a compute_site (or one that
        # was already cached by the main loop), else this raises KeyError.
        if final.compute_site and final.compute_site not in site_values:
            site_values[final.compute_site] = _gather_site_values(config, final.compute_site)
        if final.label not in cached_values:
            cached_values[final.label] = deepcopy(site_values[final.compute_site])
            _update_dicts(cached_values[final.label], _gather_label_values(config, final.label))
        final_htjob = _create_job(
            subdir_template[final.label],
            cached_values[final.label],
            generic_workflow,
            final,
            out_prefix,
        )
        # Only add the default POST script if the job config didn't already
        # supply one.
        if "post" not in final_htjob.dagcmds:
            final_htjob.dagcmds["post"] = {
                "defer": "",
                "executable": f"{os.path.dirname(__file__)}/final_post.sh",
                "arguments": f"{final.name} $DAG_STATUS $RETURN",
            }
        dag.add_final_job(final_htjob)
    elif final and isinstance(final, GenericWorkflow):
        raise NotImplementedError("HTCondor plugin does not support a workflow as the final job")
    elif final:
        raise TypeError(f"Invalid type for GenericWorkflow.get_final() results ({type(final)})")

    return dag