Coverage for python / lsst / ctrl / bps / parsl / job.py: 22%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:49 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import os
29import re
30import subprocess
31from collections import defaultdict
32from collections.abc import Sequence
33from functools import partial
34from textwrap import dedent
35from typing import Any
37from parsl.app.bash import BashApp
38from parsl.dataflow.futures import AppFuture
40from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob
42from .configuration import get_bps_config_value
# Public names exported by this module.
__all__ = ("ParslJob", "get_file_paths")

# Regex for replacing <ENV:WHATEVER> in BPS job command-lines
_env_regex = re.compile(r"<ENV:(\S+)>")
# Regex for replacing <FILE:WHATEVER> in BPS job command-lines
_file_regex = re.compile(r"<FILE:(\S+)>")
def run_command(
    command_line: str,
    inputs: Sequence[AppFuture] = (),
    stdout: str | None = None,
    stderr: str | None = None,
    parsl_resource_specification: dict[str, Any] | None = None,
) -> str:
    """Return a command-line for parsl to execute.

    This is deliberately a pass-through: when wrapped with a parsl
    ``bash_app`` decorator, parsl runs whatever string the function
    returns. The remaining parameters exist purely so that dependency,
    logging and resource information can be communicated to parsl; after
    wrapping, calling this yields a `parsl.dataflow.futures.AppFuture`.

    Parameters
    ----------
    command_line : `str`
        Command-line to have parsl run.
    inputs : `list` [`parsl.dataflow.futures.AppFuture`]
        Other commands that must have run before this.
    stdout, stderr : `str`, optional
        Filenames for stdout and stderr.
    parsl_resource_specification : `dict`, optional
        Resources required for job.

    Returns
    -------
    command_line : `str`
        Command-line to have parsl run.
    """
    # All the work happens in parsl's bash_app machinery; the command
    # string itself is the only thing we need to hand back.
    return command_line
def get_file_paths(workflow: GenericWorkflow, name: str) -> dict[str, str]:
    """Extract file paths for a job.

    Parameters
    ----------
    workflow : `lsst.ctrl.bps.GenericWorkflow`
        BPS workflow that knows the file paths.
    name : `str`
        Job name.

    Returns
    -------
    paths : `dict` mapping `str` to `str`
        File paths for job, indexed by symbolic name.
    """
    paths: dict[str, str] = {}
    for job_input in workflow.get_job_inputs(name):
        paths[job_input.name] = job_input.src_uri
    return paths
