Coverage for python / lsst / ctrl / bps / parsl / sites / slurm.py: 19%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:02 +0000

1# This file is part of ctrl_bps_parsl. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from typing import TYPE_CHECKING, Any 

29 

30from parsl.executors import HighThroughputExecutor 

31from parsl.executors.base import ParslExecutor 

32from parsl.launchers import SrunLauncher 

33from parsl.providers import SlurmProvider 

34 

35from ..configuration import get_bps_config_value, get_workflow_name 

36from ..site import SiteConfig 

37 

38if TYPE_CHECKING: 

39 from ..job import ParslJob 

40 

41__all__ = ("Slurm", "TripleSlurm") 

42 

43 

44Kwargs = dict[str, Any] 

45 

46 

47class Slurm(SiteConfig): 

48 """Configuration for generic Slurm cluster. 

49 

50 Parameters 

51 ---------- 

52 *args : `~typing.Any` 

53 Parameters forwarded to base class constructor. 

54 **kwargs : `~typing.Any` 

55 Keyword arguments passed to base class constructor, augmented by 

56 the ``resource_list`` argument. 

57 

58 Notes 

59 ----- 

60 This can be used directly as the site configuration for a Slurm cluster by 

61 setting the BPS config, e.g.: 

62 

63 .. code-block:: yaml 

64 

65 computeSite: slurm 

66 site: 

67 slurm: 

68 class: lsst.ctrl.bps.parsl.sites.Slurm 

69 nodes: 3 

70 cores_per_node: 20 

71 walltime: "00:59:00" # Note: always quote walltime in YAML 

72 

73 Alternatively, it can be used as a base class for Slurm cluster 

74 configurations. 

75 

76 The following BPS configuration parameters are recognised (and required 

77 unless there is a default mentioned here, or provided by a subclass): 

78 

79 - ``nodes`` (`int`): number of nodes for each Slurm job. 

80 - ``cores_per_node`` (`int`): number of cores per node for each Slurm job; 

81 by default we use all cores on the node. 

82 - ``walltime`` (`str`): time limit for each Slurm job. 

83 - ``mem_per_node`` (`int`): memory per node (GB) for each Slurm job; by 

84 default we use whatever Slurm gives us. 

85 - ``qos`` (`str`): quality of service to request for each Slurm job; by 

86 default we use whatever Slurm gives us. 

87 - ``singleton`` (`bool`): allow only one job to run at a time; by default 

88 ``False``. 

89 - ``account`` (`str`): account to use for Slurm jobs. 

90 - ``scheduler_options`` (`str`): text to prepend to the Slurm submission 

91 script (each line usually starting with ``#SBATCH``). 

92 """ 

93 

94 def __init__(self, *args, **kwargs): 

95 # Have BPS-defined resource requests for each job passed to executor. 

96 kwargs["resource_list"] = ["priority"] 

97 super().__init__(*args, **kwargs) 

98 

99 def make_executor( 

100 self, 

101 label: str, 

102 *, 

103 nodes: int | None = None, 

104 cores_per_node: int | None = None, 

105 walltime: str | None = None, 

106 mem_per_node: int | None = None, 

107 mem_per_worker: float | None = None, 

108 qos: str | None = None, 

109 constraint: str | None = None, 

110 singleton: bool = False, 

111 exclusive: bool = False, 

112 scheduler_options: str | None = None, 

113 provider_options: Kwargs | None = None, 

114 executor_options: Kwargs | None = None, 

115 ) -> ParslExecutor: 

116 """Return an executor for running on a Slurm cluster. 

117 

118 Parameters 

119 ---------- 

120 label : `str` 

121 Label for executor. 

122 nodes : `int`, optional 

123 Default number of nodes for each Slurm job. 

124 cores_per_node : `int`, optional 

125 Default number of cores per node for each Slurm job. 

126 walltime : `str`, optional 

127 Default time limit for each Slurm job. 

128 mem_per_node : `float`, optional 

129 Memory per node (GB) to request for each Slurm job. 

130 mem_per_worker : `float`, optional 

131 Minimum memory per worker (GB), limited by the executor. 

132 qos : `str`, optional 

133 Quality of service for each Slurm job. 

134 constraint : `str`, optional 

135 Node feature(s) to require for each Slurm job. 

136 singleton : `bool`, optional 

137 Whether to allow only a single Slurm job to run at a time. 

138 exclusive : `bool`, optional 

139 Flag to specify exclusive nodes. 

140 scheduler_options : `str`, optional 

141 ``#SBATCH`` directives to prepend to the Slurm submission script. 

142 provider_options : `dict`, optional 

143 Additional arguments for `parsl.providers.SlurmProvider` 

144 constructor. 

145 executor_options : `dict`, optional 

146 Additional arguments for `parsl.executors.HighThroughputExecutor` 

147 constructor. 

148 

149 Returns 

150 ------- 

151 executor : `parsl.executors.HighThroughputExecutor` 

152 Executor for Slurm jobs. 

153 """ 

