Coverage for python / lsst / ctrl / bps / parsl / job.py: 22%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:23 +0000

1# This file is part of ctrl_bps_parsl. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28import os 

29import re 

30import subprocess 

31from collections import defaultdict 

32from collections.abc import Sequence 

33from functools import partial 

34from textwrap import dedent 

35from typing import Any 

36 

37from parsl.app.bash import BashApp 

38from parsl.dataflow.futures import AppFuture 

39 

40from lsst.ctrl.bps import BpsConfig, GenericWorkflow, GenericWorkflowJob 

41 

42from .configuration import get_bps_config_value 

43 

44__all__ = ("ParslJob", "get_file_paths") 

45 

# Regexes for substituting symbolic names in BPS job command-lines; group 1
# captures the variable/file name between the angle brackets.
_env_regex = re.compile(r"<ENV:(\S+)>")  # Matches <ENV:WHATEVER> placeholders
_file_regex = re.compile(r"<FILE:(\S+)>")  # Matches <FILE:WHATEVER> placeholders

48 

49 

def run_command(
    command_line: str,
    inputs: Sequence[AppFuture] = (),
    stdout: str | None = None,
    stderr: str | None = None,
    parsl_resource_specification: dict[str, Any] | None = None,
) -> str:
    """Return the command-line for parsl to execute.

    The extra parameters are never read here: parsl's ``bash_app``
    machinery inspects them on the wrapped callable, using ``inputs`` to
    establish dependencies, ``stdout``/``stderr`` to redirect output and
    ``parsl_resource_specification`` to schedule resources. Wrap this
    function with a parsl ``bash_app`` decorator before use, after which
    calling it yields a `parsl.dataflow.futures.AppFuture`.

    Parameters
    ----------
    command_line : `str`
        Command-line to have parsl run.
    inputs : `list` [`parsl.dataflow.futures.AppFuture`]
        Other commands that must have run before this.
    stdout, stderr : `str`, optional
        Filenames for stdout and stderr.
    parsl_resource_specification : `dict`, optional
        Resources required for job.

    Returns
    -------
    command_line : `str`
        Command-line to have parsl run.
    """
    return command_line

81 

82 

def get_file_paths(workflow: GenericWorkflow, name: str) -> dict[str, str]:
    """Extract file paths for a job.

    Parameters
    ----------
    workflow : `lsst.ctrl.bps.GenericWorkflow`
        BPS workflow that knows the file paths.
    name : `str`
        Job name.

    Returns
    -------
    paths : `dict` mapping `str` to `str`
        File paths for job, indexed by symbolic name.
    """
    paths: dict[str, str] = {}
    for job_input in workflow.get_job_inputs(name):
        paths[job_input.name] = job_input.src_uri
    return paths

99 

100 

