Coverage for python/lsst/ctrl/execute/slurmPlugin.py: 0% (305 statements)
coverage.py v7.13.5, created at 2026-04-18 08:53 +0000
# This file is part of ctrl_execute.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import hashlib
import logging
import math
import os
import subprocess
import sys
import time
from pathlib import Path
from string import Template

import htcondor

from lsst.ctrl.bps.htcondor import condor_q
from lsst.ctrl.execute.allocator import Allocator
from lsst.ctrl.execute.findPackageFile import find_package_file
from lsst.resources import ResourcePath, ResourcePathExpression

_LOG = logging.getLogger(__name__)


class SlurmPlugin(Allocator):
    @staticmethod
    def countSlurmJobs(jobname, jobstates):
        """Check the Slurm queue for glideins in the given states.

        Parameters
        ----------
        jobname : `str`
            Slurm job name to be searched for via squeue.
        jobstates : `str`
            Slurm job states to be searched for via squeue.

        Returns
        -------
        numberOfJobs : `int`
            The number of Slurm jobs detected via squeue.
        """
        batcmd = f"squeue --noheader --states={jobstates} --name={jobname} | wc -l"
        _LOG.debug("The squeue command is %s", batcmd)
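        # For example, with jobstates="PD" and a job name of "glide_alice"
        # (an illustrative user name), the command expands to:
        #   squeue --noheader --states=PD --name=glide_alice | wc -l
        # `wc -l` prints 0 when no matching jobs are queued, so the parse
        # below always receives a number.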
        time.sleep(3)
        try:
            resultPD = subprocess.check_output(batcmd, shell=True)
        except subprocess.CalledProcessError as e:
            _LOG.error(e.output)
            # Re-raise so the caller sees the squeue failure instead of a
            # NameError from the undefined result below.
            raise
        numberOfJobs = int(resultPD.decode("UTF-8"))
        return numberOfJobs

    @staticmethod
    def countIdleSlurmJobs(jobname):
        """Check the Slurm queue for Idle glideins.

        Parameters
        ----------
        jobname : `str`
            Slurm job name to be searched for via squeue.

        Returns
        -------
        numberOfJobs : `int`
            The number of Slurm jobs detected via squeue.
        """
        _LOG.info("Checking if idle Slurm job %s exists:", jobname)
        numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="PD")
        return numberOfJobs

    @staticmethod
    def countRunningSlurmJobs(jobname):
        """Check the Slurm queue for Running glideins.

        Parameters
        ----------
        jobname : `str`
            Slurm job name to be searched for via squeue.

        Returns
        -------
        numberOfJobs : `int`
            The number of Slurm jobs detected via squeue.
        """
        _LOG.info("Checking if running Slurm job %s exists:", jobname)
        numberOfJobs = SlurmPlugin.countSlurmJobs(jobname, jobstates="R")
        return numberOfJobs

    def createFilesFromTemplates(self):
        """Create the Slurm submit, script, and htcondor config files.

        Returns
        -------
        generatedSlurmFile : `str`
            Name of the Slurm job description file.
        """
        scratchDirParam = self.getScratchDirectory()
        template = Template(scratchDirParam)
        template.substitute(USER_HOME=self.getUserHome())

        # create the slurm submit file
        slurmName = find_package_file("generic.slurm.template", kind="templates", platform=self.platform)
        generatedSlurmFile = self.createSubmitFile(slurmName)

        # create the condor configuration file
        condorFile = find_package_file(
            "glidein_condor_config.template", kind="templates", platform=self.platform
        )
        self.createCondorConfigFile(condorFile)

        # create the script that the slurm submit file calls
        allocationName = find_package_file("allocation.sh.template", kind="templates", platform=self.platform)
        self.createAllocationFile(allocationName)

        _LOG.debug("The generated Slurm submit file is %s", generatedSlurmFile)

        return generatedSlurmFile

    def submit(self):
        """Submit the glidein jobs to the Batch system."""
        configName = find_package_file("slurmConfig.py", platform=self.platform)

        self.loadSlurm(configName)
        verbose = self.isVerbose()
        auto = self.isAuto()

        cpus = self.getCPUs()
        memoryPerCore = self.getMemoryPerCore()
        totalMemory = cpus * memoryPerCore
        peakMemory = self.getPeakmemory()
        if totalMemory > peakMemory:
            totalMemory = peakMemory
            _LOG.debug("Direct: Setting job memory to peak memory on platform.")
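        # Illustrative arithmetic: 16 CPUs at 4096 MB per core requests
        # 16 * 4096 = 65536 MB; if the platform's peak memory is smaller,
        # the request is clamped to that peak instead.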

        # run the sbatch command
        template = Template(self.getLocalScratchDirectory())
        localScratchDir = Path(template.substitute(USER_SCRATCH=self.getUserScratch()))
        slurmSubmitDir = localScratchDir / self.defaults["DATE_STRING"]
        localScratchDir.mkdir(exist_ok=True)
        slurmSubmitDir.mkdir(exist_ok=True)
        os.chdir(slurmSubmitDir)
        _LOG.debug(
            "The working local scratch directory localScratchDir is %s",
            localScratchDir,
        )

        auser = self.getUserName()
        anodeset = self.getNodeset()
        if anodeset is None:
            jobname = f"glide_{auser}"
        else:
            jobname = f"{anodeset}glide_{auser}"
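        # e.g. user "alice" with nodeset "milano" (both illustrative) yields
        # the job name "milanoglide_alice"; without a nodeset it is
        # "glide_alice".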
        _LOG.debug("The unix user name is %s", auser)
        _LOG.debug("The Slurm job name for the glidein jobs is %s", jobname)
        _LOG.debug("The user home directory is %s", self.getUserHome())

        if auto:
            self.glideinsFromJobPressure()
        else:
            generatedSlurmFile = self.createFilesFromTemplates()
            cmd = f"sbatch --mem {totalMemory} {generatedSlurmFile}"
            nodes = self.getNodes()
            # In this case 'nodes' is the target number of glideins.

            # Limit the total number of cores to <= 8000, which corresponds
            # to 500 16-core glideins (the allowed number of auto glideins
            # is 500).
            allowedAutoGlideins = self.getAllowedAutoGlideins()
            # auto glidein size is 16 cores
            autoSize = self.getAutoCPUs()
            targetedCores = nodes * cpus
            coreLimit = allowedAutoGlideins * autoSize
            if targetedCores > coreLimit:
                # Reduce the number of nodes because of the threshold
                nodes = int(coreLimit / cpus)
                _LOG.info("Reducing number of glideins because of core limit threshold")
                _LOG.debug("coreLimit %d", coreLimit)
                _LOG.debug("glidein size %d", cpus)
                _LOG.info("New number of glideins %d", nodes)
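            # Worked example with illustrative numbers: requesting 600
            # 16-core glideins targets 9600 cores; against a limit of
            # 500 * 16 = 8000 cores, nodes is reduced to 8000 / 16 = 500.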

            _LOG.info("Targeting %d glidein(s) for the computing pool/set.", nodes)
            batcmd = f"squeue --noheader --name={jobname} | wc -l"
            _LOG.debug("The squeue command is: %s", batcmd)
            try:
                result = subprocess.check_output(batcmd, shell=True)
            except subprocess.CalledProcessError as e:
                _LOG.error(e.output)
                # Re-raise so the failure is reported instead of a NameError
                # from the undefined result below.
                raise
            strResult = result.decode("UTF-8")

            _LOG.info("Detected this number of preexisting glidein jobs: %d", int(strResult))

            numberToAdd = nodes - int(strResult)
            _LOG.info("The number of glidein jobs to submit now is %d", numberToAdd)

            for glide in range(0, numberToAdd):
                _LOG.info("Submitting glidein %d", glide)
                exitCode = self.runCommand(cmd, verbose)
                if exitCode != 0:
                    _LOG.error("error running %s", cmd)
                    sys.exit(exitCode)

    def loadSlurm(self, name):
        """Load the Slurm allocation configuration and fill in the template
        defaults used to generate the submission files.
        """
        if self.opts.reservation is not None:
            self.defaults["RESERVATION"] = f"#SBATCH --reservation {self.opts.reservation}"
        else:
            self.defaults["RESERVATION"] = ""

        if self.opts.exclude is not None:
            self.defaults["EXCLUDE"] = f"#SBATCH --exclude {self.opts.exclude}"
        else:
            self.defaults["EXCLUDE"] = ""

        if self.opts.nodelist is not None:
            self.defaults["NODELIST"] = f"#SBATCH --nodelist {self.opts.nodelist}"
        else:
            self.defaults["NODELIST"] = ""

        if self.opts.exclusive is not None:
            self.defaults["EXCLUSIVE"] = "#SBATCH --exclusive"
        else:
            self.defaults["EXCLUSIVE"] = ""

        if self.opts.exclusiveUser is not None:
            self.defaults["EXCLUSER"] = "#SBATCH --exclusive=user"
        else:
            self.defaults["EXCLUSER"] = ""

        if self.opts.qos:
            self.defaults["QOS"] = f"#SBATCH --qos {self.opts.qos}"
        else:
            self.defaults["QOS"] = ""

        allocationConfig = self.loadAllocationConfig(name, "slurm")

        template = Template(allocationConfig.platform.scratchDirectory)
        scratchDir = template.substitute(USER_SCRATCH=self.getUserScratch())
        self.defaults["SCRATCH_DIR"] = scratchDir

        self.allocationFileName = Path(self.configDir) / f"allocation_{self.uniqueIdentifier}.sh"
        self.defaults["GENERATED_ALLOCATE_SCRIPT"] = self.allocationFileName.name

        if self.opts.openfiles is None:
            self.defaults["OPEN_FILES"] = 20480
        else:
            self.defaults["OPEN_FILES"] = self.opts.openfiles

        if self.opts.nodeset is None:
            self.defaults["NODESET_BLOCK"] = "#"
            self.defaults["NODESET"] = ""
        else:
            self.defaults["NODESET_BLOCK"] = f'Nodeset = "{self.opts.nodeset}"'
            self.defaults["NODESET"] = f"{self.opts.nodeset}"

        # For partitionable slots the classad 'Cpus' shows how many cpus
        # remain to be allocated. Thus for a slot running jobs the value
        # of Rank = TotalCpus - Cpus will increase with the load.
        # Because higher Rank is preferred, loaded slots are favored.
        if self.opts.packnodes is None:
            self.defaults["PACK_BLOCK"] = "#"
        else:
            self.defaults["PACK_BLOCK"] = "Rank = TotalCpus - Cpus"
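        # Illustration: with Rank = TotalCpus - Cpus, a 16-core glidein
        # already running jobs on 10 cores advertises Cpus=6 and evaluates
        # to 10, so it is preferred over an idle glidein evaluating to 0;
        # new jobs are packed onto partially filled glideins first.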

        # handle dynamic slot block template:
        # 1) if it isn't specified, just put a comment in its place
        # 2) if it's specified, but without a filename, use the default
        # 3) if it's specified with a filename, use that.
        if self.opts.dynamic is None:
            self.defaults["DYNAMIC_SLOTS_BLOCK"] = "#"
            return

        if self.opts.dynamic == "__default__":
            dynamicSlotsName = find_package_file(
                "dynamic_slots.template", kind="templates", platform=self.platform
            )
        else:
            dynamicSlotsName = ResourcePath(self.opts.dynamic)

        with dynamicSlotsName.open() as f:
            lines = f.readlines()
            block = ""
            for line in lines:
                block += line
            self.defaults["DYNAMIC_SLOTS_BLOCK"] = block

    def createAllocationFile(self, input: ResourcePathExpression):
        """Create the allocation script file using "input" as a template.

        Parameters
        ----------
        input : `ResourcePathExpression`
            Template file from which the allocation script is generated.

        Returns
        -------
        outfile : `str`
            The newly created file name.
        """
        outfile = self.createFile(input, self.allocationFileName)
        _LOG.debug("Wrote new Slurm job allocation bash script to %s", outfile)
        os.chmod(outfile, 0o755)
        return outfile

    def glideinsFromJobPressure(self):
        """Determine and submit the glideins needed from job pressure."""

        verbose = self.isVerbose()
        cpus = self.getCPUs()
        autoCPUs = self.getAutoCPUs()
        minAutoCPUs = self.getMinAutoCPUs()
        if cpus >= minAutoCPUs:
            autoCPUs = cpus
        memoryPerCore = self.getMemoryPerCore()
        memoryLimit = autoCPUs * memoryPerCore
        peakMemory = self.getPeakmemory()
        if memoryLimit > peakMemory:
            memoryLimit = peakMemory
            _LOG.debug("Auto: Setting job memory to peak memory on platform.")

        auser = self.getUserName()
        anodeset = self.getNodeset()

        # projection contains the job classads to be returned.
        # These include the cpu and memory profile of each job,
        # in the form of RequestCpus and RequestMemory.
        projection = [
            "ClusterId",
            "ProcId",
            "JobStatus",
            "Owner",
            "RequestCpus",
            "JobUniverse",
            "RequestMemory",
        ]
        owner = f'(Owner=="{auser}")'
        # query for idle jobs
        jstat = f"(JobStatus=={htcondor.JobStatus.IDLE})"
        # query for the vanilla universe;
        # JobUniverse constants are in the htcondor C++ sources:
        # UNIVERSE = { 1: "Standard", ..., 5: "Vanilla", ... }
        juniv = "(JobUniverse==5)"

        # The constraint determines that the jobs to be returned belong to
        # the current user (Owner) and are Idle vanilla universe jobs.
        full_constraint = f"{owner} && {jstat} && {juniv}"
        if anodeset is None:
            full_constraint += " && (JobNodeset is None)"
        else:
            jnodeset = f'(JobNodeset=="{anodeset}")'
            full_constraint += f" && {jnodeset}"
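        # Roughly, the assembled constraint looks like
        #   (Owner=="alice") && (JobStatus==1) && (JobUniverse==5)
        #       && (JobNodeset=="milano")
        # with illustrative user and nodeset names; 1 is the Idle status
        # and 5 the vanilla universe.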
        _LOG.info("Auto: Query for htcondor jobs.")
        _LOG.debug("full_constraint %s", full_constraint)
        try:
            condorq_data = condor_q(
                constraint=full_constraint,
                projection=projection,
            )
        except Exception as exc:
            raise type(exc)("Problem querying condor schedd for jobs") from None

        if not condorq_data:
            _LOG.info("Auto: No HTCondor Jobs detected.")
            return

        generatedSlurmFile = self.createFilesFromTemplates()
        condorq_large = []
        condorq_small = []
        schedd_name, condorq_full = condorq_data.popitem()

        _LOG.info("Auto: Search for Large htcondor jobs.")
        for jid, ajob in condorq_full.items():
            thisCpus = ajob["RequestCpus"]
            if isinstance(ajob["RequestMemory"], int):
                thisEvalMemory = ajob["RequestMemory"]
            else:
                thisEvalMemory = ajob["RequestMemory"].eval()
                _LOG.debug("Making an evaluation %s", thisEvalMemory)
            # Search for jobs that are Large jobs
            # (thisCpus > 16 or thisEvalMemory > 16*4096)
            ajob["RequestMemoryEval"] = thisEvalMemory
            if thisEvalMemory > memoryLimit or thisCpus > autoCPUs:
                _LOG.info("Appending a Large Job %s", jid)
                condorq_large.append(ajob)
            else:
                condorq_small.append(ajob)

        if not condorq_large:
            _LOG.info("Auto: no Large jobs detected.")
        else:
            _LOG.info("Auto: detected Large jobs")
            for ajob in condorq_large:
                _LOG.debug("\n%d.%d", ajob["ClusterId"], ajob["ProcId"])
                _LOG.debug("%s", ajob)
                thisMemory = ajob["RequestMemoryEval"]
                peakMemory = self.getPeakmemory()
                if thisMemory > peakMemory:
                    thisMemory = peakMemory
                    _LOG.debug("Auto large: Setting job memory to peak memory on platform.")
                useCores = ajob["RequestCpus"]
                clusterid = ajob["ClusterId"]
                procid = ajob["ProcId"]
                job_label = f"{clusterid}_{procid}_{thisMemory}"
                if useCores < autoCPUs:
                    useCores = autoCPUs
                hash = hashlib.sha1(job_label.encode("UTF-8")).hexdigest()
                shash = hash[:6]
                if anodeset is None:
                    jobname = f"{auser}_{shash}"
                else:
                    jobname = f"{anodeset}{auser}_{shash}"
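                # The 6-character SHA-1 prefix keeps per-job glidein names
                # short but distinct: a label such as "12345_0_65536" maps
                # to a name like "alice_3f2a1c" (hash digits made up here).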
                _LOG.debug("jobname %s", jobname)
                # Check if Job exists Idle in the queue
                numberJobname = SlurmPlugin.countIdleSlurmJobs(jobname)
                if numberJobname > 0:
                    _LOG.info("Job %s already exists, do not submit", jobname)
                    continue
                cpuopt = f"--cpus-per-task {useCores}"
                memopt = f"--mem {thisMemory}"
                jobopt = f"-J {jobname}"
                cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
                _LOG.debug(cmd)
                _LOG.info(
                    "Submitting Large glidein for %d.%d",
                    ajob["ClusterId"],
                    ajob["ProcId"],
                )
                time.sleep(3)
                exitCode = self.runCommand(cmd, verbose)
                if exitCode != 0:
                    _LOG.error("error running %s", cmd)
                    sys.exit(exitCode)

        if not condorq_small:
            _LOG.info("Auto: no small Jobs detected.")
        else:
            _LOG.info("Auto: summarize small jobs.")
            maxNumberOfGlideins = self.getNodes()
            maxAllowedNumberOfGlideins = self.getAllowedAutoGlideins()
            _LOG.debug("maxNumberOfGlideins %d", maxNumberOfGlideins)
            _LOG.debug("maxAllowedNumberOfGlideins %d", maxAllowedNumberOfGlideins)
            # The number of cores for the small glideins is capped at 8000,
            # which corresponds to maxAllowedNumberOfGlideins = 500 16-core
            # glideins.
            if maxNumberOfGlideins > maxAllowedNumberOfGlideins:
                maxNumberOfGlideins = maxAllowedNumberOfGlideins
                _LOG.info("Reducing Small Glidein limit due to threshold.")
            #
            # In the following loop we calculate the number of cores
            # required by the set of small jobs. This calculation uses
            # the requested cpus for a job, but also checks the requested
            # memory and counts an effective core for each 'memoryPerCore'
            # of memory (by default the 4 GB per core of the S3DF Slurm
            # scheduler).
            totalCores = 0
            for ajob in condorq_small:
                requestedCpus = ajob["RequestCpus"]
                # if isinstance(ajob["RequestMemory"], int):
                #     requestedMemory = ajob["RequestMemory"]
                # else:
                #     requestedMemory = ajob["RequestMemoryEval"]
                #     logging.debug("Using RequestMemoryEval")
                requestedMemory = ajob["RequestMemoryEval"]
                totalCores = totalCores + requestedCpus
                _LOG.debug("small: jobid %d.%d", ajob["ClusterId"], ajob["ProcId"])
                _LOG.debug("\tRequestCpus %d", requestedCpus)
                _LOG.debug("\tCurrent value of totalCores %d", totalCores)
                neededCpus = requestedMemory / memoryPerCore
                if neededCpus > requestedCpus:
                    _LOG.debug("\t\tNeed to Add More:")
                    _LOG.debug("\t\tRequestMemory is %d", requestedMemory)
                    _LOG.debug("\t\tRatio to %d MB is %d", memoryPerCore, neededCpus)
                    totalCores = totalCores + (neededCpus - requestedCpus)
                    _LOG.debug("\t\tCurrent value of totalCores %d", totalCores)

            _LOG.info("small: The final TotalCores is %d", totalCores)

            # The number of Glideins needed to service the detected Idle jobs
            # is "numberOfGlideins".
            numberOfGlideins = math.ceil(totalCores / autoCPUs)
            _LOG.info("small: Number for detected jobs is %d", numberOfGlideins)
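            # Worked example: totalCores = 100 with autoCPUs = 16 gives
            # ceil(100 / 16) = 7 glideins.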

            if anodeset is None:
                jobname = f"glide_{auser}"
            else:
                jobname = f"{anodeset}glide_{auser}"

            # Check Slurm queue for Running glideins
            existingGlideinsRunning = SlurmPlugin.countRunningSlurmJobs(jobname)

            # Check Slurm queue for Idle glideins
            existingGlideinsIdle = SlurmPlugin.countIdleSlurmJobs(jobname)

            _LOG.debug("small: existingGlideinsRunning %d", existingGlideinsRunning)
            _LOG.debug("small: existingGlideinsIdle %d", existingGlideinsIdle)

            # The number of Glideins needed to service the detected
            # Idle jobs is "numberOfGlideins" less the existing Idle glideins.
            numberOfGlideinsReduced = numberOfGlideins - existingGlideinsIdle
            _LOG.debug("small: Target Number to submit %d", numberOfGlideinsReduced)

            # The maximum number of Glideins that we can submit with
            # the imposed threshold (maxNumberOfGlideins) is
            # maxSubmitGlideins.
            existingGlideins = existingGlideinsRunning + existingGlideinsIdle
            maxSubmitGlideins = maxNumberOfGlideins - existingGlideins
            _LOG.debug("small: maxNumberOfGlideins %d", maxNumberOfGlideins)
            _LOG.debug("small: maxSubmitGlideins %d", maxSubmitGlideins)

            # Reduce the number of Glideins to submit if the threshold is exceeded.
            if numberOfGlideinsReduced > maxSubmitGlideins:
                numberOfGlideinsReduced = maxSubmitGlideins
                _LOG.info("small: Reducing due to threshold.")
            _LOG.debug("small: Number of Glideins to submit is %d", numberOfGlideinsReduced)
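            # Illustrative numbers: 7 glideins needed with 2 already idle
            # leaves 5 to submit; if 3 running + 2 idle already exist
            # against a cap of 6, maxSubmitGlideins is 1 and only 1 more
            # glidein is submitted.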

            cpuopt = f"--cpus-per-task {autoCPUs}"
            memopt = f"--mem {memoryLimit}"
            jobopt = f"-J {jobname}"
            cmd = f"sbatch {cpuopt} {memopt} {jobopt} {generatedSlurmFile}"
            _LOG.debug(cmd)
            for glide in range(0, numberOfGlideinsReduced):
                _LOG.info("Submitting glidein %s", glide)
                exitCode = self.runCommand(cmd, verbose)
                if exitCode != 0:
                    _LOG.error("error running %s", cmd)
                    sys.exit(exitCode)

        return