154 nodes = get_bps_config_value(self.site, "nodes", int, nodes, required=True) 

155 cores_per_node = get_bps_config_value(self.site, "cores_per_node", int, cores_per_node) 

156 walltime = get_bps_config_value(self.site, "walltime", str, walltime, required=True) 

157 mem_per_node = get_bps_config_value(self.site, "mem_per_node", int, mem_per_node) 

158 qos = get_bps_config_value(self.site, "qos", str, qos) 

159 constraint = get_bps_config_value(self.site, "constraint", str, constraint) 

160 singleton = get_bps_config_value(self.site, "singleton", bool, singleton) 

161 exclusive = get_bps_config_value(self.site, "exclusive", bool, exclusive) 

162 account = get_bps_config_value(self.site, "account", str) 

163 scheduler_options = get_bps_config_value(self.site, "scheduler_options", str, scheduler_options) 

164 

165 job_name = get_workflow_name(self.config) 

166 if nodes > 1: 

167 launcher = SrunLauncher(overrides="-K0 -k --cpu-bind=none") 

168 if provider_options is None: 

169 provider_options = {"launcher": launcher} 

170 else: 

171 provider_options["launcher"] = launcher 

172 if scheduler_options is None: 

173 scheduler_options = "" 

174 else: 

175 scheduler_options += "\n" 

176 scheduler_options += f"#SBATCH --job-name={job_name}\n" 

177 if qos: 

178 scheduler_options += f"#SBATCH --qos={qos}\n" 

179 if constraint: 

180 scheduler_options += f"#SBATCH --constraint={constraint}\n" 

181 if singleton: 

182 # The following SBATCH directives allow only a single slurm job 

183 # (parsl block) with our job_name to run at once. This means we can 

184 # have one job running, and one already in the queue when the first 

185 # exceeds the walltime limit. More backups could be achieved with a 

186 # larger value of max_blocks. This only allows one job to be 

187 # actively running at once, so that needs to be sized appropriately 

188 # by the user. 

189 scheduler_options += "#SBATCH --dependency=singleton\n" 

190 return HighThroughputExecutor( 

191 label, 

192 provider=SlurmProvider( 

193 nodes_per_block=nodes, 

194 cores_per_node=cores_per_node, 

195 mem_per_node=mem_per_node, 

196 walltime=walltime, 

197 account=account, 

198 scheduler_options=scheduler_options, 

199 **(provider_options or {}), 

200 ), 

201 mem_per_worker=mem_per_worker, 

202 address=self.get_address(), 

203 **(executor_options or {}), 

204 ) 

205 

206 def get_executors(self) -> list[ParslExecutor]: 

207 """Get a list of executors to be used in processing. 

208 

209 Each executor should have a unique ``label``. 

210 """ 

211 return [self.make_executor("slurm")] 

212 

213 def select_executor(self, job: "ParslJob") -> str: 

214 """Get the ``label`` of the executor to use to execute a job. 

215 

216 Parameters 

217 ---------- 

218 job : `lsst.ctrl.bps.parsl.ParslJob` 

219 Job to be executed. 

220 

221 Returns 

222 ------- 

223 label : `str` 

224 Label of executor to use to execute ``job``. 

225 """ 

226 return "slurm" 

227 

228 

229class TripleSlurm(Slurm): 

