Coverage for python/lsst/ctrl/bps/parsl/sites/ccin2p3.py: 29%

58 statements  

import copy
import platform
from typing import TYPE_CHECKING, Any

import parsl.config
from parsl.executors import HighThroughputExecutor
from parsl.executors.base import ParslExecutor
from parsl.providers import SlurmProvider

from ..configuration import get_bps_config_value
from ..site import SiteConfig

if TYPE_CHECKING:
    from ..job import ParslJob

__all__ = ("Ccin2p3",)

Kwargs = dict[str, Any]


class Ccin2p3(SiteConfig):
    """Configuration for executing Parsl jobs in the CC-IN2P3 Slurm batch
    farm.

    This class provides four job slot sizes, each with its own
    requirements, in particular in terms of memory. Those slot sizes are
    named "small", "medium", "large" and "xlarge".

    Sensible default values for those requirements are provided for each
    job slot, but you can overwrite those defaults either in the BPS
    submission file or in a site configuration file that you include in
    your BPS submission file.

    If you don't need to modify the default requirements for the job slot
    sizes, use the site specification below in your BPS configuration
    file:

    .. code-block:: yaml

        wmsServiceClass: lsst.ctrl.bps.parsl.ParslService
        computeSite: ccin2p3

        site:
          ccin2p3:
            class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3

    If you do need to modify those defaults, you can overwrite them for
    all job slots or for each specific job slot. Requirements specified
    for a job slot take priority over those specified for all job slots
    at the level of entry ``.site.ccin2p3:``.

    This is an example of how to overwrite selected requirements in your
    BPS submission file:

    .. code-block:: yaml

        wmsServiceClass: lsst.ctrl.bps.parsl.ParslService
        computeSite: ccin2p3

        site:
          ccin2p3:
            class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
            walltime: "72:00:00"
            scheduler_options:
              - "--licenses=sps"
              - "--qos=normal"
            small:
              memory: 6
              partition: "flash"
            medium:
              memory: 10
              partition: "lsst,htc"
            large:
              memory: 80
            xlarge:
              memory: 180
              partition: "lsst"
              scheduler_options:
                - "--constraint=el7"
                - "--licenses=my_product"
                - "--reservation=my_reservation"

    At the level of entry ``site.ccin2p3:`` in the BPS submission file, the
    following configuration parameters are accepted, which apply to all
    slot sizes:

    - ``partition`` (`str`): name of one or more configured partitions. If
      more than one, separate them with commas (',').
      (Default: "lsst,htc")
    - ``walltime`` (`str`): walltime to request for the job.
      (Default: "72:00:00")
    - ``scheduler_options`` (`list` [`str`]): scheduler options to send to
      Slurm.
      (Default: "--licenses=sps")

    In addition, as shown in the previous example, for each job slot (i.e.
    "small", "medium", etc.) you can specify the requirements above as well
    as the following:

    - ``max_blocks`` (`int`): maximum number of Slurm jobs that your
      workflow can simultaneously use (see the example below).
    - ``memory`` (`int`): required amount of memory for each job, in
      gigabytes (GB).
      (Defaults: 4 for "small", 10 for "medium", 50 for "large" and
      150 for "xlarge").
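
    For instance, to cap the number of Slurm jobs used simultaneously for
    the "large" job slot (the value below is illustrative, not a
    recommendation):

    .. code-block:: yaml

        site:
          ccin2p3:
            class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
            large:
              max_blocks: 200  # illustrative value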


    Parameters
    ----------
    *args
        Arguments to initialize the super-class.
    **kwargs
        Keyword arguments to initialize the super-class.

    Returns
    -------
    Ccin2p3 : `lsst.ctrl.bps.parsl.SiteConfig`
        Concrete instance of a `~lsst.ctrl.bps.parsl.SiteConfig` specific
        for the CC-IN2P3 Slurm farm.
    """

    DEFAULT_ACCOUNT: str = "lsst"
    DEFAULT_WALLTIME: str = "72:00:00"
    DEFAULT_SCHEDULER_OPTIONS: list[str] = [
        "--licenses=sps",
    ]

    def __init__(self, *args, **kwargs):
        # Have BPS-defined resource requests for each job passed to the
        # executor.
        kwargs["resource_list"] = ["priority"]
        super().__init__(*args, **kwargs)
        self._account = get_bps_config_value(self.site, ".account", str, self.DEFAULT_ACCOUNT)
        self._scheduler_options = get_bps_config_value(
            self.site, ".scheduler_options", list, self.DEFAULT_SCHEDULER_OPTIONS
        )
        self._slot_size = {
            "small": {
                "memory": get_bps_config_value(self.site, ".small.memory", int, 4),
                "walltime": self._get_walltime_for_slot("small"),
                "partition": self._get_partition_for_slot("small"),
                "max_blocks": get_bps_config_value(self.site, ".small.max_blocks", int, 3_000),
                "scheduler_options": get_bps_config_value(self.site, ".small.scheduler_options", list, []),
            },
            "medium": {
                "memory": get_bps_config_value(self.site, ".medium.memory", int, 10),
                "walltime": self._get_walltime_for_slot("medium"),
                "partition": self._get_partition_for_slot("medium"),
                "max_blocks": get_bps_config_value(self.site, ".medium.max_blocks", int, 1_000),
                "scheduler_options": get_bps_config_value(self.site, ".medium.scheduler_options", list, []),
            },
            "large": {
                "memory": get_bps_config_value(self.site, ".large.memory", int, 50),
                "walltime": self._get_walltime_for_slot("large"),
                "partition": self._get_partition_for_slot("large"),
                "max_blocks": get_bps_config_value(self.site, ".large.max_blocks", int, 100),
                "scheduler_options": get_bps_config_value(self.site, ".large.scheduler_options", list, []),
            },
            "xlarge": {
                "memory": get_bps_config_value(self.site, ".xlarge.memory", int, 150),
                "walltime": self._get_walltime_for_slot("xlarge"),
                "partition": self._get_partition_for_slot("xlarge"),
                "max_blocks": get_bps_config_value(self.site, ".xlarge.max_blocks", int, 10),
                "scheduler_options": get_bps_config_value(self.site, ".xlarge.scheduler_options", list, []),
            },
        }

    def _get_partition_for_slot(self, slot: str) -> str:
        """Return the Slurm partition Parsl must use to submit jobs for the
        job slot `slot`. Values of `slot` can be "small", "medium", "large"
        or "xlarge".
        """
        # The target Slurm partition must be selected according to the type
        # of the job slot but also according to the CPU architecture of the
        # compute node.
        #
        # Parsl requires that the CPU architecture of its orchestrator be
        # identical to that of its executors. Therefore, we need to ensure
        # that Slurm schedules our Parsl executors on compute nodes with the
        # same architecture as the host where this orchestrator runs.
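        #
        # For example, when this code runs on an aarch64 host, every job
        # slot below defaults to the "htc_arm" partition.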


        # Default target Slurm partitions per CPU architecture
        default_partition = {
            "aarch64": {
                "small": "htc_arm",
                "medium": "htc_arm",
                "large": "htc_arm",
                "xlarge": "htc_arm",
            },
            "x86_64": {
                "small": "lsst,htc",
                "medium": "lsst",
                "large": "lsst",
                "xlarge": "lsst",
            },
        }
        architecture = platform.machine()
        if architecture not in default_partition:
            raise ValueError(f"architecture {architecture} is not supported")

        # If a partition was specified in the workflow description file
        # specifically for this job slot, use that partition. For instance:
        #
        #   site:
        #     ccin2p3:
        #       class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #       small:
        #         partition: htc
        slot_partition = get_bps_config_value(self.site, f".{slot}.partition", str, "")
        if slot_partition != "":
            return slot_partition

        # If a partition was specified in the workflow description file at
        # the site level, use that partition. For instance:
        #
        #   site:
        #     ccin2p3:
        #       class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #       partition: htc
        #
        # Otherwise, use the default for this slot on this architecture.
        return get_bps_config_value(self.site, ".partition", str, default_partition[architecture][slot])


    def _get_walltime_for_slot(self, slot: str) -> str:
        """Return the walltime value Parsl must use to submit jobs for the
        job slot `slot`. Values of `slot` can be "small", "medium", "large"
        or "xlarge".
        """
        # If a specific walltime value was specified for this job slot in
        # the configuration, use that value. For instance:
        #
        #   site:
        #     ccin2p3:
        #       class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #       small:
        #         walltime: "3:00:00"
        slot_walltime = get_bps_config_value(self.site, f".{slot}.walltime", str, "")
        if slot_walltime != "":
            return slot_walltime

        # If a walltime value was specified for the site, use that value.
        # Otherwise, use the default walltime. For instance:
        #
        #   site:
        #     ccin2p3:
        #       class: lsst.ctrl.bps.parsl.sites.ccin2p3.Ccin2p3
        #       walltime: "3:00:00"
        return get_bps_config_value(self.site, ".walltime", str, self.DEFAULT_WALLTIME)

    def get_executors(self) -> list[ParslExecutor]:
        """Get a list of Parsl executors that can be used for processing a
        workflow.

        Each executor must have a unique ``label``.
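
        Here, one executor is created per job slot size, labeled "small",
        "medium", "large" and "xlarge".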

252 """ 

253 executors: list[ParslExecutor] = [] 

254 for label, slot in self._slot_size.items(): 

255 # Compute the scheduler options for this job slot. Options 

256 # specified at the slot level in the configuration file 

257 # overwrite those specified at the site level. 

258 scheduler_options = copy.deepcopy(self._scheduler_options) 

259 if slot_scheduler_options := slot.get("scheduler_options", []): 

260 scheduler_options = copy.deepcopy(slot_scheduler_options) 

261 

262 options = f"#SBATCH {' '.join(opt for opt in scheduler_options)}" if scheduler_options else "" 

263 

264 executor = HighThroughputExecutor( 

265 label, 

266 provider=SlurmProvider( 

267 # Slurm partition to request blocks from. 

268 partition=slot["partition"], 

269 # Slurm account to which to charge resources used by the 

270 # job. 

271 account=self._account, 

272 # Nodes to provision per block (1 block = 1 CPU core). 

273 nodes_per_block=1, 

274 # Number of CPU cores to provision per node. 

275 cores_per_node=1, 

276 # Memory per node (GB) for each Slurm job. 

277 mem_per_node=slot["memory"], 

278 # Initial number of blocks. 

279 init_blocks=0, 

280 # Minimum number of blocks to maintain. 

281 min_blocks=0, 

282 # Maximum number of blocks to maintain. 

283 max_blocks=slot["max_blocks"], 

284 # Time limit for each Slurm job. 

285 walltime=slot["walltime"], 

286 # '#SBATCH' directives to prepend to the Slurm submission 

287 # script. 

288 scheduler_options=options, 

289 # Set the number of file descriptors and processes to 

290 # the maximum allowed. 

291 worker_init="ulimit -n hard && ulimit -u hard", 

292 # Requests nodes which are not shared with other running 

293 # jobs. 

294 exclusive=False, 

295 ), 

296 # Address to connect to the main Parsl process. 

297 address=self.get_address(), 

298 # GB of memory required per worker. If specified the node 

299 # manager will check the available memory at startup and limit 

300 # the number of workers such that the there’s sufficient memory 

301 # for each worker. 

302 mem_per_worker=None, 

303 # Caps the number of workers launched per node. 

304 max_workers_per_node=1, 

305 # Timeout period (in milliseconds) to be used by the 

306 # executor components. 

307 poll_period=1_000, 

308 # Retry submitting to Slurm in case of submission error. 

309 block_error_handler=False, 

310 ) 

311 executors.append(executor) 

312 

313 return executors 

314 

315 def select_executor(self, job: "ParslJob") -> str: 

316 """Get the ``label`` of the executor to use to execute ``job``. 

317 

318 Parameters 

319 ---------- 

320 job : `lsst.ctrl.bps.parsl.ParslJob` 

321 Job to be executed. 

322 

323 Returns 

324 ------- 

325 label : `str` 

326 Label of executor to use to execute ``job``. 

327 """ 

328 # We choose the executor to use based only on the memory required 

329 # by the job. 
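        #
        # Worked example (illustrative): with the default slot sizes, a
        # job requesting 8192 MB needs 8 GB, which exceeds the "small"
        # slot (4 GB) but fits the "medium" slot (10 GB), so "medium" is
        # returned.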

        memory = job.generic.request_memory / 1024  # Convert MB to GB
        for label in ("small", "medium", "large"):
            if memory <= self._slot_size[label]["memory"]:
                return label

        return "xlarge"

    def get_parsl_config(self) -> parsl.config.Config:
        """Get the Parsl configuration for using the CC-IN2P3 Slurm farm
        as a Parsl execution site.

        Returns
        -------
        config : `parsl.config.Config`
            The configuration to be used to initialize Parsl for this site.
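
        Examples
        --------
        A minimal sketch, assuming ``site`` is an already-constructed
        instance of this class:

        .. code-block:: python

            # ``site`` is assumed to be a configured Ccin2p3 instance.
            config = site.get_parsl_config()
            parsl.load(config)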

345 """ 

346 executors = self.get_executors() 

347 monitor = self.get_monitor() 

348 

349 # Number of retries in case of job failure. 

350 retries = get_bps_config_value(self.site, ".retries", int, 0) 

351 

352 # Path to run directory. 

353 run_dir = get_bps_config_value(self.site, ".run_dir", str, "parsl_runinfo") 

354 

355 # Strategy for scaling blocks according to workflow needs. 

356 # Use a strategy that allows for scaling up and down Parsl workers. 

357 strategy = get_bps_config_value(self.site, ".strategy", str, "htex_auto_scale") 

358 

359 return parsl.config.Config( 

360 executors=executors, 

361 monitoring=monitor, 

362 retries=retries, 

363 checkpoint_mode="task_exit", 

364 run_dir=run_dir, 

365 strategy=strategy, 

366 )