Coverage for python/lsst/ctrl/bps/parsl/sites/ccin2p3.py: 29% (58 statements)
import copy
import platform
from typing import TYPE_CHECKING, Any

import parsl.config
from parsl.executors import HighThroughputExecutor
from parsl.executors.base import ParslExecutor
from parsl.providers import SlurmProvider

from ..configuration import get_bps_config_value
from ..site import SiteConfig

if TYPE_CHECKING:
    from ..job import ParslJob

__all__ = ("Ccin2p3",)

Kwargs = dict[str, Any]


class Ccin2p3(SiteConfig):
    """Configuration for executing Parsl jobs in the CC-IN2P3 Slurm batch farm.

    This class provides four job slot sizes, each with its own requirements,
    in particular in terms of memory. Those slot sizes are named "small",
    "medium", "large" and "xlarge".

    Sensible default values for those requirements are provided for each
    job slot, but you can overwrite those defaults either in the
    BPS submission file or in a site configuration file that you
    include in your BPS submission file.

    If you don't need to modify the default requirements for the job slot
    sizes, use the site specification below in your BPS configuration
    file:

    .. code-block:: yaml

        wmsServiceClass: lsst.ctrl.bps.parsl.ParslService
        computeSite: ccin2p3

        site:
          ccin2p3:
            class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3

    If you do need to modify those defaults, you can overwrite them for
    all job slots or for each specific job slot. Requirements specified
    for a job slot take priority over those specified for all job slots
    at the level of entry ``site.ccin2p3:``.

    This is an example of how to overwrite selected requirements in your BPS
    submission file:

    .. code-block:: yaml

        wmsServiceClass: lsst.ctrl.bps.parsl.ParslService
        computeSite: ccin2p3

        site:
          ccin2p3:
            class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
            walltime: "72:00:00"
            scheduler_options:
              - "--licenses=sps"
              - "--qos=normal"
            small:
              memory: 6
              partition: "flash"
            medium:
              memory: 10
              partition: "lsst,htc"
            large:
              memory: 80
            xlarge:
              memory: 180
              partition: "lsst"
              scheduler_options:
                - "--constraint=el7"
                - "--licenses=my_product"
                - "--reservation=my_reservation"

    At the level of entry ``site.ccin2p3:`` in the BPS submission file, the
    following configuration parameters are accepted, which apply to all slot
    sizes:

    - ``partition`` (`str`): name of one or more Slurm partitions. If
      more than one, separate them with a comma (',').
      (Default: chosen per job slot and CPU architecture, e.g. "lsst,htc"
      for "small" slots on x86_64.)
    - ``walltime`` (`str`): walltime to require for the job.
      (Default: "72:00:00")
    - ``scheduler_options`` (`list` [`str`]): scheduler options to pass to
      Slurm as ``#SBATCH`` directives.
      (Default: ``["--licenses=sps"]``)

    In addition, as shown in the previous example, for each job slot (i.e.
    "small", "medium", etc.) you can specify the requirements above as well as
    the following:

    - ``max_blocks`` (`int`): maximum number of Slurm jobs that your workflow
      can simultaneously use.
      (Defaults: 3000 for "small", 1000 for "medium", 100 for "large" and
      10 for "xlarge").
    - ``memory`` (`int`): required amount of memory for each job, in gigabytes.
      (Defaults: 4 for "small", 10 for "medium", 50 for "large" and
      150 for "xlarge").
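
    For example, to cap at 50 the number of Slurm jobs that "large" job
    slots may use simultaneously (a minimal sketch; adjust the value to
    your workflow's needs):

    .. code-block:: yaml

        site:
          ccin2p3:
            class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
            large:
              max_blocks: 50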

    Parameters
    ----------
    *args
        Arguments to initialize the super-class.
    **kwargs
        Keyword arguments to initialize the super-class.

    Returns
    -------
    Ccin2p3 : `lsst.ctrl.bps.parsl.SiteConfig`
        Concrete instance of a `~lsst.ctrl.bps.parsl.SiteConfig` specific to
        the CC-IN2P3 Slurm farm.
    """

    DEFAULT_ACCOUNT: str = "lsst"
    DEFAULT_WALLTIME: str = "72:00:00"
    DEFAULT_SCHEDULER_OPTIONS: list[str] = [
        "--licenses=sps",
    ]

    def __init__(self, *args, **kwargs):
        # Have BPS-defined resource requests for each job passed to executor.
        kwargs["resource_list"] = ["priority"]
        super().__init__(*args, **kwargs)
        self._account = get_bps_config_value(self.site, ".account", str, self.DEFAULT_ACCOUNT)
        self._scheduler_options = get_bps_config_value(
            self.site, ".scheduler_options", list, self.DEFAULT_SCHEDULER_OPTIONS
        )
        self._slot_size = {
            "small": {
                "memory": get_bps_config_value(self.site, ".small.memory", int, 4),
                "walltime": self._get_walltime_for_slot("small"),
                "partition": self._get_partition_for_slot("small"),
                "max_blocks": get_bps_config_value(self.site, ".small.max_blocks", int, 3_000),
                "scheduler_options": get_bps_config_value(self.site, ".small.scheduler_options", list, []),
            },
            "medium": {
                "memory": get_bps_config_value(self.site, ".medium.memory", int, 10),
                "walltime": self._get_walltime_for_slot("medium"),
                "partition": self._get_partition_for_slot("medium"),
                "max_blocks": get_bps_config_value(self.site, ".medium.max_blocks", int, 1_000),
                "scheduler_options": get_bps_config_value(self.site, ".medium.scheduler_options", list, []),
            },
            "large": {
                "memory": get_bps_config_value(self.site, ".large.memory", int, 50),
                "walltime": self._get_walltime_for_slot("large"),
                "partition": self._get_partition_for_slot("large"),
                "max_blocks": get_bps_config_value(self.site, ".large.max_blocks", int, 100),
                "scheduler_options": get_bps_config_value(self.site, ".large.scheduler_options", list, []),
            },
            "xlarge": {
                "memory": get_bps_config_value(self.site, ".xlarge.memory", int, 150),
                "walltime": self._get_walltime_for_slot("xlarge"),
                "partition": self._get_partition_for_slot("xlarge"),
                "max_blocks": get_bps_config_value(self.site, ".xlarge.max_blocks", int, 10),
                "scheduler_options": get_bps_config_value(self.site, ".xlarge.scheduler_options", list, []),
            },
        }

    def _get_partition_for_slot(self, slot: str) -> str:
        """Return the Slurm partition Parsl must use to submit jobs for the
        job slot `slot`. Values of `slot` can be "small", "medium", "large"
        or "xlarge".
        """
        # The target Slurm partition must be selected according to the type
        # of the job slot but also according to the CPU architecture of the
        # compute node.
        #
        # Parsl requires that the CPU architecture of its orchestrator be
        # identical to the architecture of its executors. Therefore,
        # we need to ensure that Slurm schedules our Parsl executors on
        # compute nodes with the same architecture as the host where this
        # orchestrator runs.
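        #
        # For reference, platform.machine() returns "x86_64" on Intel/AMD
        # hosts and "aarch64" on ARM hosts; these are the keys of the
        # default partition table below.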

        # Default target Slurm partitions per CPU architecture.
        default_partition = {
            "aarch64": {
                "small": "htc_arm",
                "medium": "htc_arm",
                "large": "htc_arm",
                "xlarge": "htc_arm",
            },
            "x86_64": {
                "small": "lsst,htc",
                "medium": "lsst",
                "large": "lsst",
                "xlarge": "lsst",
            },
        }
        architecture = platform.machine()
        if architecture not in default_partition:
            raise ValueError(f"architecture {architecture} is not supported")

        # If a partition was specified in the workflow description file
        # specifically for this job slot, use that partition. For instance:
        #
        #    site:
        #      ccin2p3:
        #        class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #        small:
        #          partition: htc
        slot_partition = get_bps_config_value(self.site, f".{slot}.partition", str, "")
        if slot_partition != "":
            return slot_partition

        # If a partition was specified in the workflow description file at
        # the site level, use that partition. For instance:
        #
        #    site:
        #      ccin2p3:
        #        class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #        partition: htc
        #
        # Otherwise, use the default for this slot on this architecture.
        return get_bps_config_value(self.site, ".partition", str, default_partition[architecture][slot])

    def _get_walltime_for_slot(self, slot: str) -> str:
        """Return the walltime value Parsl must use to submit jobs for the
        job slot `slot`. Values of `slot` can be "small", "medium", "large"
        or "xlarge".
        """
        # If a specific walltime value was specified for this job slot in
        # the configuration, use that value. For instance:
        #
        #    site:
        #      ccin2p3:
        #        class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #        small:
        #          walltime: "3:00:00"
        slot_walltime = get_bps_config_value(self.site, f".{slot}.walltime", str, "")
        if slot_walltime != "":
            return slot_walltime

        # If a walltime value was specified for the site, use that value.
        # Otherwise, use the default walltime. For instance:
        #
        #    site:
        #      ccin2p3:
        #        class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #        walltime: "3:00:00"
        return get_bps_config_value(self.site, ".walltime", str, self.DEFAULT_WALLTIME)

    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of Parsl executors that can be used for processing a
        workflow.

        Each executor must have a unique ``label``.
        """
        executors: list[ParslExecutor] = []
        for label, slot in self._slot_size.items():
            # Compute the scheduler options for this job slot. Options
            # specified at the slot level in the configuration file
            # overwrite those specified at the site level.
            scheduler_options = copy.deepcopy(self._scheduler_options)
            if slot_scheduler_options := slot.get("scheduler_options", []):
                scheduler_options = copy.deepcopy(slot_scheduler_options)
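
            # For example, ["--licenses=sps", "--qos=normal"] is rendered
            # below as "#SBATCH --licenses=sps --qos=normal"; Slurm accepts
            # several options on a single #SBATCH line.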
            options = f"#SBATCH {' '.join(scheduler_options)}" if scheduler_options else ""

            executor = HighThroughputExecutor(
                label,
                provider=SlurmProvider(
                    # Slurm partition to request blocks from.
                    partition=slot["partition"],
                    # Slurm account to which to charge resources used by
                    # the job.
                    account=self._account,
                    # Nodes to provision per block.
                    nodes_per_block=1,
                    # Number of CPU cores to provision per node, so each
                    # block is a single CPU core.
                    cores_per_node=1,
                    # Memory per node (GB) for each Slurm job.
                    mem_per_node=slot["memory"],
                    # Initial number of blocks.
                    init_blocks=0,
                    # Minimum number of blocks to maintain.
                    min_blocks=0,
                    # Maximum number of blocks to maintain.
                    max_blocks=slot["max_blocks"],
                    # Time limit for each Slurm job.
                    walltime=slot["walltime"],
                    # '#SBATCH' directives to prepend to the Slurm
                    # submission script.
                    scheduler_options=options,
                    # Raise the soft limits on file descriptors and
                    # processes to the hard maximum allowed.
                    worker_init="ulimit -n hard && ulimit -u hard",
                    # Do not request exclusive nodes: allow sharing nodes
                    # with other running jobs.
                    exclusive=False,
                ),
                # Address to connect to the main Parsl process.
                address=self.get_address(),
                # GB of memory required per worker. If specified, the node
                # manager will check the available memory at startup and
                # limit the number of workers such that there is sufficient
                # memory for each worker.
                mem_per_worker=None,
                # Cap the number of workers launched per node.
                max_workers_per_node=1,
                # Timeout period (in milliseconds) to be used by the
                # executor components.
                poll_period=1_000,
                # Disable the default block error handler so that failed
                # Slurm submissions do not abort the executor; Parsl keeps
                # submitting new blocks.
                block_error_handler=False,
            )
            executors.append(executor)

        return executors

    def select_executor(self, job: "ParslJob") -> str:
        """Get the ``label`` of the executor to use to execute ``job``.

        Parameters
        ----------
        job : `lsst.ctrl.bps.parsl.ParslJob`
            Job to be executed.

        Returns
        -------
        label : `str`
            Label of executor to use to execute ``job``.
        """
        # We choose the executor to use based only on the memory required
        # by the job.
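        # For example, with the default slot memory limits, a job requesting
        # 8192 MB (8 GB) is assigned the "medium" slot, since 4 < 8 <= 10.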
        memory = job.generic.request_memory / 1024  # Convert to GB
        for label in ("small", "medium", "large"):
            if memory <= self._slot_size[label]["memory"]:
                return label

        return "xlarge"

    def get_parsl_config(self) -> parsl.config.Config:
        """Get Parsl configuration for using the CC-IN2P3 Slurm farm as a
        Parsl execution site.

        Returns
        -------
        config : `parsl.config.Config`
            The configuration to be used to initialize Parsl for this site.
        """
        executors = self.get_executors()
        monitor = self.get_monitor()

        # Number of retries in case of job failure.
        retries = get_bps_config_value(self.site, ".retries", int, 0)

        # Path to the run directory.
        run_dir = get_bps_config_value(self.site, ".run_dir", str, "parsl_runinfo")

        # Strategy for scaling blocks according to workflow needs. Use a
        # strategy that allows for scaling Parsl workers both up and down.
        strategy = get_bps_config_value(self.site, ".strategy", str, "htex_auto_scale")

        return parsl.config.Config(
            executors=executors,
            monitoring=monitor,
            retries=retries,
            checkpoint_mode="task_exit",
            run_dir=run_dir,
            strategy=strategy,
        )