Coverage for python / lsst / ctrl / bps / parsl / sites / slurm.py: 19%
75 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:37 +0000
1# This file is part of ctrl_bps_parsl.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org) and the LSST DESC (https://www.lsstdesc.org/).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28from typing import TYPE_CHECKING, Any
30from parsl.executors import HighThroughputExecutor
31from parsl.executors.base import ParslExecutor
32from parsl.launchers import SrunLauncher
33from parsl.providers import SlurmProvider
35from ..configuration import get_bps_config_value, get_workflow_name
36from ..site import SiteConfig
38if TYPE_CHECKING:
39 from ..job import ParslJob
__all__ = ("Slurm", "TripleSlurm")

# Convenience alias for keyword-argument dictionaries forwarded to parsl
# provider/executor constructors.
Kwargs = dict[str, Any]
class Slurm(SiteConfig):
    """Configuration for generic Slurm cluster.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor, augmented by
        the ``resource_list`` argument.

    Notes
    -----
    This can be used directly as the site configuration for a Slurm cluster by
    setting the BPS config, e.g.:

    .. code-block:: yaml

        computeSite: slurm
        site:
          slurm:
            class: lsst.ctrl.bps.parsl.sites.Slurm
            nodes: 3
            cores_per_node: 20
            walltime: "00:59:00"  # Note: always quote walltime in YAML

    Alternatively, it can be used as a base class for Slurm cluster
    configurations.

    The following BPS configuration parameters are recognised (and required
    unless there is a default mentioned here, or provided by a subclass):

    - ``nodes`` (`int`): number of nodes for each Slurm job.
    - ``cores_per_node`` (`int`): number of cores per node for each Slurm job;
      by default we use all cores on the node.
    - ``walltime`` (`str`): time limit for each Slurm job.
    - ``mem_per_node`` (`int`): memory per node (GB) for each Slurm job; by
      default we use whatever Slurm gives us.
    - ``qos`` (`str`): quality of service to request for each Slurm job; by
      default we use whatever Slurm gives us.
    - ``constraint`` (`str`): node feature(s) to require for each Slurm job.
    - ``exclusive`` (`bool`): whether to request nodes exclusively; by default
      ``False``.
    - ``singleton`` (`bool`): allow only one job to run at a time; by default
      ``False``.
    - ``account`` (`str`): account to use for Slurm jobs.
    - ``scheduler_options`` (`str`): text to prepend to the Slurm submission
      script (each line usually starting with ``#SBATCH``).
    """

    def __init__(self, *args, **kwargs):
        # Have BPS-defined resource requests for each job passed to executor.
        kwargs["resource_list"] = ["priority"]
        super().__init__(*args, **kwargs)

    def make_executor(
        self,
        label: str,
        *,
        nodes: int | None = None,
        cores_per_node: int | None = None,
        walltime: str | None = None,
        mem_per_node: int | None = None,
        mem_per_worker: float | None = None,
        qos: str | None = None,
        constraint: str | None = None,
        singleton: bool = False,
        exclusive: bool = False,
        scheduler_options: str | None = None,
        provider_options: Kwargs | None = None,
        executor_options: Kwargs | None = None,
    ) -> ParslExecutor:
        """Return an executor for running on a Slurm cluster.

        Parameters
        ----------
        label : `str`
            Label for executor.
        nodes : `int`, optional
            Default number of nodes for each Slurm job.
        cores_per_node : `int`, optional
            Default number of cores per node for each Slurm job.
        walltime : `str`, optional
            Default time limit for each Slurm job.
        mem_per_node : `int`, optional
            Memory per node (GB) to request for each Slurm job.
        mem_per_worker : `float`, optional
            Minimum memory per worker (GB), limited by the executor.
        qos : `str`, optional
            Quality of service for each Slurm job.
        constraint : `str`, optional
            Node feature(s) to require for each Slurm job.
        singleton : `bool`, optional
            Whether to allow only a single Slurm job to run at a time.
        exclusive : `bool`, optional
            Flag to specify exclusive nodes.
        scheduler_options : `str`, optional
            ``#SBATCH`` directives to prepend to the Slurm submission script.
        provider_options : `dict`, optional
            Additional arguments for `parsl.providers.SlurmProvider`
            constructor; these override any values computed here.
        executor_options : `dict`, optional
            Additional arguments for `parsl.executors.HighThroughputExecutor`
            constructor.

        Returns
        -------
        executor : `parsl.executors.HighThroughputExecutor`
            Executor for Slurm jobs.
        """
        # BPS site configuration takes precedence over the keyword defaults.
        nodes = get_bps_config_value(self.site, "nodes", int, nodes, required=True)
        cores_per_node = get_bps_config_value(self.site, "cores_per_node", int, cores_per_node)
        walltime = get_bps_config_value(self.site, "walltime", str, walltime, required=True)
        mem_per_node = get_bps_config_value(self.site, "mem_per_node", int, mem_per_node)
        qos = get_bps_config_value(self.site, "qos", str, qos)
        constraint = get_bps_config_value(self.site, "constraint", str, constraint)
        singleton = get_bps_config_value(self.site, "singleton", bool, singleton)
        exclusive = get_bps_config_value(self.site, "exclusive", bool, exclusive)
        account = get_bps_config_value(self.site, "account", str)
        scheduler_options = get_bps_config_value(self.site, "scheduler_options", str, scheduler_options)

        job_name = get_workflow_name(self.config)
        # Copy so we never mutate a caller-supplied dict (the original code
        # wrote the launcher back into the caller's ``provider_options``).
        provider_options = dict(provider_options) if provider_options else {}
        if nodes > 1:
            # Multi-node blocks need srun to distribute workers across nodes;
            # any caller-supplied launcher is deliberately replaced here.
            provider_options["launcher"] = SrunLauncher(overrides="-K0 -k --cpu-bind=none")
        if scheduler_options is None:
            scheduler_options = ""
        else:
            scheduler_options += "\n"
        scheduler_options += f"#SBATCH --job-name={job_name}\n"
        if qos:
            scheduler_options += f"#SBATCH --qos={qos}\n"
        if constraint:
            scheduler_options += f"#SBATCH --constraint={constraint}\n"
        if singleton:
            # The following SBATCH directives allow only a single slurm job
            # (parsl block) with our job_name to run at once. This means we can
            # have one job running, and one already in the queue when the first
            # exceeds the walltime limit. More backups could be achieved with a
            # larger value of max_blocks. This only allows one job to be
            # actively running at once, so that needs to be sized appropriately
            # by the user.
            scheduler_options += "#SBATCH --dependency=singleton\n"

        # Build the provider arguments as a dict so caller-supplied
        # ``provider_options`` can override the computed values (instead of
        # raising ``TypeError`` on duplicate keywords as the old call did).
        # This also fixes a bug in the original code: ``exclusive`` was read
        # from the configuration but never forwarded to the provider.
        provider_kwargs: Kwargs = {
            "nodes_per_block": nodes,
            "cores_per_node": cores_per_node,
            "mem_per_node": mem_per_node,
            "walltime": walltime,
            "account": account,
            "exclusive": exclusive,
            "scheduler_options": scheduler_options,
        }
        provider_kwargs.update(provider_options)
        return HighThroughputExecutor(
            label,
            provider=SlurmProvider(**provider_kwargs),
            mem_per_worker=mem_per_worker,
            address=self.get_address(),
            **(executor_options or {}),
        )

    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of executors to be used in processing.

        Each executor should have a unique ``label``.
        """
        return [self.make_executor("slurm")]

    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute a job.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        return "slurm"
class TripleSlurm(Slurm):
    """Configuration for running jobs on a Slurm cluster with three levels.

    Parameters
    ----------
    *args : `~typing.Any`
        Parameters forwarded to base class constructor.
    **kwargs : `~typing.Any`
        Keyword arguments passed to base class constructor.

    Notes
    -----
    The three levels are useful for having workers with different amount of
    available memory (and this is how executors are selected, by default),
    though other uses are possible.

    The following BPS configuration parameters are recognised, overriding the
    defaults:

    - ``nodes`` (`int`): number of nodes for each Slurm job.
    - ``cores_per_node`` (`int`): number of cores per node for each Slurm job;
      by default we use all cores on the node.
    - ``walltime`` (`str`): time limit for each Slurm job; setting this would
      override each of the ``small_walltime``, ``medium_walltime`` and
      ``large_walltime`` values.
    - ``mem_per_node`` (`float`): memory per node for each Slurm job; by
      default we use whatever Slurm gives us.
    - ``qos`` (`str`): quality of service to request for each Slurm job; by
      default we use whatever Slurm gives us.
    - ``small_memory`` (`float`): memory per worker (GB) for each 'small' Slurm
      job.
    - ``medium_memory`` (`float`): memory per worker (GB) for each 'medium'
      Slurm job.
    - ``large_memory`` (`float`): memory per worker (GB) for each 'large' Slurm
      job.
    - ``small_walltime`` (`str`): time limit for each 'small' Slurm job.
    - ``medium_walltime`` (`str`): time limit for each 'medium' Slurm job.
    - ``large_walltime`` (`str`): time limit for each 'large' Slurm job.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-tier memory thresholds (GB per worker) and time limits, with
        # defaults overridable through the BPS site configuration.
        self.small_memory = get_bps_config_value(self.site, "small_memory", float, 2.0)
        self.medium_memory = get_bps_config_value(self.site, "medium_memory", float, 4.0)
        self.large_memory = get_bps_config_value(self.site, "large_memory", float, 8.0)
        self.small_walltime = get_bps_config_value(self.site, "small_walltime", str, "10:00:00")
        self.medium_walltime = get_bps_config_value(self.site, "medium_walltime", str, "10:00:00")
        self.large_walltime = get_bps_config_value(self.site, "large_walltime", str, "40:00:00")

    def get_executors(
        self,
        small_options: Kwargs | None = None,
        medium_options: Kwargs | None = None,
        large_options: Kwargs | None = None,
        **common_options,
    ) -> list[ParslExecutor]:
        """Get a list of executors to be used in processing.

        We create three executors, with different walltime and memory per
        worker.

        Parameters
        ----------
        small_options : `dict` [`str`, `typing.Any`]
            Options for ``make_executor`` for small executor.
        medium_options : `dict` [`str`, `typing.Any`]
            Options for ``make_executor`` for medium executor.
        large_options : `dict` [`str`, `typing.Any`]
            Options for ``make_executor`` for large executor.
        **common_options
            Common options for ``make_executor`` for each of the executors.

        Returns
        -------
        executors : `list` [`parsl.executors.base.ParslExecutor`]
            Executors labelled ``small``, ``medium`` and ``large``.
        """
        executors: list[ParslExecutor] = []
        for label, tier_walltime, tier_memory, options in (
            ("small", self.small_walltime, self.small_memory, small_options),
            ("medium", self.medium_walltime, self.medium_memory, medium_options),
            ("large", self.large_walltime, self.large_memory, large_options),
        ):
            # Merge into a fresh dict so caller-supplied option dicts are not
            # mutated (the original wrote the defaults back into them).
            # Explicit per-tier options still take precedence over defaults.
            opts: Kwargs = {"walltime": tier_walltime, "mem_per_worker": tier_memory}
            opts.update(options or {})
            executors.append(self.make_executor(label, **opts, **common_options))
        return executors

    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute a job.

        This implementation only looks at the requested memory.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        memory = job.generic.request_memory / 1024  # GB
        if memory <= self.small_memory:
            return "small"
        if memory <= self.medium_memory:
            return "medium"
        return "large"