class ParslJob:
    """Job to execute with parsl.

    Parameters
    ----------
    generic : `lsst.ctrl.bps.GenericWorkflowJob`
        BPS job information.
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    file_paths : `dict` mapping `str` to `str`
        File paths for job, indexed by symbolic name.
    """

    def __init__(
        self,
        generic: GenericWorkflowJob,
        config: BpsConfig,
        file_paths: dict[str, str],
    ):
        self.generic = generic
        self.name = generic.name
        self.config = config
        self.file_paths = file_paths
        self.future = None
        self.done = False

        # Work out where this job's stdout and stderr should be written.
        submit_path = get_bps_config_value(self.config, "submitPath", str, required=True)
        log_root = os.path.join(submit_path, "logs")
        _, template = self.config.search(
            "subDirTemplate",
            opt={
                "curvals": {"curr_site": self.config["computeSite"], "curr_cluster": self.generic.label},
                "replaceVars": False,
                "default": "",
            },
        )
        # defaultdict(str) makes any template key absent from the job's
        # tags expand to the empty string instead of raising.
        substitutions = defaultdict(str)
        substitutions["label"] = self.generic.label
        if self.generic.tags:
            substitutions.update(self.generic.tags)
        subdir = template.format_map(substitutions)
        # normpath tidies paths whose template variables were unused by
        # this job; it is never invoked on a bare empty string, which
        # would otherwise collapse to ".".
        log_base = os.path.normpath(os.path.join(log_root, subdir, self.name))
        self.stdout = log_base + ".stdout"
        self.stderr = log_base + ".stderr"

    def __reduce__(self):
        """Recipe for pickling."""
        # Reconstruct from the constructor arguments; transient state
        # (future, done) is deliberately not carried across.
        return type(self), (self.generic, self.config, self.file_paths)

    def get_command_line(self, allow_stage=True) -> str:
        """Get the bash command-line to run to execute this job.

        Parameters
        ----------
        allow_stage : `bool`
            Allow staging of execution butler? This is not appropriate for the
            initial or final jobs that run on the local nodes.

        Returns
        -------
        command : `str`
            Command-line to execute for job.
        """
        command: str = f"{self.generic.executable.src_uri} {self.generic.arguments}"
        if not allow_stage:
            return command
        exec_butler_dir = get_bps_config_value(self.config, "executionButlerDir", str)
        if not exec_butler_dir or not os.path.isdir(exec_butler_dir):
            # The execution butler is not in play; run the command as-is.
            return command

        # Wrap the command so each worker operates on its own copy of the
        # execution butler, which keeps workers from overloading the single
        # sqlite database. The copy is disposable: the original execution
        # butler retains everything required, so we remove it afterwards.
        job_dir = os.path.join(os.path.dirname(exec_butler_dir), self.name)
        # Point butlerConfig at the job-specific copy.
        command = command.replace("<FILE:butlerConfig>", job_dir)
        return dedent(
            f"""
            if [[ ! -d {job_dir} ]]; then mkdir -p {job_dir}; fi
            cp {exec_butler_dir}/* {job_dir}
            {command}
            retcode=$?
            rm -rf {job_dir}
            exit $retcode
            """
        )

    def evaluate_command_line(self, command: str) -> str:
        """Evaluate the bash command-line.

        BPS provides a command-line with symbolic names for BPS variables,
        environment variables and files. Here, we replace those symbolic names
        with the actual values, to provide a concrete command that can be
        executed.

        In replacing file paths, we are implicitly assuming that we are working
        on a shared file system, i.e., that workers can see the butler
        directory, and that files do not need to be staged to the worker.

        Parameters
        ----------
        command : `str`
            Command-line to execute, from BPS.

        Returns
        -------
        command : `str`
            Command ready for execution on a worker.
        """
        command = command.format(**self.generic.cmdvals)  # BPS variables

        # Substituted values may themselves contain further symbolic names
        # (e.g. a file path that embeds an environment variable), so iterate
        # until a substitution pass makes no change.
        while True:
            expanded = _env_regex.sub(r"${\g<1>}", command)  # Environment variables
            expanded = _file_regex.sub(lambda match: self.file_paths[match.group(1)], expanded)  # Files
            if expanded == command:
                return expanded
            command = expanded

    def get_resources(self) -> dict[str, Any]:
        """Return what resources are required for executing this job."""
        resources: dict[str, Any] = {}
        for bps_name, parsl_name, scale in (
            ("request_memory", "memory", None),  # Both BPS and WorkQueueExecutor use MB
            ("request_cpus", "cores", None),
            ("request_disk", "disk", None),  # Both are MB
            ("request_walltime", "running_time_min", None),  # Both are minutes
            ("priority", "priority", None),
        ):
            value = getattr(self.generic, bps_name)
            if scale is not None and value is not None:
                value *= scale
            # Parsl's `HighThroughputExecutor` cannot have `priority=None`,
            # but it can be omitted. By contrast, `WorkQueueExecutor` needs
            # the other resource requests provided even when `None`, while
            # `priority` can be omitted; so only `priority` is skipped here.
            if parsl_name == "priority" and value is None:
                continue
            resources[parsl_name] = value
        return resources

    def get_future(
        self,
        app: BashApp,
        inputs: list[AppFuture],
        command_prefix: str | None = None,
        resource_list: list | None = None,
    ) -> AppFuture | None:
        """Get the parsl app future for the job.

        This effectively queues the job for execution by a worker, subject to
        dependencies.

        Parameters
        ----------
        app : `parsl.app.bash.BashApp`
            A parsl bash_app decorator to use.
        inputs : `list` [ `parsl.dataflow.futures.AppFuture` ]
            Dependencies to be satisfied before executing this job.
        command_prefix : `str`, optional
            Bash commands to execute before the job command, e.g., for setting
            the environment.
        resource_list : `list`, optional
            List of resource specifications to pass to the Parsl executor.

        Returns
        -------
        future : `parsl.dataflow.futures.AppFuture` or `None`
            A `parsl.dataflow.futures.AppFuture` object linked to the execution
            of the job, or `None` if the job has already been done (e.g., by
            ``run_local``).
        """
        if self.done:
            return None  # Already executed; nothing to queue
        if not self.future:
            command = self.evaluate_command_line(self.get_command_line())
            if command_prefix:
                command = f"{command_prefix}\n{command}"
            if resource_list is None:
                resources = {}
            else:
                resources = {
                    key: value for key, value in self.get_resources().items() if key in resource_list
                }

            # Indirection that lets us attach a meaningful name, which
            # parsl uses for tracking workflow status.
            wrapper = partial(run_command)
            wrapper.__name__ = self.generic.label  # type: ignore

            self.future = app(wrapper)(
                command,
                inputs=inputs,
                stdout=self.stdout,
                stderr=self.stderr,
                parsl_resource_specification=resources,
            )
        return self.future

    def run_local(self):
        """Run the command locally.

        This is intended to support jobs that should not be done by a
        worker.
        """
        if self.done:  # Nothing to do
            return
        command = self.evaluate_command_line(self.get_command_line(False))
        for log_path in (self.stdout, self.stderr):
            os.makedirs(os.path.dirname(log_path), exist_ok=True)
        with open(self.stdout, "w") as stdout, open(self.stderr, "w") as stderr:
            subprocess.check_call(command, shell=True, executable="/bin/bash", stdout=stdout, stderr=stderr)
        self.done = True