230 """Configuration for running jobs on a Slurm cluster with three levels. 

231 

232 Parameters 

233 ---------- 

234 *args : `~typing.Any` 

235 Parameters forwarded to base class constructor. 

236 **kwargs : `~typing.Any` 

237 Keyword arguments passed to base class constructor. 

238 

239 Notes 

240 ----- 

241 The three levels are useful for having workers with different amount of 

242 available memory (and this is how executors are selected, by default), 

243 though other uses are possible. 

244 

245 The following BPS configuration parameters are recognised, overriding the 

246 defaults: 

247 

248 - ``nodes`` (`int`): number of nodes for each Slurm job. 

249 - ``cores_per_node`` (`int`): number of cores per node for each Slurm job; 

250 by default we use all cores on the node. 

251 - ``walltime`` (`str`): time limit for each Slurm job; setting this would 

252 override each of the ``small_walltime``, ``medium_walltime`` and 

253 ``large_walltime`` values. 

254 - ``mem_per_node`` (`float`): memory per node for each Slurm job; by 

255 default we use whatever Slurm gives us. 

256 - ``qos`` (`str`): quality of service to request for each Slurm job; by 

257 default we use whatever Slurm gives us. 

258 - ``small_memory`` (`float`): memory per worker (GB) for each 'small' Slurm 

259 job. 

260 - ``medium_memory`` (`float`): memory per worker (GB) for each 'medium' 

261 Slurm job. 

262 - ``large_memory`` (`float`): memory per worker (GB) for each 'large' Slurm 

263 job. 

264 - ``small_walltime`` (`str`): time limit for each 'small' Slurm job. 

265 - ``medium_walltime`` (`str`): time limit for each 'medium' Slurm job. 

266 - ``large_walltime`` (`str`): time limit for each 'large' Slurm job. 

267 """ 

268 

269 def __init__(self, *args, **kwargs): 

270 super().__init__(*args, **kwargs) 

271 self.small_memory = get_bps_config_value(self.site, "small_memory", float, 2.0) 

272 self.medium_memory = get_bps_config_value(self.site, "medium_memory", float, 4.0) 

273 self.large_memory = get_bps_config_value(self.site, "large_memory", float, 8.0) 

274 self.small_walltime = get_bps_config_value(self.site, "small_walltime", str, "10:00:00") 

275 self.medium_walltime = get_bps_config_value(self.site, "medium_walltime", str, "10:00:00") 

276 self.large_walltime = get_bps_config_value(self.site, "large_walltime", str, "40:00:00") 

277 

278 def get_executors( 

279 self, 

280 small_options: Kwargs | None = None, 

281 medium_options: Kwargs | None = None, 

282 large_options: Kwargs | None = None, 

283 **common_options, 

284 ) -> list[ParslExecutor]: 

285 """Get a list of executors to be used in processing. 

286 

287 We create three executors, with different walltime and memory per 

288 worker. 

289 

290 Parameters 

291 ---------- 

292 small_options : `dict` [`str`, `typing.Any`] 

293 Options for ``make_executor`` for small executor. 

294 medium_options : `dict` [`str`, `typing.Any`] 

295 Options for ``make_executor`` for medium executor. 

296 large_options : `dict` [`str`, `typing.Any`] 

297 Options for ``make_executor`` for large executor. 

298 **common_options 

299 Common options for ``make_executor`` for each of the executors. 

300 """ 

301 if small_options is None: 

302 small_options = {} 

303 if medium_options is None: 

304 medium_options = {} 

305 if large_options is None: 

306 large_options = {} 

307 

308 small_options["walltime"] = small_options.get("walltime", self.small_walltime) 

309 medium_options["walltime"] = medium_options.get("walltime", self.medium_walltime) 

310 large_options["walltime"] = large_options.get("walltime", self.large_walltime) 

311 

312 small_options["mem_per_worker"] = small_options.get("mem_per_worker", self.small_memory) 

313 medium_options["mem_per_worker"] = medium_options.get("mem_per_worker", self.medium_memory) 

314 large_options["mem_per_worker"] = large_options.get("mem_per_worker", self.large_memory) 

315 

316 return [ 

317 self.make_executor("small", **small_options, **(common_options or {})), 

318 self.make_executor("medium", **medium_options, **(common_options or {})), 

319 self.make_executor("large", **large_options, **(common_options or {})), 

320 ] 

321 

322 def select_executor(self, job: "ParslJob") -> str: 

323 """Get the ``label`` of the executor to use to execute a job. 

324 

325 This implementation only looks at the requested memory. 

326 

327 Parameters 

328 ---------- 

329 job : `lsst.ctrl.bps.parsl.ParslJob` 

330 Job to be executed. 

331 

332 Returns 

333 ------- 

334 label : `str` 

335 Label of executor to use to execute ``job``. 

336 """ 

337 memory = job.generic.request_memory / 1024 # GB 

338 if memory <= self.small_memory: 

339 return "small" 

340 if memory <= self.medium_memory: 

341 return "medium" 

342 return "large"