Coverage for python / lsst / ctrl / bps / parsl / sites / torque.py: 34%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:49 +0000

1# This file is part of ctrl_bps_parsl. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from typing import TYPE_CHECKING, Any 

29 

30from parsl.executors import HighThroughputExecutor 

31from parsl.executors.base import ParslExecutor 

32from parsl.launchers import MpiRunLauncher 

33from parsl.providers import TorqueProvider 

34 

35from ..configuration import get_bps_config_value, get_workflow_name 

36from ..site import SiteConfig 

37 

38if TYPE_CHECKING: 

39 from ..job import ParslJob 

40 

41__all__ = ("Torque",) 

42 

43 

# Alias for the keyword-argument dictionaries forwarded to parsl constructors.
Kwargs = dict[str, Any]

45 

46 

class Torque(SiteConfig):
    """Configuration for generic Torque cluster.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor, augmented by
        the ``resource_list`` argument.

    Notes
    -----
    This can be used directly as the site configuration for a Torque cluster by
    setting the BPS config, e.g.:

    .. code-block:: yaml

        computeSite: torque
        site:
          torque:
            class: lsst.ctrl.bps.parsl.sites.Torque
            nodes: 4
            tasks_per_node: 20
            walltime: "00:59:00"  # Note: always quote walltime in YAML

    Alternatively, it can be used as a base class for Torque cluster
    configurations.

    The following BPS configuration parameters are recognised (and required
    unless there is a default mentioned here, or provided by a subclass):

    - ``queue`` (`str`): Queue for the Torque job.
    - ``nodes`` (`int`): number of nodes for each Torque job.
    - ``tasks_per_node`` (`int`): number of cores per node for each Torque job;
      defaults to 1.
    - ``walltime`` (`str`): time limit for each Torque job.
    - ``scheduler_options`` (`str`): text to prepend to the Torque submission
      script (each line usually starting with ``#PBS``).
    """

    def __init__(self, *args, **kwargs):
        # Have BPS-defined resource requests for each job passed to executor.
        kwargs["resource_list"] = ["priority"]
        super().__init__(*args, **kwargs)

    def make_executor(
        self,
        label: str,
        *,
        queue: str | None = None,
        nodes: int | None = None,
        tasks_per_node: int | None = None,
        walltime: str | None = None,
        mem_per_worker: float | None = None,
        scheduler_options: str | None = None,
        worker_init: str | None = None,
        provider_options: Kwargs | None = None,
        executor_options: Kwargs | None = None,
    ) -> ParslExecutor:
        """Return an executor for running on a Torque cluster.

        Parameters
        ----------
        label : `str`
            Label for executor.
        queue : `str`, optional
            Queue for the Torque job.
        nodes : `int`, optional
            Default number of nodes for each Torque job.
        tasks_per_node : `int`, optional
            Default number of cores per node for each Torque job.
        walltime : `str`, optional
            Default time limit for each Torque job.
        mem_per_worker : `float`, optional
            Minimum memory per worker (GB), limited by the executor.
        scheduler_options : `str`, optional
            ``#PBS`` directives to prepend to the Torque submission script.
        worker_init : `str`, optional
            Environment initiation command.
        provider_options : `dict`, optional
            Additional arguments for `parsl.providers.TorqueProvider`
            constructor.
        executor_options : `dict`, optional
            Additional arguments for `parsl.executors.HighThroughputExecutor`
            constructor.

        Returns
        -------
        executor : `parsl.executors.HighThroughputExecutor`
            Executor for Torque jobs.
        """
        nodes = get_bps_config_value(self.site, "nodes", int, nodes, required=True)
        walltime = get_bps_config_value(self.site, "walltime", str, walltime, required=True)
        queue = get_bps_config_value(self.site, "queue", str, queue)
        tasks_per_node = get_bps_config_value(self.site, "tasks_per_node", int, tasks_per_node)
        # Use the keyword argument as the fallback default; previously the
        # default was hard-coded to "", silently discarding the caller's
        # ``worker_init`` argument.
        worker_init = get_bps_config_value(self.site, "worker_init", str, worker_init)
        scheduler_options = get_bps_config_value(self.site, "scheduler_options", str, scheduler_options)

        if tasks_per_node is None:
            tasks_per_node = 1

        job_name = get_workflow_name(self.config)

        if scheduler_options is None:
            scheduler_options = ""
        else:
            scheduler_options += "\n"
        scheduler_options += f"#PBS -N {job_name}\n"
        if queue:
            scheduler_options += f"#PBS -q {queue}\n"

        if worker_init is None:
            worker_init = ""

        launcher = PbsMpiRunLauncher(overrides=f"--map-by core:{tasks_per_node}")

        return HighThroughputExecutor(
            label,
            provider=PbsTorqueProvider(
                nodes_per_block=nodes,
                tasks_per_node=tasks_per_node,
                queue=queue,
                walltime=walltime,
                scheduler_options=scheduler_options,
                worker_init=worker_init,
                launcher=launcher,
                **(provider_options or {}),
            ),
            max_workers_per_node=1,
            mem_per_worker=mem_per_worker,
            address=self.get_address(),
            **(executor_options or {}),
        )

    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of executors to be used in processing.

        Each executor should have a unique ``label``.
        """
        return [self.make_executor("torque")]

    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute a job.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        return "torque"

203 

204 

class PbsTorqueProvider(TorqueProvider):
    """Torque execution provider with a fixed ``tasks_per_node``.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    tasks_per_node : `int`, optional
        Number of tasks per node.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor, augmented by
        the ``resource_list`` argument.

    Notes
    -----
    Jobs are submitted with qsub, monitored with qstat and cancelled with
    qdel; the qsub script is built from a template file shipped alongside
    the base provider.

    Unlike the base class, the number of tasks per node is chosen once at
    construction time rather than on each ``submit`` call.
    """

    def __init__(self, *args, tasks_per_node: int = 1, **kwargs):
        super().__init__(*args, **kwargs)
        # Remembered here so submit() can override the per-call value.
        self.tasks_per_node = tasks_per_node

    def submit(self, command, tasks_per_node, job_name="parsl.torque"):
        """Submit the command onto a Local Resource Manager job.

        Parameters
        ----------
        command : `str`
            Command-line invocation to be made on the remote side.
        tasks_per_node : `int`
            Number of tasks to be launched per node. This is ignored in this
            provider.
        job_name : `str`
            Name for job, must be unique.

        Returns
        -------
        response : `str` or `None`
            If `None`: At capacity, cannot provision more.
            If job_id (`str`): Identifier for the job.

        Notes
        -----
        This function returns an ID that corresponds to the task that was just
        submitted.

        The ``tasks_per_node`` parameter is ignored in this provider, as it is
        set at construction time.
        """
        # Discard the caller-supplied value in favour of the one fixed at
        # construction.
        del tasks_per_node
        return super().submit(command=command, tasks_per_node=self.tasks_per_node, job_name=job_name)

264 

265 

class PbsMpiRunLauncher(MpiRunLauncher):
    """Worker launcher that wraps the user's command with the framework to
    launch multiple command invocations via ``mpirun``.

    Parameters
    ----------
    debug : `bool`, optional
        Enable or disable debug logging.
    bash_location : `str`, optional
        Path to the ``bash`` shell binary.
    overrides : `str`, optional
        Any override options.

    Notes
    -----
    This wrapper sets the bash env variable ``CORES`` to the number of cores on
    the machine.

    This launcher makes the following assumptions:

    - mpirun is installed and can be located in ``$PATH``
    - The provider makes available the ``$PBS_NODEFILE`` environment variable
    """

    def __init__(
        self,
        debug: bool = True,
        bash_location: str = "/bin/bash",
        overrides: str = "",
    ):
        super().__init__(debug=debug, bash_location=bash_location, overrides=overrides)

    def __call__(self, command: str, tasks_per_node: int, nodes_per_block: int) -> str:
        """Wrap the user's command with mpirun invocation"""
        # Total workers across the whole block; interpolated into the script.
        total_workers = tasks_per_node * nodes_per_block
        # Shell-friendly 0/1 flag controlling the debug echo lines.
        debug_flag = int(self.debug)

        return f"""set -e
export CORES=$(getconf _NPROCESSORS_ONLN)
[[ "{debug_flag}" == "1" ]] && echo "Found cores : $CORES"
WORKERCOUNT={total_workers}

cat << MPIRUN_EOF > cmd_$JOBNAME.sh
{command}
MPIRUN_EOF
chmod u+x cmd_$JOBNAME.sh

mpirun -np $WORKERCOUNT {self.overrides} {self.bash_location} cmd_$JOBNAME.sh

[[ "{debug_flag}" == "1" ]] && echo "All workers done"
"""