Coverage for python / lsst / ctrl / execute / slurmPlugin.py: 0%

305 statements  

coverage.py v7.13.5, created at 2026-05-06 08:41 +0000

# This file is part of ctrl_execute.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import hashlib
import logging
import math
import os
import subprocess
import sys
import time
from pathlib import Path
from string import Template

import htcondor

from lsst.ctrl.bps.htcondor import condor_q
from lsst.ctrl.execute.allocator import Allocator
from lsst.ctrl.execute.findPackageFile import find_package_file
from lsst.resources import ResourcePath, ResourcePathExpression

_LOG = logging.getLogger(__name__)


class SlurmPlugin(Allocator):
    @staticmethod
    def countSlurmJobs(jobname, jobstates):
        """Check Slurm queue for Glideins of given states

        Parameters
        ----------
        jobname : `string`
            Slurm jobname to be searched for via squeue.
        jobstates : `string`
            Slurm jobstates to be searched for via squeue.

        Returns
        -------
        numberOfJobs : `int`
            The number of Slurm jobs detected via squeue.
        """
        batcmd = f"squeue --noheader --states={jobstates} --name={jobname} | wc -l"
        _LOG.debug("The squeue command is %s", batcmd)
        time.sleep(3)
        try:
            resultPD = subprocess.check_output(batcmd, shell=True)
        except subprocess.CalledProcessError as e:
            _LOG.error(e.output)
        numberOfJobs = int(resultPD.decode("UTF-8"))
        return numberOfJobs
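    # Editorial illustration (not part of the original module, values assumed):
    # with the hypothetical arguments jobname="glide_alice" and jobstates="PD,R",
    # countSlurmJobs builds and runs
    #     squeue --noheader --states=PD,R --name=glide_alice | wc -l
    # and returns the resulting line count as an integer.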

    @staticmethod
    def countIdleSlurmJobs(jobname):
        """Check Slurm queue for Idle Glideins

        Parameters
        ----------
        jobname : `string`
            Slurm jobname to be searched for via squeue.

        Returns
        -------
        numberOfJobs : `int`
            The number of Slurm jobs detected via squeue.
        """
        _LOG.info("Checking if idle Slurm job %s exists:", jobname)
        numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="PD")
        return numberOfJobs

    @staticmethod
    def countRunningSlurmJobs(jobname):
        """Check Slurm queue for Running Glideins

        Parameters
        ----------
        jobname : `string`
            Slurm jobname to be searched for via squeue.

        Returns
        -------
        numberOfJobs : `int`
            The number of Slurm jobs detected via squeue.
        """
        _LOG.info("Checking if running Slurm job %s exists:", jobname)
        numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="R")
        return numberOfJobs

    def createFilesFromTemplates(self):
        """Create the Slurm submit, script, and htcondor config files

        Returns
        -------
        generatedSlurmFile : `str`
            name of the Slurm job description file
        """

        scratchDirParam = self.getScratchDirectory()
        template = Template(scratchDirParam)
        template.substitute(USER_HOME=self.getUserHome())

        # create the slurm submit file
        slurmName = find_package_file("generic.slurm.template", kind="templates", platform=self.platform)
        generatedSlurmFile = self.createSubmitFile(slurmName)

        # create the condor configuration file
        condorFile = find_package_file(
            "glidein_condor_config.template", kind="templates", platform=self.platform
        )
        self.createCondorConfigFile(condorFile)

        # create the script that the slurm submit file calls
        allocationName = find_package_file("allocation.sh.template", kind="templates", platform=self.platform)
        self.createAllocationFile(allocationName)

        _LOG.debug("The generated Slurm submit file is %s", generatedSlurmFile)

        return generatedSlurmFile

    def submit(self):
        """Submit the glidein jobs to the Batch system."""
        configName = find_package_file("slurmConfig.py", platform=self.platform)

        self.loadSlurm(configName)
        verbose = self.isVerbose()
        auto = self.isAuto()

        cpus = self.getCPUs()
        memoryPerCore = self.getMemoryPerCore()
        totalMemory = cpus * memoryPerCore
        peakMemory = self.getPeakmemory()
        if totalMemory > peakMemory:
            totalMemory = peakMemory
            _LOG.debug("Direct: Setting job memory to peak memory on platform.")

        # run the sbatch command
        template = Template(self.getLocalScratchDirectory())
        localScratchDir = Path(template.substitute(USER_SCRATCH=self.getUserScratch()))
        slurmSubmitDir = localScratchDir / self.defaults["DATE_STRING"]
        localScratchDir.mkdir(exist_ok=True)
        slurmSubmitDir.mkdir(exist_ok=True)
        os.chdir(slurmSubmitDir)
        _LOG.debug(
            "The working local scratch directory localScratchDir is %s ",
            localScratchDir,
        )

        auser = self.getUserName()
        anodeset = self.getNodeset()
        if anodeset is None:
            jobname = f"glide_{auser}"
        else:
            jobname = f"{anodeset}glide_{auser}"
        _LOG.debug("The unix user name is %s", auser)
        _LOG.debug("The Slurm job name for the glidein jobs is %s", jobname)
        _LOG.debug("The user home directory is %s", self.getUserHome())

        if auto:
            self.glideinsFromJobPressure()
        else:
            generatedSlurmFile = self.createFilesFromTemplates()
            cmd = f"sbatch --mem {totalMemory} {generatedSlurmFile}"
            nodes = self.getNodes()
            # In this case 'nodes' is the Target.

            # Limit the number of cores to be <= 8000, which corresponds to
            # 500 16-core glideins (the allowed number of auto glideins is 500)
            allowedAutoGlideins = self.getAllowedAutoGlideins()
            # the auto glidein size is 16
            autoSize = self.getAutoCPUs()
            targetedCores = nodes * cpus
            coreLimit = allowedAutoGlideins * autoSize
            if targetedCores > coreLimit:
                # Reduce number of nodes because of threshold
                nodes = int(coreLimit / cpus)
                _LOG.info("Reducing number of glideins because of core limit threshold")
                _LOG.debug("coreLimit %d", coreLimit)
                _LOG.debug("glidein size %d", cpus)
                _LOG.info("New number of glideins %d", nodes)
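            # Worked example (editorial illustration, values assumed): with the
            # defaults noted above, coreLimit = 500 * 16 = 8000 cores; a request
            # for 1000 glideins of 16 cpus each targets 16000 cores, so nodes
            # would be reduced to int(8000 / 16) = 500.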

            _LOG.info("Targeting %d glidein(s) for the computing pool/set.", nodes)
            batcmd = "".join(["squeue --noheader --name=", jobname, " | wc -l"])
            _LOG.debug("The squeue command is: %s", batcmd)
            try:
                result = subprocess.check_output(batcmd, shell=True)
            except subprocess.CalledProcessError as e:
                _LOG.error(e.output)
            strResult = result.decode("UTF-8")

            _LOG.info("Detected this number of preexisting glidein jobs: %d", int(strResult))

            numberToAdd = nodes - int(strResult)
            _LOG.info("The number of glidein jobs to submit now is %d", numberToAdd)

            for glide in range(0, numberToAdd):
                _LOG.info("Submitting glidein %d", glide)
                exitCode = self.runCommand(cmd, verbose)
                if exitCode != 0:
                    _LOG.error("error running %s", cmd)
                    sys.exit(exitCode)

    def loadSlurm(self, name):
        if self.opts.reservation is not None:
            self.defaults["RESERVATION"] = f"#SBATCH --reservation {self.opts.reservation}"
        else:
            self.defaults["RESERVATION"] = ""

        if self.opts.exclude is not None:
            self.defaults["EXCLUDE"] = f"#SBATCH --exclude {self.opts.exclude}"
        else:
            self.defaults["EXCLUDE"] = ""

        if self.opts.nodelist is not None:
            self.defaults["NODELIST"] = f"#SBATCH --nodelist {self.opts.nodelist}"
        else:
            self.defaults["NODELIST"] = ""

        if self.opts.exclusive is not None:
            self.defaults["EXCLUSIVE"] = "#SBATCH --exclusive"
        else:
            self.defaults["EXCLUSIVE"] = ""

        if self.opts.exclusiveUser is not None:
            self.defaults["EXCLUSER"] = "#SBATCH --exclusive=user"
        else:
            self.defaults["EXCLUSER"] = ""

        if self.opts.qos:
            self.defaults["QOS"] = f"#SBATCH --qos {self.opts.qos}"
        else:
            self.defaults["QOS"] = ""

        allocationConfig = self.loadAllocationConfig(name, "slurm")

        template = Template(allocationConfig.platform.scratchDirectory)
        scratchDir = template.substitute(USER_SCRATCH=self.getUserScratch())
        self.defaults["SCRATCH_DIR"] = scratchDir

        self.allocationFileName = Path(self.configDir) / f"allocation_{self.uniqueIdentifier}.sh"
        self.defaults["GENERATED_ALLOCATE_SCRIPT"] = self.allocationFileName.name

        if self.opts.openfiles is None:
            self.defaults["OPEN_FILES"] = 20480
        else:
            self.defaults["OPEN_FILES"] = self.opts.openfiles

        if self.opts.nodeset is None:
            self.defaults["NODESET_BLOCK"] = "#"
            self.defaults["NODESET"] = ""
        else:
            self.defaults["NODESET_BLOCK"] = f'Nodeset = "{self.opts.nodeset}"'
            self.defaults["NODESET"] = f"{self.opts.nodeset}"

        # For partitionable slots the classad 'Cpus' shows how many cpus
        # remain to be allocated. Thus for a slot that is running jobs, the
        # value of Rank = TotalCpus - Cpus increases with the load.
        # Because higher Rank is preferred, loaded slots are favored.
        if self.opts.packnodes is None:
            self.defaults["PACK_BLOCK"] = "#"
        else:
            self.defaults["PACK_BLOCK"] = "Rank = TotalCpus - Cpus"
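        # Worked example (editorial illustration, values assumed): with the
        # packnodes option set, a 16-core partitionable slot with 10 cpus still
        # unallocated advertises Rank = TotalCpus - Cpus = 16 - 10 = 6, while
        # one with only 4 cpus free advertises 16 - 4 = 12, so the busier slot
        # is preferred and jobs are packed onto already-loaded nodes.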

        # handle dynamic slot block template:
        # 1) if it isn't specified, just put a comment in its place
        # 2) if it's specified, but without a filename, use the default
        # 3) if it's specified with a filename, use that.
        if self.opts.dynamic is None:
            self.defaults["DYNAMIC_SLOTS_BLOCK"] = "#"
            return

        if self.opts.dynamic == "__default__":
            dynamicSlotsName = find_package_file(
                "dynamic_slots.template", kind="templates", platform=self.platform
            )
        else:
            dynamicSlotsName = ResourcePath(self.opts.dynamic)

        with dynamicSlotsName.open() as f:
            lines = f.readlines()
            block = ""
            for line in lines:
                block += line
            self.defaults["DYNAMIC_SLOTS_BLOCK"] = block

    def createAllocationFile(self, input: ResourcePathExpression):
        """Create the allocation script file, using the file ``input`` as a
        template.

        Parameters
        ----------
        input : `ResourcePathExpression`
            The template file from which the allocation script is generated.

        Returns
        -------
        outfile : `str`
            The newly created file name
        """
        outfile = self.createFile(input, self.allocationFileName)
        _LOG.debug("Wrote new Slurm job allocation bash script to %s", outfile)
        os.chmod(outfile, 0o755)
        return outfile

    def glideinsFromJobPressure(self):
        """Determine and submit the glideins needed from job pressure."""

        verbose = self.isVerbose()
        cpus = self.getCPUs()
        autoCPUs = self.getAutoCPUs()
        minAutoCPUs = self.getMinAutoCPUs()
        if cpus >= minAutoCPUs:
            autoCPUs = cpus
        memoryPerCore = self.getMemoryPerCore()
        memoryLimit = autoCPUs * memoryPerCore
        peakMemory = self.getPeakmemory()
        if memoryLimit > peakMemory:
            memoryLimit = peakMemory
            _LOG.debug("Auto: Setting job memory to peak memory on platform.")

        auser = self.getUserName()
        anodeset = self.getNodeset()

        # projection contains the job classads to be returned.
        # These include the cpu and memory profile of each job,
        # in the form of RequestCpus and RequestMemory
        projection = [
            "ClusterId",
            "ProcId",
            "JobStatus",
            "Owner",
            "RequestCpus",
            "JobUniverse",
            "RequestMemory",
        ]
        owner = f'(Owner=="{auser}")'
        # query for idle jobs
        jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})"
        # query for vanilla universe
        # JobUniverse constants are in htcondor C++
        # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... }
        juniv = "(JobUniverse==5)"

        # The constraint determines that the jobs to be returned belong to
        # the current user (Owner) and are Idle vanilla universe jobs.
        full_constraint = f"{owner} && {jstat} && {juniv}"
        if anodeset is None:
            full_constraint += " && (JobNodeset is None)"
        else:
            jnodeset = f'(JobNodeset=="{anodeset}")'
            full_constraint += f" && {jnodeset}"
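        # Editorial illustration (hypothetical values): for owner "alice" and
        # nodeset "cpu", the constraint built above would read roughly
        #     (Owner=="alice") && (JobStatus==1) && (JobUniverse==5) && (JobNodeset=="cpu")
        # assuming the JobStatus enum renders as its integer value (IDLE is 1).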

        _LOG.info("Auto: Query for htcondor jobs.")
        _LOG.debug("full_constraint %s", full_constraint)
        try:
            condorq_data = condor_q(
                constraint=full_constraint,
                projection=projection,
            )

        except Exception as exc:
            raise type(exc)("Problem querying condor schedd for jobs") from None

        if not condorq_data:
            _LOG.info("Auto: No HTCondor Jobs detected.")
            return

        generatedSlurmFile = self.createFilesFromTemplates()
        condorq_large = []
        condorq_small = []
        schedd_name, condorq_full = condorq_data.popitem()

        _LOG.info("Auto: Search for Large htcondor jobs.")
        for jid, ajob in condorq_full.items():
            thisCpus = ajob["RequestCpus"]
            if isinstance(ajob["RequestMemory"], int):
                thisEvalMemory = ajob["RequestMemory"]
            else:
                thisEvalMemory = ajob["RequestMemory"].eval()
                _LOG.debug("Making an evaluation %s", thisEvalMemory)
            # Search for jobs that are Large jobs
            # thisCpus > 16 or thisEvalMemory > 16*4096
            ajob["RequestMemoryEval"] = thisEvalMemory
            if thisEvalMemory > memoryLimit or thisCpus > autoCPUs:
                _LOG.info("Appending a Large Job %s", jid)
                condorq_large.append(ajob)
            else:
                condorq_small.append(ajob)

        if not condorq_large:
            _LOG.info("Auto: no Large jobs detected.")
        else:
            _LOG.info("Auto: detected Large jobs")
            for ajob in condorq_large:
                _LOG.debug("\n%d.%d", ajob["ClusterId"], ajob["ProcId"])
                _LOG.debug("%s", ajob)
                thisMemory = ajob["RequestMemoryEval"]
                peakMemory = self.getPeakmemory()
                if thisMemory > peakMemory:
                    thisMemory = peakMemory
                    _LOG.debug("Auto large: Setting job memory to peak memory on platform.")
                useCores = ajob["RequestCpus"]
                clusterid = ajob["ClusterId"]
                procid = ajob["ProcId"]
                job_label = f"{clusterid}_{procid}_{thisMemory}"
                if useCores < autoCPUs:
                    useCores = autoCPUs
                hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest()
                shash = hash[:6]
                if anodeset is None:
                    jobname = f"{auser}_{shash}"
                else:
                    jobname = f"{anodeset}{auser}_{shash}"
                _LOG.debug("jobname %s", jobname)
                # Check if Job exists Idle in the queue
                numberJobname = SlurmPlugin.countIdleSlurmJobs(jobname)
                if numberJobname > 0:
                    _LOG.info("Job %s already exists, do not submit", jobname)
                    continue
                cpuopt = f"--cpus-per-task {useCores}"
                memopt = f"--mem {thisMemory}"
                jobopt = f"-J {jobname}"
                cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
                _LOG.debug(cmd)
                _LOG.info(
                    "Submitting Large glidein for %d.%d",
                    ajob["ClusterId"],
                    ajob["ProcId"],
                )
                time.sleep(3)
                exitCode = self.runCommand(cmd, verbose)
                if exitCode != 0:
                    _LOG.error("error running %s", cmd)
                    sys.exit(exitCode)

        if not condorq_small:
            _LOG.info("Auto: no small Jobs detected.")
        else:
            _LOG.info("Auto: summarize small jobs.")
            maxNumberOfGlideins = self.getNodes()
            maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins()
            _LOG.debug("maxNumberOfGlideins %d", maxNumberOfGlideins)
            _LOG.debug("maxAllowedNumberOfGlideins %d", maxAllowedNumberOfGlideins)
            # The number of cores for the small glideins is capped at 8000,
            # corresponding to maxAllowedNumberOfGlideins = 500 16-core glideins
            if maxNumberOfGlideins > maxAllowedNumberOfGlideins:
                maxNumberOfGlideins = maxAllowedNumberOfGlideins
                _LOG.info("Reducing Small Glidein limit due to threshold.")
            #
            # In the following loop we calculate the number of cores
            # required by the set of small jobs. This calculation uses
            # the requested cpus of a job, but also checks the requested
            # memory and counts an effective core for each 'memoryPerCore'
            # of memory (by default the 4 GB per core of the S3DF Slurm
            # scheduler); a worked example follows below.
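            # Worked example (editorial illustration, values assumed): a job
            # requesting 2 cpus and 20480 MB of memory with memoryPerCore =
            # 4096 MB needs 20480 / 4096 = 5 effective cores, so 2 cores are
            # counted for RequestCpus and a further 5 - 2 = 3 for its memory.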

            totalCores = 0
            for ajob in condorq_small:
                requestedCpus = ajob["RequestCpus"]
                # if isinstance(ajob["RequestMemory"], int):
                #     requestedMemory = ajob["RequestMemory"]
                # else:
                #     requestedMemory = ajob["RequestMemoryEval"]
                #     logging.debug("Using RequestMemoryEval")
                requestedMemory = ajob["RequestMemoryEval"]
                totalCores = totalCores + requestedCpus
                _LOG.debug("small: jobid %d.%d", ajob["ClusterId"], ajob["ProcId"])
                _LOG.debug("\tRequestCpus %d", requestedCpus)
                _LOG.debug("\tCurrent value of totalCores %d", totalCores)
                neededCpus = requestedMemory / memoryPerCore
                if neededCpus > requestedCpus:
                    _LOG.debug("\t\tNeed to Add More:")
                    _LOG.debug("\t\tRequestMemory is %d", requestedMemory)
                    _LOG.debug("\t\tRatio to %d MB is %d", memoryPerCore, neededCpus)
                    totalCores = totalCores + (neededCpus - requestedCpus)
                    _LOG.debug("\t\tCurrent value of totalCores %d", totalCores)

            _LOG.info("small: The final TotalCores is %d", totalCores)

            # The number of Glideins needed to service the detected Idle jobs
            # is "numberOfGlideins"
            numberOfGlideins = math.ceil(totalCores / autoCPUs)
            _LOG.info("small: Number for detected jobs is %d", numberOfGlideins)

            if anodeset is None:
                jobname = f"glide_{auser}"
            else:
                jobname = f"{anodeset}glide_{auser}"

            # Check Slurm queue Running glideins
            existingGlideinsRunning = SlurmPlugin.countRunningSlurmJobs(jobname)

            # Check Slurm queue Idle Glideins
            existingGlideinsIdle = SlurmPlugin.countIdleSlurmJobs(jobname)

            _LOG.debug("small: existingGlideinsRunning %d", existingGlideinsRunning)
            _LOG.debug("small: existingGlideinsIdle %d", existingGlideinsIdle)

            # The number of Glideins needed to service the detected
            # Idle jobs is "numberOfGlideins" less the existing Idle glideins
            numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle
            _LOG.debug("small: Target Number to submit %d", numberOfGlideinsReduced)

            # The maximum number of Glideins that we can submit with
            # the imposed threshold (maxNumberOfGlideins)
            # is maxSubmitGlideins
            existingGlideins = existingGlideinsRunning + existingGlideinsIdle
            maxSubmitGlideins = maxNumberOfGlideins - existingGlideins
            _LOG.debug("small: maxNumberOfGlideins %d", maxNumberOfGlideins)
            _LOG.debug("small: maxSubmitGlideins %d", maxSubmitGlideins)

            # Reduce the number of Glideins to submit if threshold exceeded
            if numberOfGlideinsReduced > maxSubmitGlideins:
                numberOfGlideinsReduced = maxSubmitGlideins
                _LOG.info("small: Reducing due to threshold.")
            _LOG.debug("small: Number of Glideins to submit is %d", numberOfGlideinsReduced)
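            # Worked example (editorial illustration, values assumed): if the
            # small jobs need totalCores = 100 and autoCPUs = 16, then
            # numberOfGlideins = ceil(100 / 16) = 7; with 2 glideins already
            # idle the target drops to 5, and it is further capped at
            # maxSubmitGlideins = maxNumberOfGlideins - (running + idle).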

            cpuopt = f"--cpus-per-task {autoCPUs}"
            memopt = f"--mem {memoryLimit}"
            jobopt = f"-J {jobname}"
            cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
            _LOG.debug(cmd)
            for glide in range(0, numberOfGlideinsReduced):
                _LOG.info("Submitting glidein %s", glide)
                exitCode = self.runCommand(cmd, verbose)
                if exitCode != 0:
                    _LOG.error("error running %s", cmd)
                    sys.exit(exitCode)

        return