# This file is part of ctrl_bps_htcondor.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Utility functions for preparing the HTCondor workflow."""
30import logging
31import os
32import re
33from collections import defaultdict
34from copy import deepcopy
35from pathlib import Path
36from typing import Any, cast
38from lsst.ctrl.bps import (
39 BpsConfig,
40 GenericWorkflow,
41 GenericWorkflowGroup,
42 GenericWorkflowJob,
43 GenericWorkflowNodeType,
44 GenericWorkflowNoopJob,
45)
46from lsst.ctrl.bps.bps_utils import create_count_summary
48from .lssthtc import (
49 HTCDag,
50 HTCJob,
51 _update_dicts,
52 condor_status,
53 htc_escape,
54)
56_LOG = logging.getLogger(__name__)

DEFAULT_HTC_EXEC_PATT = ".*worker.*"
"""Default pattern for searching execute machines in an HTCondor pool."""


def _create_job(subdir_template, cached_values, generic_workflow, gwjob, out_prefix):
    """Convert a GenericWorkflow job node to an HTCondor DAG job.

    Parameters
    ----------
    subdir_template : `str`
        Template for making subdirs.
    cached_values : `dict`
        Site and label specific values.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that is being converted.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        The generic job to convert to an HTCondor job.
    out_prefix : `str`
        Directory prefix for HTCondor files.

    Returns
    -------
    htc_job : `lsst.ctrl.bps.htcondor.HTCJob`
        The HTCondor job equivalent to the given generic job.
    """
    htc_job = HTCJob(gwjob.name, label=gwjob.label)

    curvals = defaultdict(str)
    curvals["label"] = gwjob.label
    if gwjob.tags:
        curvals.update(gwjob.tags)

    subdir = Path("jobs") / subdir_template.format_map(curvals)
    htc_job.subdir = subdir
    htc_job.subfile = f"{gwjob.name}.sub"
    htc_job.add_dag_cmds({"dir": subdir})

    htc_job_cmds = {
        "universe": "vanilla",
        "should_transfer_files": "YES",
        "when_to_transfer_output": "ON_EXIT_OR_EVICT",
        "transfer_output_files": '""',  # Set to empty string to disable
        "transfer_executable": "False",
        "getenv": "True",
        # Exceeding memory sometimes triggers SIGBUS or SIGSEGV error. Tell
        # htcondor to put on hold any jobs which exited by a signal. If
        # executed in a bash script, like finalJob, the signals will become
        # exit codes above 128 (exit code = 128 + signal number).
        "on_exit_hold": "ExitBySignal == true || ExitCode > 128",
        "on_exit_hold_reason": "ExitBySignal == true ? "
        'strcat("Job raised a signal ", string(ExitSignal), '
        '". Handling job as if it has gone over memory limit.") : '
        'strcat("Job exit code (", string(ExitCode), ") > 128. '
        'Handling job as if it has gone over memory limit.")',
        "on_exit_hold_subcode": "34",
    }

    htc_job_cmds.update(_translate_job_cmds(cached_values, generic_workflow, gwjob))

    # Combine stdout and stderr to reduce the number of files.
    for key in ("output", "error"):
        if cached_values["overwriteJobFiles"]:
            htc_job_cmds[key] = f"{gwjob.name}.$(Cluster).out"
        else:
            htc_job_cmds[key] = f"{gwjob.name}.$(Cluster).$$([NumJobStarts ?: 0]).out"
        _LOG.debug("HTCondor %s = %s", key, htc_job_cmds[key])

    key = "log"
    htc_job_cmds[key] = f"{gwjob.name}.$(Cluster).{key}"
    _LOG.debug("HTCondor %s = %s", key, htc_job_cmds[key])

    htc_job_cmds.update(
        _handle_job_inputs(generic_workflow, gwjob.name, cached_values["bpsUseShared"], out_prefix)
    )

    htc_job_cmds.update(
        _handle_job_outputs(generic_workflow, gwjob.name, cached_values["bpsUseShared"], out_prefix)
    )

    # If specified, add nodeset to the job.
    if "nodeset" in cached_values:
        htc_job.add_job_attrs({"JobNodeset": cached_values["nodeset"]})
        clause = f'( Target.Nodeset == "{cached_values["nodeset"]}" )'
        if "requirements" in htc_job_cmds:
            htc_job_cmds["requirements"] = f"({htc_job_cmds['requirements']}) && {clause}"
        else:
            htc_job_cmds["requirements"] = clause

    # Add the job cmds dict to the job object.
    htc_job.add_job_cmds(htc_job_cmds)

    # Add job-related cmds to the DAG (e.g., VARS).
    htc_job.add_dag_cmds(_translate_dag_cmds(gwjob))

    # Add job attributes to job.
    _LOG.debug("gwjob.attrs = %s", gwjob.attrs)
    htc_job.add_job_attrs(gwjob.attrs)
    htc_job.add_job_attrs(cached_values["attrs"])
    htc_job.add_job_attrs({"bps_job_quanta": create_count_summary(gwjob.quanta_counts)})
    htc_job.add_job_attrs({"bps_job_name": gwjob.name, "bps_job_label": gwjob.label})

    return htc_job


def _translate_job_cmds(cached_vals, generic_workflow, gwjob):
    """Translate job data that map one-to-one to HTCondor submit commands.

    Parameters
    ----------
    cached_vals : `dict` [`str`, `~typing.Any`]
        Config values common to jobs with same site or label.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that contains the job being converted.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Generic workflow job to be converted.

    Returns
    -------
    htc_job_commands : `dict` [`str`, `~typing.Any`]
        Contains commands which can appear in the HTCondor submit description
        file.
    """
    # Values in the job script that just are name mappings.
    job_translation = {
        "mail_to": "notify_user",
        "when_to_mail": "notification",
        "request_cpus": "request_cpus",
        "priority": "priority",
        "category": "category",
        "accounting_group": "accounting_group",
        "accounting_user": "accounting_group_user",
    }

    jobcmds = {}
    for gwkey, htckey in job_translation.items():
        jobcmds[htckey] = getattr(gwjob, gwkey, None)

    # If accounting info was not set explicitly, use site settings if any.
    if not gwjob.accounting_group:
        jobcmds["accounting_group"] = cached_vals.get("accountingGroup")
    if not gwjob.accounting_user:
        jobcmds["accounting_group_user"] = cached_vals.get("accountingUser")

    # Job commands that need modification.
    if gwjob.retry_unless_exit:
        if isinstance(gwjob.retry_unless_exit, int):
            jobcmds["retry_until"] = f"{gwjob.retry_unless_exit}"
        elif isinstance(gwjob.retry_unless_exit, list):
            jobcmds["retry_until"] = (
                f"member(ExitCode, {{{','.join([str(x) for x in gwjob.retry_unless_exit])}}})"
            )
        else:
            raise ValueError("retryUnlessExit must be an integer or a list of integers.")
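    # Illustrative example (not executed): for retry_unless_exit = [1, 2],
    # the list branch above produces the ClassAd expression
    #     retry_until = member(ExitCode, {1,2})
    # so retries stop once the job exits with one of those codes.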
    if gwjob.request_disk:
        jobcmds["request_disk"] = f"{gwjob.request_disk}MB"

    if gwjob.request_memory:
        jobcmds["request_memory"] = f"{gwjob.request_memory}"

    memory_max = 0
    if gwjob.memory_multiplier:
        # Do not use try-except! At the moment, BpsConfig returns an empty
        # string if it does not contain the key.
        memory_limit = cached_vals["memoryLimit"]
        if not memory_limit:
            raise RuntimeError(
                "Memory autoscaling enabled, but automatic detection of the memory limit "
                "failed; setting it explicitly with 'memoryLimit' or changing worker node "
                "search pattern 'executeMachinesPattern' might help."
            )

        # Set maximal amount of memory job can ask for.
        #
        # The check below assumes that 'memory_limit' was set to a value which
        # realistically reflects actual physical limitations of a given
        # compute resource.
        memory_max = memory_limit
        if gwjob.request_memory_max and gwjob.request_memory_max < memory_limit:
            memory_max = gwjob.request_memory_max

        # Make the job ask for more memory each time it fails due to
        # insufficient memory requirements.
        jobcmds["request_memory"] = _create_request_memory_expr(
            gwjob.request_memory, gwjob.memory_multiplier, memory_max
        )

    user_release_expr = cached_vals.get("releaseExpr", "")
    if gwjob.number_of_retries is not None and gwjob.number_of_retries >= 0:
        jobcmds["max_retries"] = gwjob.number_of_retries

        # No point in adding periodic_release if 0 retries.
        if gwjob.number_of_retries > 0:
            periodic_release = _create_periodic_release_expr(
                gwjob.request_memory, gwjob.memory_multiplier, memory_max, user_release_expr
            )
            if periodic_release:
                jobcmds["periodic_release"] = periodic_release

        jobcmds["periodic_remove"] = _create_periodic_remove_expr(
            gwjob.request_memory, gwjob.memory_multiplier, memory_max
        )

    # Assume concurrency_limit implemented using HTCondor concurrency limits.
    # May need to move to special site-specific implementation if sites use
    # other mechanisms.
    if gwjob.concurrency_limit:
        jobcmds["concurrency_limit"] = gwjob.concurrency_limit

    # Handle command line.
    if gwjob.executable.transfer_executable:
        jobcmds["transfer_executable"] = "True"
        jobcmds["executable"] = gwjob.executable.src_uri
    else:
        jobcmds["executable"] = _fix_env_var_syntax(gwjob.executable.src_uri)

    if gwjob.arguments:
        arguments = gwjob.arguments
        arguments = _replace_cmd_vars(arguments, gwjob)
        arguments = _replace_wms_vars(arguments)
        arguments = _replace_file_vars(cached_vals["bpsUseShared"], arguments, generic_workflow, gwjob)
        arguments = _fix_env_var_syntax(arguments)
        jobcmds["arguments"] = arguments

    if gwjob.environment:
        env_str = ""
        for name, value in gwjob.environment.items():
            if isinstance(value, str):
                value2 = _replace_cmd_vars(value, gwjob)
                value2 = _replace_wms_vars(value2)
                value2 = _fix_env_var_syntax(value2)
                value2 = htc_escape(value2)
                env_str += f"{name}='{value2}' "  # Add single quotes to allow internal spaces.
            else:
                env_str += f"{name}={value} "

        # The loop above added one trailing space.
        jobcmds["environment"] = env_str.rstrip()

    # Add extra "pass-thru" job commands.
    if gwjob.profile:
        for key, val in gwjob.profile.items():
            jobcmds[key] = val
    for key, val in cached_vals["profile"].items():
        jobcmds[key] = val

    return jobcmds


def _translate_dag_cmds(gwjob):
    """Translate job values into DAGMan commands.

    Parameters
    ----------
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job containing values to be translated.

    Returns
    -------
    dagcmds : `dict` [`str`, `~typing.Any`]
        DAGMan commands for the job.
    """
    # Values in the dag script that just are name mappings.
    dag_translation = {
        "abort_on_value": "abort_dag_on",
        "abort_return_value": "abort_exit",
        "priority": "priority",
    }

    dagcmds = {}
    for gwkey, htckey in dag_translation.items():
        dagcmds[htckey] = getattr(gwjob, gwkey, None)

    # Still to be coded: vars "pre_cmdline", "post_cmdline".
    return dagcmds


def _fix_env_var_syntax(oldstr):
    """Change ENV placeholders to HTCondor env var syntax.

    Parameters
    ----------
    oldstr : `str`
        String in which environment variable syntax is to be fixed.

    Returns
    -------
    newstr : `str`
        Given string with environment variable syntax fixed.
    """
    newstr = oldstr
    for key in re.findall(r"<ENV:([^>]+)>", oldstr):
        newstr = newstr.replace(rf"<ENV:{key}>", f"$ENV({key})")
    return newstr
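# Illustrative example (not executed), using a hypothetical variable name:
#     _fix_env_var_syntax("<ENV:LSST_HOME>/bin/run.sh")
# returns
#     "$ENV(LSST_HOME)/bin/run.sh"
# which HTCondor expands from the submitter's environment.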


def _replace_file_vars(use_shared, arguments, workflow, gwjob):
    """Replace file placeholders in command line arguments with correct
    physical file names.

    Parameters
    ----------
    use_shared : `bool`
        Whether HTCondor can assume shared filesystem.
    arguments : `str`
        Arguments string in which to replace file placeholders.
    workflow : `lsst.ctrl.bps.GenericWorkflow`
        Generic workflow that contains file information.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        The job corresponding to the arguments.

    Returns
    -------
    arguments : `str`
        Given arguments string with file placeholders replaced.
    """
    # Replace input file placeholders with paths.
    for gwfile in workflow.get_job_inputs(gwjob.name, data=True, transfer_only=False):
        if not gwfile.wms_transfer:
            # Must assume full URI if in command line and told WMS is not
            # responsible for transferring file.
            uri = gwfile.src_uri
        elif use_shared:
            if gwfile.job_shared:
                # Have shared filesystems and jobs can share file.
                uri = gwfile.src_uri
            else:
                uri = os.path.basename(gwfile.src_uri)
        else:  # Using push transfer
            uri = os.path.basename(gwfile.src_uri)
        arguments = arguments.replace(f"<FILE:{gwfile.name}>", uri)

    # Replace output file placeholders with paths.
    for gwfile in workflow.get_job_outputs(gwjob.name, data=True, transfer_only=False):
        if not gwfile.wms_transfer:
            # Must assume full URI if in command line and told WMS is not
            # responsible for transferring file.
            uri = gwfile.src_uri
        elif use_shared:
            if gwfile.job_shared:
                # Have shared filesystems and jobs can share file.
                uri = gwfile.src_uri
            else:
                uri = os.path.basename(gwfile.src_uri)
        else:  # Using push transfer
            uri = os.path.basename(gwfile.src_uri)
        arguments = arguments.replace(f"<FILE:{gwfile.name}>", uri)
    return arguments


def _replace_cmd_vars(arguments, gwjob):
    """Replace format-style placeholders in arguments.

    Parameters
    ----------
    arguments : `str`
        Arguments string in which to replace placeholders.
    gwjob : `lsst.ctrl.bps.GenericWorkflowJob`
        Job containing values to be used to replace placeholders
        (in particular gwjob.cmdvals).

    Returns
    -------
    arguments : `str`
        Given arguments string with placeholders replaced.
    """
    replacements = gwjob.cmdvals if gwjob.cmdvals is not None else {}
    try:
        arguments = arguments.format(**replacements)
    except (KeyError, TypeError) as exc:  # TypeError in case None instead of {}
        _LOG.error("Could not replace command variables: replacement for %s not provided", str(exc))
        _LOG.debug("arguments: %s\ncmdvals: %s", arguments, replacements)
        raise
    return arguments
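# Illustrative example (not executed): with a hypothetical cmdvals mapping
# such as {"qgraphFile": "run.qgraph"}, an arguments string
#     "pipetask run -g {qgraphFile}"
# becomes
#     "pipetask run -g run.qgraph"
# via standard str.format() substitution.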


def _replace_wms_vars(orig_string: str) -> str:
    """Replace special WMS placeholders in given string.

    Parameters
    ----------
    orig_string : `str`
        String in which to replace WMS placeholders.

    Returns
    -------
    updated_string : `str`
        Given string with WMS placeholders replaced.
    """
    values = {"attemptNum": "$$([NumJobStarts])"}
    updated_string = orig_string
    for key in re.findall(r"<WMS:([^>]+)>", orig_string):
        try:
            updated_string = updated_string.replace(rf"<WMS:{key}>", values[key])
        except KeyError:
            _LOG.error("Unrecognized WMS placeholder: %s in %s", key, orig_string)
            raise
    return updated_string
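# Illustrative example (not executed): the only placeholder currently defined
# in the values dict above is attemptNum, so
#     _replace_wms_vars("--attempt <WMS:attemptNum>")
# returns
#     "--attempt $$([NumJobStarts])"
# letting HTCondor substitute the job start count at match time.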


def _handle_job_inputs(
    generic_workflow: GenericWorkflow, job_name: str, use_shared: bool, out_prefix: str
) -> dict[str, str]:
    """Add job input files from generic workflow to job.

    Parameters
    ----------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The generic workflow (e.g., has executable name and arguments).
    job_name : `str`
        Unique name for the job.
    use_shared : `bool`
        Whether job has access to files via shared filesystem.
    out_prefix : `str`
        The root directory into which all WMS-specific files are written.

    Returns
    -------
    htc_commands : `dict` [`str`, `str`]
        HTCondor commands for the job submission script.
    """
    inputs = []
    for gwf_file in generic_workflow.get_job_inputs(job_name, data=True, transfer_only=True):
        _LOG.debug("src_uri=%s", gwf_file.src_uri)

        uri = Path(gwf_file.src_uri)

        # Note: if use_shared and job_shared, no need to transfer the file.

        if not use_shared:  # Copy file using push to job
            inputs.append(str(uri))
        elif not gwf_file.job_shared:  # Jobs require own copy
            # If using a shared filesystem, but still need a copy in the job,
            # use HTCondor's curl plugin for a local copy.
            if uri.is_dir():
                raise RuntimeError(
                    f"HTCondor plugin cannot transfer directories locally within job: {gwf_file.src_uri}"
                )
            inputs.append(f"file://{uri}")

    htc_commands = {}
    if inputs:
        htc_commands["transfer_input_files"] = ",".join(inputs)
        _LOG.debug("transfer_input_files=%s", htc_commands["transfer_input_files"])
    return htc_commands


def _handle_job_outputs(
    generic_workflow: GenericWorkflow, job_name: str, use_shared: bool, out_prefix: str
) -> dict[str, str]:
    """Add job output files from generic workflow to the job if any.

    Parameters
    ----------
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The generic workflow (e.g., has executable name and arguments).
    job_name : `str`
        Unique name for the job.
    use_shared : `bool`
        Whether job has access to files via shared filesystem.
    out_prefix : `str`
        The root directory into which all WMS-specific files are written.

    Returns
    -------
    htc_commands : `dict` [`str`, `str`]
        HTCondor commands for the job submission script.
    """
    outputs = []
    output_remaps = []
    for gwf_file in generic_workflow.get_job_outputs(job_name, data=True, transfer_only=True):
        _LOG.debug("src_uri=%s", gwf_file.src_uri)

        uri = Path(gwf_file.src_uri)
        if not use_shared:
            outputs.append(uri.name)
            output_remaps.append(f"{uri.name}={str(uri)}")

    # Set to an empty string to disable and only update if there are output
    # files to transfer. Otherwise, HTCondor will transfer back all files in
    # the job's temporary working directory that have been modified or created
    # by the job.
    htc_commands = {"transfer_output_files": '""'}
    if outputs:
        htc_commands["transfer_output_files"] = ",".join(outputs)
        _LOG.debug("transfer_output_files=%s", htc_commands["transfer_output_files"])

        htc_commands["transfer_output_remaps"] = f'"{";".join(output_remaps)}"'
        _LOG.debug("transfer_output_remaps=%s", htc_commands["transfer_output_remaps"])
    return htc_commands
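# Illustrative example (not executed): for two hypothetical outputs
# /work/run/j1/a.log and /work/run/j1/b.log with push transfer (use_shared
# False), the commands built above are
#     transfer_output_files = a.log,b.log
#     transfer_output_remaps = "a.log=/work/run/j1/a.log;b.log=/work/run/j1/b.log"
# so HTCondor writes each sandbox file back to its original location.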


def _create_periodic_release_expr(
    memory: int, multiplier: float | None, limit: int, additional_expr: str = ""
) -> str:
    """Construct an HTCondor ClassAd expression for releasing held jobs.

    Parameters
    ----------
    memory : `int`
        Requested memory in MB.
    multiplier : `float` or `None`
        Memory growth rate between retries.
    limit : `int`
        Memory limit.
    additional_expr : `str`, optional
        Expression to add to periodic_release. Defaults to empty string.

    Returns
    -------
    expr : `str`
        A string representing an HTCondor ClassAd expression for releasing
        a held job.
    """
    _LOG.debug(
        "periodic_release: memory: %s, multiplier: %s, limit: %s, additional_expr: %s",
        memory,
        multiplier,
        limit,
        additional_expr,
    )

    # ctrl_bps sets multiplier to None in the GenericWorkflow if
    # memoryMultiplier <= 1, but checking value just in case.
    if (not multiplier or multiplier <= 1) and not additional_expr:
        return ""

    # Job ClassAds attributes 'HoldReasonCode' and 'HoldReasonSubCode' are
    # UNDEFINED if job is not HELD (i.e. when 'JobStatus' is not 5).
    # The special comparison operators ensure that all comparisons below will
    # evaluate to FALSE in this case.
    #
    # Note:
    # May not be strictly necessary. Operators '&&' and '||' are not strict so
    # the entire expression should evaluate to FALSE when the job is not HELD.
    # According to ClassAd evaluation semantics FALSE && UNDEFINED is FALSE,
    # but better safe than sorry.
    is_held = "JobStatus == 5"
    is_retry_allowed = "NumJobStarts <= JobMaxRetries"

    mem_expr = ""
    if memory and multiplier and multiplier > 1 and limit:
        was_mem_exceeded = (
            "(HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 "
            "|| HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34)"
        )
        was_below_limit = f"min({{int({memory} * pow({multiplier}, NumJobStarts - 1)), {limit}}}) < {limit}"
        mem_expr = f"{was_mem_exceeded} && {was_below_limit}"

    user_expr = ""
    if additional_expr:
        # Never auto release a job held by user.
        user_expr = f"HoldReasonCode =!= 1 && {additional_expr}"

    expr = f"{is_held} && {is_retry_allowed}"
    if user_expr and mem_expr:
        expr += f" && ({mem_expr} || {user_expr})"
    elif user_expr:
        expr += f" && {user_expr}"
    elif mem_expr:
        expr += f" && {mem_expr}"

    return expr
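# Illustrative example (not executed): with hypothetical values memory=2048,
# multiplier=2.0, limit=8192, and no additional_expr, the function returns
#     JobStatus == 5 && NumJobStarts <= JobMaxRetries
#     && (HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0
#     || HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34)
#     && min({int(2048 * pow(2.0, NumJobStarts - 1)), 8192}) < 8192
# (shown wrapped here; the actual value is a single line), i.e. release a
# memory-held job while its scaled memory request is still below the limit.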


def _create_periodic_remove_expr(memory, multiplier, limit):
    """Construct an HTCondor ClassAd expression for removing jobs from
    the queue.

    Parameters
    ----------
    memory : `int`
        Requested memory in MB.
    multiplier : `float`
        Memory growth rate between retries.
    limit : `int`
        Memory limit.

    Returns
    -------
    expr : `str`
        A string representing an HTCondor ClassAd expression for removing
        jobs.
    """
    # Job ClassAds attributes 'HoldReasonCode' and 'HoldReasonSubCode'
    # are UNDEFINED if job is not HELD (i.e. when 'JobStatus' is not 5).
    # The special comparison operators ensure that all comparisons below
    # will evaluate to FALSE in this case.
    #
    # Note:
    # May not be strictly necessary. Operators '&&' and '||' are not
    # strict so the entire expression should evaluate to FALSE when the
    # job is not HELD. According to ClassAd evaluation semantics
    # FALSE && UNDEFINED is FALSE, but better safe than sorry.
    is_held = "JobStatus == 5"
    is_retry_disallowed = "NumJobStarts > JobMaxRetries"

    mem_expr = ""
    if memory and multiplier and multiplier > 1 and limit:
        mem_limit_expr = f"min({{int({memory} * pow({multiplier}, NumJobStarts - 1)), {limit}}}) == {limit}"
        mem_expr = (  # Add || here so only added if adding memory expr
            " || ((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0 "
            f"|| HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34) && {mem_limit_expr})"
        )

    expr = f"{is_held} && ({is_retry_disallowed}{mem_expr})"
    return expr
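# Illustrative example (not executed): with hypothetical values memory=2048,
# multiplier=2.0, limit=8192, the function returns
#     JobStatus == 5 && (NumJobStarts > JobMaxRetries
#     || ((HoldReasonCode =?= 34 && HoldReasonSubCode =?= 0
#     || HoldReasonCode =?= 3 && HoldReasonSubCode =?= 34)
#     && min({int(2048 * pow(2.0, NumJobStarts - 1)), 8192}) == 8192))
# (shown wrapped here), i.e. remove a held job when retries are exhausted or
# when it was held for memory and its request has already hit the limit.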


def _create_request_memory_expr(memory, multiplier, limit):
    """Construct an HTCondor ClassAd expression for safe memory scaling.

    Parameters
    ----------
    memory : `int`
        Requested memory in MB.
    multiplier : `float`
        Memory growth rate between retries.
    limit : `int`
        Memory limit.

    Returns
    -------
    expr : `str`
        A string representing an HTCondor ClassAd expression enabling safe
        memory scaling between job retries.
    """
    # The check if the job was held due to exceeding memory requirements
    # will be made *after* job was released back to the job queue (is in
    # the IDLE state), hence the need to use `Last*` job ClassAds instead of
    # the ones describing job's current state.
    #
    # Also, 'Last*' job ClassAds attributes are UNDEFINED when a job is
    # initially put in the job queue. The special comparison operators ensure
    # that all comparisons below will evaluate to FALSE in this case.
    was_mem_exceeded = (
        "LastJobStatus =?= 5 "
        "&& (LastHoldReasonCode =?= 34 && LastHoldReasonSubCode =?= 0 "
        "|| LastHoldReasonCode =?= 3 && LastHoldReasonSubCode =?= 34)"
    )

    # If job runs the first time or was held for reasons other than exceeding
    # the memory, set the required memory to the requested value or use
    # the memory value measured by HTCondor (MemoryUsage) depending on
    # whichever is greater.
    expr = (
        f"({was_mem_exceeded}) "
        f"? min({{int({memory} * pow({multiplier}, NumJobStarts)), {limit}}}) "
        f": min({{max({{{memory}, MemoryUsage ?: 0}}), {limit}}})"
    )
    return expr
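# Illustrative example (not executed): with hypothetical values memory=2048,
# multiplier=2.0, limit=8192, request_memory becomes
#     (LastJobStatus =?= 5 && (LastHoldReasonCode =?= 34
#     && LastHoldReasonSubCode =?= 0 || LastHoldReasonCode =?= 3
#     && LastHoldReasonSubCode =?= 34))
#     ? min({int(2048 * pow(2.0, NumJobStarts)), 8192})
#     : min({max({2048, MemoryUsage ?: 0}), 8192})
# (shown wrapped here): the request doubles after each memory-related hold,
# capped at the limit.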


def _gather_site_values(config, compute_site):
    """Gather values specific to given site.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration that includes necessary submit/runtime
        information.
    compute_site : `str`
        Compute site name.

    Returns
    -------
    site_values : `dict` [`str`, `~typing.Any`]
        Values specific to the given site.
    """
    site_values = {"attrs": {}, "profile": {}}
    search_opts = {}
    if compute_site:
        search_opts["curvals"] = {"curr_site": compute_site}

    # Determine the hard limit for the memory requirement.
    found, limit = config.search("memoryLimit", opt=search_opts)
    if not found:
        search_opts["default"] = DEFAULT_HTC_EXEC_PATT
        _, patt = config.search("executeMachinesPattern", opt=search_opts)
        del search_opts["default"]

        # To reduce the amount of data, ignore dynamic slots (if any) as,
        # by definition, they cannot have more memory than
        # the partitionable slot they are part of.
        constraint = f'SlotType != "Dynamic" && regexp("{patt}", Machine)'
        pool_info = condor_status(constraint=constraint)
        try:
            limit = max(int(info["TotalSlotMemory"]) for info in pool_info.values())
        except ValueError:
            _LOG.debug("No execute machine in the pool matches %s", patt)
    if limit:
        config[".bps_defined.memory_limit"] = limit

    _, site_values["bpsUseShared"] = config.search("bpsUseShared", opt={"default": False})
    site_values["memoryLimit"] = limit

    found, value = config.search("accountingGroup", opt=search_opts)
    if found:
        site_values["accountingGroup"] = value
    found, value = config.search("accountingUser", opt=search_opts)
    if found:
        site_values["accountingUser"] = value

    found, nodeset = config.search("nodeset", opt=search_opts)
    if found:
        site_values["nodeset"] = nodeset

    searchobj = config[f".site.{compute_site}.profile.condor"]
    if searchobj:
        search_opts["searchobj"] = searchobj
        search_opts["replaceVars"] = True
        for key in searchobj:
            if key.startswith("+"):
                _, val = config.search(key, opt=search_opts)
                site_values["attrs"][key[1:]] = val
            else:
                _, val = config.search(key, opt=search_opts)
                site_values["profile"][key] = val

    _LOG.debug("site_values = %s", site_values)
    return site_values
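# Illustrative example (not executed): a hypothetical BPS config fragment like
#     site:
#       example_site:
#         profile:
#           condor:
#             +JobNodeProfile: '"highmem"'
#             requirements: '(OpSysAndVer == "CentOS7")'
# would, per the loop above, land in site_values["attrs"]["JobNodeProfile"]
# (the leading "+" is stripped, marking a job attribute) and in
# site_values["profile"]["requirements"] (a pass-thru submit command).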


def _gather_label_values(config: BpsConfig, label: str) -> dict[str, Any]:
    """Gather values specific to given job label.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration that includes necessary submit/runtime
        information.
    label : `str`
        GenericWorkflowJob label.

    Returns
    -------
    values : `dict` [`str`, `~typing.Any`]
        Values specific to the given job label.
    """
    values: dict[str, Any] = {"attrs": {}, "profile": {}}

    search_opts = {}
    profile_key = ""
    if label == "finalJob" and "finalJob" in config:
        search_opts["searchobj"] = config["finalJob"]
        profile_key = ".finalJob.profile.condor"
    elif label in config["cluster"]:
        search_opts["curvals"] = {"curr_cluster": label}
        profile_key = f".cluster.{label}.profile.condor"
    elif label in config["pipetask"]:
        search_opts["curvals"] = {"curr_pipetask": label}
        profile_key = f".pipetask.{label}.profile.condor"

    found, value = config.search("releaseExpr", opt=search_opts)
    if found:
        values["releaseExpr"] = value

    found, value = config.search("overwriteJobFiles", opt=search_opts)
    if found:
        values["overwriteJobFiles"] = value
    else:
        values["overwriteJobFiles"] = True

    if profile_key and profile_key in config:
        for subkey, val in config[profile_key].items():
            if subkey.startswith("+"):
                values["attrs"][subkey[1:]] = val
            else:
                values["profile"][subkey] = val

    return values


def _group_to_subdag(
    config: BpsConfig, generic_workflow_group: GenericWorkflowGroup, out_prefix: str
) -> HTCJob:
    """Convert a generic workflow group to an HTCondor dag.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Workflow configuration.
    generic_workflow_group : `lsst.ctrl.bps.GenericWorkflowGroup`
        The generic workflow group to convert.
    out_prefix : `str`
        Location prefix to be used when creating jobs.

    Returns
    -------
    htc_job : `lsst.ctrl.bps.htcondor.HTCJob`
        Job for running the HTCondor dag.
    """
    jobname = f"wms_{generic_workflow_group.name}"
    htc_job = HTCJob(name=jobname, label=generic_workflow_group.label)
    htc_job.add_dag_cmds({"dir": f"subdags/{jobname}"})
    htc_job.subdag = _generic_workflow_to_htcondor_dag(config, generic_workflow_group, out_prefix)
    if not generic_workflow_group.blocking:
        htc_job.dagcmds["post"] = {
            "defer": "",
            "executable": f"{os.path.dirname(__file__)}/subdag_post.sh",
            "arguments": f"{jobname} $RETURN",
        }
    return htc_job


def _create_check_job(group_job_name: str, job_label: str) -> HTCJob:
    """Create a job to check status of a group job.

    Parameters
    ----------
    group_job_name : `str`
        Name of the group job.
    job_label : `str`
        Label to use for the check status job.

    Returns
    -------
    htc_job : `lsst.ctrl.bps.htcondor.HTCJob`
        Job description for the job to check group job status.
    """
    htc_job = HTCJob(name=f"wms_check_status_{group_job_name}", label=job_label)
    htc_job.subfile = "${CTRL_BPS_HTCONDOR_DIR}/python/lsst/ctrl/bps/htcondor/check_group_status.sub"
    htc_job.add_dag_cmds({"dir": f"subdags/{group_job_name}", "vars": {"group_job_name": group_job_name}})

    return htc_job


def _generic_workflow_to_htcondor_dag(
    config: BpsConfig, generic_workflow: GenericWorkflow, out_prefix: str
) -> HTCDag:
    """Convert a GenericWorkflow to an HTCDag.

    Parameters
    ----------
    config : `lsst.ctrl.bps.BpsConfig`
        Workflow configuration.
    generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
        The GenericWorkflow to convert.
    out_prefix : `str`
        Location prefix where the HTCondor files will be written.

    Returns
    -------
    dag : `lsst.ctrl.bps.htcondor.HTCDag`
        The HTCDag representation of the given GenericWorkflow.
    """
    dag = HTCDag(name=generic_workflow.name)

    _LOG.debug("htcondor dag attribs %s", generic_workflow.run_attrs)
    dag.add_attribs(generic_workflow.run_attrs)
    dag.add_attribs(
        {
            "bps_run_quanta": create_count_summary(generic_workflow.quanta_counts),
            "bps_job_summary": create_count_summary(generic_workflow.job_counts),
        }
    )

    _, tmp_template = config.search("subDirTemplate", opt={"replaceVars": False, "default": ""})
    if isinstance(tmp_template, str):
        subdir_template = defaultdict(lambda: tmp_template)
    else:
        subdir_template = tmp_template

    # Create all DAG jobs.
    site_values = {}  # Cache compute site specific values to reduce config lookups.
    cached_values = {}  # Cache label-specific values to reduce config lookups.
    # Note: Can't use get_job_by_label because those only include payload jobs.
    for job_name in generic_workflow:
        gwjob = generic_workflow.get_job(job_name)
        if gwjob.node_type == GenericWorkflowNodeType.PAYLOAD:
            gwjob = cast(GenericWorkflowJob, gwjob)
            if gwjob.compute_site not in site_values:
                site_values[gwjob.compute_site] = _gather_site_values(config, gwjob.compute_site)
            if gwjob.label not in cached_values:
                cached_values[gwjob.label] = deepcopy(site_values[gwjob.compute_site])
                _update_dicts(cached_values[gwjob.label], _gather_label_values(config, gwjob.label))
                _LOG.debug("cached: %s = %s", gwjob.label, cached_values[gwjob.label])
            htc_job = _create_job(
                subdir_template[gwjob.label],
                cached_values[gwjob.label],
                generic_workflow,
                gwjob,
                out_prefix,
            )
        elif gwjob.node_type == GenericWorkflowNodeType.NOOP:
            gwjob = cast(GenericWorkflowNoopJob, gwjob)
            htc_job = HTCJob(f"wms_{gwjob.name}", label=gwjob.label)
            htc_job.subfile = "${CTRL_BPS_HTCONDOR_DIR}/python/lsst/ctrl/bps/htcondor/noop.sub"
            htc_job.add_job_attrs({"bps_job_name": gwjob.name, "bps_job_label": gwjob.label})
            htc_job.add_dag_cmds({"noop": True})
        elif gwjob.node_type == GenericWorkflowNodeType.GROUP:
            gwjob = cast(GenericWorkflowGroup, gwjob)
            htc_job = _group_to_subdag(config, gwjob, out_prefix)
        else:
            raise RuntimeError(f"Unsupported generic workflow node type {gwjob.node_type} ({gwjob.name})")
        _LOG.debug("Adding job %s (label %s)", htc_job.name, htc_job.label)
        dag.add_job(htc_job)

    # Add job dependencies to the DAG (be careful with wms_ jobs).
    for job_name in generic_workflow:
        gwjob = generic_workflow.get_job(job_name)
        parent_name = (
            gwjob.name if gwjob.node_type == GenericWorkflowNodeType.PAYLOAD else f"wms_{gwjob.name}"
        )
        successor_jobs = [generic_workflow.get_job(j) for j in generic_workflow.successors(job_name)]
        children_names = []
        if gwjob.node_type == GenericWorkflowNodeType.GROUP:
            gwjob = cast(GenericWorkflowGroup, gwjob)
            group_children = []  # Dependencies between jobs in the same group
            for sjob in successor_jobs:
                if sjob.node_type == GenericWorkflowNodeType.GROUP and sjob.label == gwjob.label:
                    group_children.append(f"wms_{sjob.name}")
                elif sjob.node_type == GenericWorkflowNodeType.PAYLOAD:
                    children_names.append(sjob.name)
                else:
                    children_names.append(f"wms_{sjob.name}")
            if group_children:
                dag.add_job_relationships([parent_name], group_children)
            if not gwjob.blocking:
                # Since the subdag will always succeed, need to add a special
                # job that fails if the group failed, to block payload
                # children.
                check_job = _create_check_job(f"wms_{gwjob.name}", gwjob.label)
                dag.add_job(check_job)
                dag.add_job_relationships([f"wms_{gwjob.name}"], [check_job.name])
                parent_name = check_job.name
        else:
            for sjob in successor_jobs:
                if sjob.node_type == GenericWorkflowNodeType.PAYLOAD:
                    children_names.append(sjob.name)
                else:
                    children_names.append(f"wms_{sjob.name}")

        dag.add_job_relationships([parent_name], children_names)

    # If final job exists in generic workflow, create DAG final job.
    final = generic_workflow.get_final()
    if final and isinstance(final, GenericWorkflowJob):
        if final.compute_site and final.compute_site not in site_values:
            site_values[final.compute_site] = _gather_site_values(config, final.compute_site)
        if final.label not in cached_values:
            cached_values[final.label] = deepcopy(site_values[final.compute_site])
            _update_dicts(cached_values[final.label], _gather_label_values(config, final.label))
        final_htjob = _create_job(
            subdir_template[final.label],
            cached_values[final.label],
            generic_workflow,
            final,
            out_prefix,
        )
        if "post" not in final_htjob.dagcmds:
            final_htjob.dagcmds["post"] = {
                "defer": "",
                "executable": f"{os.path.dirname(__file__)}/final_post.sh",
                "arguments": f"{final.name} $DAG_STATUS $RETURN",
            }
        dag.add_final_job(final_htjob)
    elif final and isinstance(final, GenericWorkflow):
        raise NotImplementedError("HTCondor plugin does not support a workflow as the final job")
    elif final:
        raise TypeError(f"Invalid type for GenericWorkflow.get_final() results ({type(final)})")

    return dag