class ParslJob:
    """Job to execute with parsl.

    Ties together the BPS description of a job (command-line, resource
    requests, file paths) with the parsl machinery used to run it: a
    ``bash_app`` future plus stdout/stderr destinations.

    Parameters
    ----------
    generic : `lsst.ctrl.bps.GenericWorkflowJob`
        BPS job information.
    config : `lsst.ctrl.bps.BpsConfig`
        BPS configuration.
    file_paths : `dict` mapping `str` to `str`
        File paths for job, indexed by symbolic name.
    """

    def __init__(
        self,
        generic: GenericWorkflowJob,
        config: BpsConfig,
        file_paths: dict[str, str],
    ):
        self.generic = generic
        self.name = generic.name
        self.config = config
        self.file_paths = file_paths
        # Parsl future for this job; populated by the first get_future() call.
        self.future = None
        # Set to True by run_local() once the job has executed locally.
        self.done = False

        # Determine directory for job stdout and stderr
        log_dir = os.path.join(get_bps_config_value(self.config, "submitPath", str, required=True), "logs")
        # Look up the log subdirectory template without expanding variables
        # (replaceVars=False): placeholders like {label} are filled in below
        # from per-job values via format_map.
        _, template = self.config.search(
            "subDirTemplate",
            opt={
                "curvals": {"curr_site": self.config["computeSite"], "curr_cluster": self.generic.label},
                "replaceVars": False,
                "default": "",
            },
        )
        # defaultdict(str) makes placeholders with no value for this job
        # expand to the empty string instead of raising KeyError.
        job_vals = defaultdict(str)
        job_vals["label"] = self.generic.label
        if self.generic.tags:
            job_vals.update(self.generic.tags)
        subdir = template.format_map(job_vals)
        # Call normpath just to make paths easier to read as templates tend
        # to have variables that aren't used by every job. Avoid calling on
        # empty string because changes it to dot.
        same_part = os.path.normpath(os.path.join(log_dir, subdir, self.name))
        self.stdout = same_part + ".stdout"
        self.stderr = same_part + ".stderr"

    def __reduce__(self):
        """Recipe for pickling.

        Only the constructor arguments are pickled; transient state
        (``future``, ``done``) is deliberately dropped and re-initialized
        on unpickling.
        """
        return type(self), (self.generic, self.config, self.file_paths)

    def get_command_line(self, allow_stage=True) -> str:
        """Get the bash command-line to run to execute this job.

        Parameters
        ----------
        allow_stage : `bool`
            Allow staging of execution butler? This is not appropriate for the
            initial or final jobs that run on the local nodes.

        Returns
        -------
        command : `str`
            Command-line to execute for job.
        """
        command: str = self.generic.executable.src_uri + " " + self.generic.arguments
        if not allow_stage:
            return command
        exec_butler_dir = get_bps_config_value(self.config, "executionButlerDir", str)
        if not exec_butler_dir or not os.path.isdir(exec_butler_dir):
            # We're not using the execution butler
            return command

        # Add commands to copy the execution butler.
        # This keeps workers from overloading the sqlite database.
        # The copy can be deleted once we're done, because the original
        # execution butler contains everything that's required.
        job_dir = os.path.join(os.path.dirname(exec_butler_dir), self.name)
        # Set the butlerConfig field to the location of the job-specific copy.
        command = command.replace("<FILE:butlerConfig>", job_dir)
        # Preserve the job's exit status across the cleanup of job_dir so
        # failures are still reported to parsl.
        return dedent(
            f"""
            if [[ ! -d {job_dir} ]]; then mkdir -p {job_dir}; fi
            cp {exec_butler_dir}/* {job_dir}
            {command}
            retcode=$?
            rm -rf {job_dir}
            exit $retcode
            """
        )

    def evaluate_command_line(self, command: str) -> str:
        """Evaluate the bash command-line.

        BPS provides a command-line with symbolic names for BPS variables,
        environment variables and files. Here, we replace those symbolic names
        with the actual values, to provide a concrete command that can be
        executed.

        In replacing file paths, we are implicitly assuming that we are working
        on a shared file system, i.e., that workers can see the butler
        directory, and that files do not need to be staged to the worker.

        Parameters
        ----------
        command : `str`
            Command-line to execute, from BPS.

        Returns
        -------
        command : `str`
            Command ready for execution on a worker.
        """
        command = command.format(**self.generic.cmdvals)  # BPS variables

        # Make sure *all* symbolic names are resolved.
        #
        # In general, actual values for some symbolic names may contain other
        # symbolic names. As a result, more than one iteration may be required
        # to resolve all symbolic names. For example, an actual value for
        # a filename may contain a symbolic name for an environment variable.
        prev_command = command
        while True:
            command = re.sub(_env_regex, r"${\g<1>}", command)  # Environment variables
            command = re.sub(_file_regex, lambda match: self.file_paths[match.group(1)], command)  # Files
            # Stop once a full pass makes no further substitutions.
            if prev_command == command:
                break
            prev_command = command

        return command

    def get_resources(self) -> dict[str, Any]:
        """Return what resources are required for executing this job."""
        resources = {}
        # Map BPS attribute name -> parsl resource-spec key, with an optional
        # unit-conversion factor (None means the units already agree).
        for bps_name, parsl_name, scale in (
            ("request_memory", "memory", None),  # Both BPS and WorkQueueExecutor use MB
            ("request_cpus", "cores", None),
            ("request_disk", "disk", None),  # Both are MB
            ("request_walltime", "running_time_min", None),  # Both are minutes
            ("priority", "priority", None),
        ):
            value = getattr(self.generic, bps_name)
            if value is not None and scale is not None:
                value *= scale
            # Parsl's `HighThroughputExecutor` cannot have
            # `priority=None`, but it can be omitted. By contrast,
            # `WorkQueueExecutor` needs to have the other resource
            # requests provided, but `priority` can be omitted, so we
            # need special handling for `priority`.
            if (parsl_name == "priority" and value is not None) or parsl_name != "priority":
                resources[parsl_name] = value
        return resources

    def get_future(
        self,
        app: BashApp,
        inputs: list[AppFuture],
        command_prefix: str | None = None,
        resource_list: list | None = None,
    ) -> AppFuture | None:
        """Get the parsl app future for the job.

        This effectively queues the job for execution by a worker, subject to
        dependencies. The future is cached, so repeated calls return the same
        object rather than re-submitting the job.

        Parameters
        ----------
        app : `parsl.app.bash.BashApp`
            A parsl bash_app decorator to use.
        inputs : `list` [ `parsl.dataflow.futures.AppFuture` ]
            Dependencies to be satisfied before executing this job.
        command_prefix : `str`, optional
            Bash commands to execute before the job command, e.g., for setting
            the environment.
        resource_list : `list`, optional
            List of resource specifications to pass to the Parsl executor.

        Returns
        -------
        future : `parsl.dataflow.futures.AppFuture` or `None`
            A `parsl.dataflow.futures.AppFuture` object linked to the execution
            of the job, or `None` if the job has already been done (e.g., by
            ``run_local``).
        """
        if self.done:
            return None  # Nothing to do
        if not self.future:
            command = self.get_command_line()
            command = self.evaluate_command_line(command)
            if command_prefix:
                command = command_prefix + "\n" + command
            # Restrict resources to those the executor understands; an empty
            # dict when no resource_list is supplied.
            resources = (
                {k: v for k, v in self.get_resources().items() if k in resource_list}
                if resource_list is not None
                else {}
            )

            # Add a layer of indirection to which we can add a useful name.
            # This name is used by parsl for tracking workflow status.
            func = partial(run_command)
            func.__name__ = self.generic.label  # type: ignore

            self.future = app(func)(
                command,
                inputs=inputs,
                stdout=self.stdout,
                stderr=self.stderr,
                parsl_resource_specification=resources,
            )
        return self.future

    def run_local(self):
        """Run the command locally.

        This is intended to support jobs that should not be done by a
        worker. Blocks until the command completes; raises
        `subprocess.CalledProcessError` on a non-zero exit status.
        """
        if self.done:  # Nothing to do
            return
        # allow_stage=False: local jobs must not stage the execution butler.
        command = self.get_command_line(False)
        command = self.evaluate_command_line(command)
        os.makedirs(os.path.dirname(self.stdout), exist_ok=True)
        os.makedirs(os.path.dirname(self.stderr), exist_ok=True)
        with open(self.stdout, "w") as stdout, open(self.stderr, "w") as stderr:
            subprocess.check_call(command, shell=True, executable="/bin/bash", stdout=stdout, stderr=stderr)
        self.done = True