Coverage for python / lsst / ctrl / execute / allocator.py: 18%
220 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:09 +0000
1# This file is part of ctrl_execute.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28import logging
29import os
30import pwd
31import sys
32from datetime import datetime
33from string import Template
35from lsst.ctrl.execute.allocationConfig import AllocationConfig
36from lsst.ctrl.execute.condorInfoConfig import CondorInfoConfig
37from lsst.ctrl.execute.templateWriter import TemplateWriter
38from lsst.resources import ResourcePath, ResourcePathExpression
# Module-level logger; the file-creation methods of Allocator use it to
# report, at debug level, which templates were read and which files written.
_LOG = logging.getLogger(__name__)
class Allocator:
    """A class which consolidates allocation pex_config information with
    override information (obtained from the command line) and produces a
    batch submit file (plus a Condor configuration file) using these values.

    Parameters
    ----------
    platform : `str`
        The name of the platform to execute on.
    opts : `Config`
        Config object containing command-line options. The attributes read
        by this class include ``nodeCount``, ``collector``,
        ``collectorport``, ``exclusive``, ``cpus``, ``maximumWallClock``,
        ``account``, ``mempercore``, ``queue``, ``nodeset``,
        ``glideinShutdown``, ``outputLog``, ``errorLog``, ``verbose``,
        ``auto`` and ``dynamic``.
    configuration
        Platform configuration. Its ``platform`` section supplies
        ``collector``, ``peakcpus``, ``peakmemory``, ``localScratch`` and
        ``scheduler``.
    condorInfoFileName : `lsst.resources.ResourcePathExpression`
        Name of the file containing the per-platform user information
        (user name, home directory and scratch directory).

    Raises
    ------
    TypeError
        If the condorInfoFileName is the wrong type.
    RuntimeError
        If the user name, home or scratch directory cannot be determined
        for ``platform``.

    Notes
    -----
    Values are kept in two tables. ``defaults`` holds values from the
    configuration files. ``commandLineDefaults`` holds values derived from
    ``opts``. A non-`None` entry in ``commandLineDefaults`` overrides the
    entry of the same name in ``defaults``, both in `getParameter` and
    when templates are rendered by `createFile`.
    """

    def __init__(
        self,
        platform: str,
        opts,
        configuration,
        condorInfoFileName: "ResourcePathExpression",
    ):
        """Merge condor-info, platform configuration and command-line
        options into the ``defaults`` and ``commandLineDefaults`` tables.

        Parameters
        ----------
        platform : `str`
            Target platform for the batch submission.
        opts : `Config`
            Options that override configured values.
        configuration
            Platform configuration (see the class documentation).
        condorInfoFileName : `lsst.resources.ResourcePathExpression`
            File describing the user's name, home and scratch directory
            for each platform.

        Raises
        ------
        RuntimeError
            If the user name, home or scratch directory is not available
            for ``platform``.
        """
        self.opts = opts
        self.defaults = {}
        self.configuration = configuration

        condorInfoConfig = CondorInfoConfig()
        condorInfoConfig.load(condorInfoFileName)

        self.platform = platform

        # Look up the user's name, home and scratch directory for this
        # platform in the condor-info file (conventionally
        # $HOME/.lsst/condor-info.py).
        user_name = None
        user_home = None
        user_scratch = None
        for name in condorInfoConfig.platform:
            if name == self.platform:
                user_info = condorInfoConfig.platform[name].user
                user_name = user_info.name
                user_home = user_info.home
                user_scratch = user_info.scratch
                break
        # Fall back to $SCRATCH when the condor-info file does not say.
        if user_scratch is None:
            user_scratch = os.environ.get("SCRATCH")
        if user_name is None:
            raise RuntimeError(
                f"error: {condorInfoFileName} does not specify user name for platform == {self.platform}"
            )
        if user_home is None:
            raise RuntimeError(
                f"error: {condorInfoFileName} does not specify user home for platform == {self.platform}"
            )
        if user_scratch is None:
            raise RuntimeError(
                f"error: {condorInfoFileName} does not specify user scratch for platform == {self.platform}"
            )
        self.defaults["USER_NAME"] = user_name
        self.defaults["USER_HOME"] = user_home
        self.defaults["USER_SCRATCH"] = user_scratch

        platformConfig = self.configuration.platform
        self.commandLineDefaults = {}
        self.commandLineDefaults["NODE_COUNT"] = self.opts.nodeCount
        # A collector given on the command line wins over the configured one.
        if platformConfig.collector:
            self.commandLineDefaults["COLLECTOR"] = platformConfig.collector
        if self.opts.collector:
            self.commandLineDefaults["COLLECTOR"] = self.opts.collector
        self.commandLineDefaults["CPORT"] = self.opts.collectorport

        # Resolve the node's peak CPU count and memory. Built-in fallbacks
        # are used when the platform configuration leaves them unset.
        peakcpus = platformConfig.peakcpus if platformConfig.peakcpus else 256
        self.commandLineDefaults["PEAKCPUS"] = peakcpus
        self.commandLineDefaults["PEAKMEMORY"] = (
            platformConfig.peakmemory if platformConfig.peakmemory else 1000000
        )

        # Exclusive allocations take the whole node. Otherwise the request is
        # capped at the node's peak. The *resolved* peak (including the 256
        # fallback) is used, so an unset ``peakcpus`` neither breaks the
        # comparison nor yields CPUS=None.
        if self.opts.exclusive:
            self.commandLineDefaults["CPUS"] = peakcpus
        else:
            self.commandLineDefaults["CPUS"] = min(self.opts.cpus, peakcpus)

        self.commandLineDefaults["WALL_CLOCK"] = self.opts.maximumWallClock
        self.commandLineDefaults["ACCOUNT"] = self.opts.account
        self.commandLineDefaults["MEMPERCORE"] = self.opts.mempercore
        self.commandLineDefaults["ALLOWEDAUTO"] = 500
        self.commandLineDefaults["AUTOCPUS"] = 16
        self.commandLineDefaults["MINAUTOCPUS"] = 15
        self.commandLineDefaults["QUEUE"] = self.opts.queue
        self.commandLineDefaults["NODESET"] = self.opts.nodeset
        self.load()

    def createUniqueIdentifier(self):
        """Create a file identifier based on the user's name and the time
        at which this method is invoked.

        As a side effect, ``defaults["DATE_STRING"]`` is set to the
        ``YYYY_MMDD`` date string embedded in the identifier.

        Returns
        -------
        ident : `str`
            The new identifier, of the form ``<user>_YYYY_MMDD_HHMMSS``.
            Two calls by the same user within the same second produce the
            same identifier.
        """
        # This naming scheme follows the conventions used for creating
        # RUNID names. We've found this allows these files to be more
        # easily located and shared with other users when debugging.
        # The tempfile.mkstemp method restricts the file to only the user,
        # and does not guarantee a file name that can be easily identified.
        now = datetime.now()
        dateString = f"{now.year:02d}_{now.month:02d}{now.day:02d}"
        self.defaults["DATE_STRING"] = dateString
        username = pwd.getpwuid(os.geteuid()).pw_name
        return f"{username}_{dateString}_{now.hour:02d}{now.minute:02d}{now.second:02d}"

    def load(self):
        """Load platform configuration values into ``defaults``.

        ``$USER_SCRATCH`` in the configured ``localScratch`` is expanded to
        give ``LOCAL_SCRATCH``. `string.Template.substitute` raises
        `KeyError` if ``localScratch`` contains any other placeholder. The
        configured scheduler is stored as ``SCHEDULER``.
        """
        tempLocalScratch = Template(self.configuration.platform.localScratch)
        self.defaults["LOCAL_SCRATCH"] = tempLocalScratch.substitute(
            USER_SCRATCH=self.defaults["USER_SCRATCH"]
        )
        self.defaults["SCHEDULER"] = self.configuration.platform.scheduler

    def loadAllocationConfig(self, name: "ResourcePathExpression", suffix):
        """Load values from an allocation configuration, together with
        command-line overrides, into the tables used for template rendering.

        This also chooses the unique identifier and the output locations
        (``configDir``, ``submitFileName``, ``condorConfigFileName``) for
        the generated files.

        Parameters
        ----------
        name : `lsst.resources.ResourcePathExpression`
            Location of the allocation configuration file.
        suffix : `str`
            File extension for the generated submit file.

        Returns
        -------
        allocationConfig : `AllocationConfig`
            The loaded allocation configuration.

        Raises
        ------
        RuntimeError
            If ``name`` does not exist.
        """
        if not (name_ := ResourcePath(name)).exists():
            raise RuntimeError(f"{name_} was not found.")
        allocationConfig = AllocationConfig()
        allocationConfig.load(name_)

        self.defaults["QUEUE"] = allocationConfig.platform.queue
        self.defaults["EMAIL_NOTIFICATION"] = allocationConfig.platform.email
        self.defaults["HOST_NAME"] = allocationConfig.platform.loginHostName

        self.defaults["UTILITY_PATH"] = allocationConfig.platform.utilityPath

        # Command-line values win over the allocation config / built-in
        # fallbacks.
        glideinShutdown = (
            allocationConfig.platform.glideinShutdown
            if self.opts.glideinShutdown is None
            else self.opts.glideinShutdown
        )
        self.defaults["GLIDEIN_SHUTDOWN"] = str(glideinShutdown)
        self.defaults["OUTPUT_LOG"] = self.opts.outputLog if self.opts.outputLog is not None else "glide.out"
        self.defaults["ERROR_LOG"] = self.opts.errorLog if self.opts.errorLog is not None else "glide.err"

        # This is the TOTAL number of cores in the job, not just the total
        # of the cores you intend to use. In other words, the total available
        # on a machine, times the number of machines.
        totalCoresPerNode = allocationConfig.platform.totalCoresPerNode
        self.commandLineDefaults["TOTAL_CORE_COUNT"] = self.opts.nodeCount * totalCoresPerNode

        self.uniqueIdentifier = self.createUniqueIdentifier()

        # Write the submit and config files to
        # {LOCAL_SCRATCH}/{DATE_STRING}/{uniqueIdentifier}/configs.
        self.configDir = os.path.join(
            self.defaults["LOCAL_SCRATCH"],
            self.defaults["DATE_STRING"],
            self.uniqueIdentifier,
            "configs",
        )

        self.submitFileName = os.path.join(self.configDir, f"alloc_{self.uniqueIdentifier}.{suffix}")

        self.condorConfigFileName = os.path.join(self.configDir, f"condor_{self.uniqueIdentifier}.config")

        self.defaults["GENERATED_CONFIG"] = os.path.basename(self.condorConfigFileName)
        self.defaults["CONFIGURATION_ID"] = self.uniqueIdentifier
        return allocationConfig

    def createSubmitFile(self, inputFile):
        """Create a batch submit file using ``inputFile`` as a template.

        Must be called after `loadAllocationConfig`, which sets
        ``configDir`` and ``submitFileName``.

        Parameters
        ----------
        inputFile : `lsst.resources.ResourcePathExpression`
            Template for the submit file.

        Returns
        -------
        outfile : `str`
            The newly created file name.
        """
        # exist_ok avoids the check-then-create race of a separate exists().
        os.makedirs(self.configDir, exist_ok=True)
        outfile = self.createFile(inputFile, self.submitFileName)
        _LOG.debug("Wrote new submit file to %s", outfile)
        return outfile

    def createCondorConfigFile(self, input):
        """Create a Condor config file using ``input`` as a template.

        Must be called after `loadAllocationConfig`, which sets
        ``configDir`` and ``condorConfigFileName``.

        Parameters
        ----------
        input : `lsst.resources.ResourcePathExpression`
            Template for the Condor configuration file.

        Returns
        -------
        outfile : `str`
            The newly created file name.
        """
        # Ensure the output directory exists even if createSubmitFile has
        # not been called first.
        os.makedirs(self.configDir, exist_ok=True)
        outfile = self.createFile(input, self.condorConfigFileName)
        _LOG.debug("Wrote new condor configuration file to %s", outfile)
        return outfile

    def createFile(self, input: "ResourcePathExpression", output: "ResourcePathExpression"):
        """Create a new file by rendering the template ``input`` and
        writing the result to ``output``.

        Parameters
        ----------
        input : `lsst.resources.ResourcePathExpression`
            The template to render.
        output : `lsst.resources.ResourcePathExpression`
            Where to write the rendered file.

        Returns
        -------
        outfile : `lsst.resources.ResourcePathExpression`
            ``output``, unchanged.
        """
        _LOG.debug("Creating file from template using %s", input)
        template = TemplateWriter()
        # Start from "defaults" and let every non-None "commandLineDefaults"
        # entry override it.
        substitutes = self.defaults.copy()
        substitutes.update({key: val for key, val in self.commandLineDefaults.items() if val is not None})
        template.rewrite(input, output, substitutes)
        return output

    def isVerbose(self):
        """Return the ``verbose`` option (True if the flag was set)."""
        return self.opts.verbose

    def isAuto(self):
        """Return the ``auto`` option (True if the flag was set)."""
        return self.opts.auto

    def getUserName(self):
        """Return the USER_NAME parameter (see `getParameter`)."""
        return self.getParameter("USER_NAME")

    def getUserHome(self):
        """Return the USER_HOME parameter (see `getParameter`)."""
        return self.getParameter("USER_HOME")

    def getUserScratch(self):
        """Return the USER_SCRATCH parameter (see `getParameter`)."""
        return self.getParameter("USER_SCRATCH")

    def getHostName(self):
        """Return the HOST_NAME parameter (see `getParameter`)."""
        return self.getParameter("HOST_NAME")

    def getUtilityPath(self):
        """Return the UTILITY_PATH parameter (see `getParameter`)."""
        return self.getParameter("UTILITY_PATH")

    def getScratchDirectory(self):
        """Return the SCRATCH_DIR parameter (see `getParameter`).

        NOTE(review): SCRATCH_DIR is not set by this class; presumably a
        subclass populates it, otherwise this returns `None`.
        """
        return self.getParameter("SCRATCH_DIR")

    def getLocalScratchDirectory(self):
        """Return the LOCAL_SCRATCH parameter (see `getParameter`)."""
        return self.getParameter("LOCAL_SCRATCH")

    def getNodeSetName(self):
        """Return the NODE_SET parameter (see `getParameter`).

        NOTE(review): ``__init__`` stores the node set under "NODESET"
        (see `getNodeset`); this reads "NODE_SET", which this class never
        sets. Confirm which key is intended.
        """
        return self.getParameter("NODE_SET")

    def getNodes(self):
        """Return the NODE_COUNT parameter (see `getParameter`)."""
        return self.getParameter("NODE_COUNT")

    def getMemoryPerCore(self):
        """Return the MEMPERCORE parameter (see `getParameter`)."""
        return self.getParameter("MEMPERCORE")

    def getAllowedAutoGlideins(self):
        """Return the ALLOWEDAUTO parameter (see `getParameter`)."""
        return self.getParameter("ALLOWEDAUTO")

    def getQOS(self):
        """Return the QOS parameter (see `getParameter`).

        NOTE(review): QOS is not set by this class; presumably a subclass
        populates it.
        """
        return self.getParameter("QOS")

    def getCPUs(self):
        """Return the CPUS parameter (see `getParameter`)."""
        return self.getParameter("CPUS")

    def getPeakcpus(self):
        """Return the PEAKCPUS parameter (see `getParameter`)."""
        return self.getParameter("PEAKCPUS")

    def getPeakmemory(self):
        """Return the PEAKMEMORY parameter.

        For the "torino" queue the value is scaled by 3/2 (rounded down).
        Presumably that queue's nodes have more memory; TODO confirm.
        """
        peakmemory = self.getParameter("PEAKMEMORY")
        if self.opts.queue == "torino":
            # Integer arithmetic avoids a float round-trip for large values.
            peakmemory = int(3 * peakmemory // 2)
        return peakmemory

    def getAutoCPUs(self):
        """Return the size of standard glideins for allocateNodes auto.

        When EXCLUSIVE is set, the node's resolved peak CPU count
        (`getPeakcpus`) is used. Otherwise AUTOCPUS is used.
        """
        if self.getParameter("EXCLUSIVE"):
            return self.getPeakcpus()
        return self.getParameter("AUTOCPUS")

    def getMinAutoCPUs(self):
        """Return the minimum size of standard glideins for allocateNodes
        auto (the MINAUTOCPUS parameter).
        """
        return self.getParameter("MINAUTOCPUS")

    def getCollector(self):
        """Return the COLLECTOR parameter (see `getParameter`)."""
        return self.getParameter("COLLECTOR")

    def getWallClock(self):
        """Return the WALL_CLOCK parameter (see `getParameter`)."""
        return self.getParameter("WALL_CLOCK")

    def getScheduler(self):
        """Return the SCHEDULER parameter (see `getParameter`)."""
        return self.getParameter("SCHEDULER")

    def getReservation(self):
        """Return the RESERVATION parameter (see `getParameter`).

        NOTE(review): RESERVATION is not set by this class; presumably a
        subclass populates it.
        """
        return self.getParameter("RESERVATION")

    def getExclusive(self):
        """Return the EXCLUSIVE parameter (see `getParameter`).

        NOTE(review): EXCLUSIVE is not set by this class (``opts.exclusive``
        is only consulted in ``__init__``); presumably a subclass sets it.
        """
        return self.getParameter("EXCLUSIVE")

    def getExcluser(self):
        """Return the EXCLUSER parameter (see `getParameter`)."""
        return self.getParameter("EXCLUSER")

    def getNodeset(self):
        """Return the NODESET parameter (see `getParameter`)."""
        return self.getParameter("NODESET")

    def getParameter(self, value):
        """Look up a parameter by name.

        Parameters
        ----------
        value : `str`
            Parameter name, e.g. ``"QUEUE"``.

        Returns
        -------
        result
            The command-line override if it is set (not `None`), otherwise
            the configured default, otherwise `None`. This matches the
            precedence used by `createFile` when rendering templates.
        """
        override = self.commandLineDefaults.get(value)
        if override is not None:
            return override
        return self.defaults.get(value)

    def printNodeSetInfo(self):
        """Print a human-readable summary of the requested allocation."""
        nodes = self.getNodes()
        cpus = self.getCPUs()
        wallClock = self.getWallClock()
        nodeString = "s" if int(nodes) > 1 else ""

        if self.opts.dynamic is None or self.opts.dynamic == "__default__":
            print(
                f"{nodes} glidein{nodeString} will be allocated on {self.platform} "
                "using default dynamic slots configuration."
            )
            print(f"There will be {cpus} cores per glidein and a maximum time limit of {wallClock}")
        else:
            print(
                f"{nodes} node{nodeString} will be allocated on {self.platform} "
                f"using dynamic slot block specified in '{self.opts.dynamic}'"
            )
            print(f"There will be {cpus} cores per node and maximum time limit of {wallClock}")
        print("Node set name:")
        print(self.getNodeSetName())

    def runCommand(self, cmd, verbose):
        """Run a command in a child process and wait for it to finish.

        Parameters
        ----------
        cmd : `str`
            Command line to execute. It is split on whitespace (no shell
            quoting is honored), and the first word is looked up on
            ``PATH``.
        verbose : `bool`
            If `False`, the child's stdin, stdout and stderr are redirected
            to the null device, so the command's output is discarded.

        Returns
        -------
        exitCode : `int`
            The command's exit status. If the command could not be
            executed, 127 is returned. If it was killed by a signal,
            ``128 + signal number`` is returned (the shell convention), so
            the failure is not mistaken for success.

        Raises
        ------
        ValueError
            If ``cmd`` contains no words.
        """
        cmd_split = cmd.split()
        if not cmd_split:
            raise ValueError("runCommand: empty command")
        # Flush buffered output now, so the child does not inherit and
        # re-emit it.
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid == 0:
            # Child process. It must never return into the caller's code,
            # so _exit() runs if exec fails for any reason.
            try:
                if not verbose:
                    # Methods of file transfer and login may produce
                    # different output depending on how the "gsi" utilities
                    # are used (grid proxies or ssh). Anything parsing this
                    # output (like drpRun) would need extra steps to deal
                    # with it, so it is optionally discarded. Pointing fds
                    # 0-2 at the null device (rather than closing them)
                    # keeps the command from failing on a closed stream, and
                    # from having later-opened files land on fd 0/1/2.
                    devnull = os.open(os.devnull, os.O_RDWR)
                    for fd in (0, 1, 2):
                        os.dup2(devnull, fd)
                    if devnull > 2:
                        os.close(devnull)
                os.execvp(cmd_split[0], cmd_split)
            finally:
                os._exit(127)
        # Wait for this specific child, not for whichever child exits first.
        _, status = os.waitpid(pid, 0)
        if os.WIFSIGNALED(status):
            return 128 + os.WTERMSIG(status)
        return os.WEXITSTATUS(status)

    def submit(self):
        """Submit the glidein jobs to the Batch system."""
        raise NotImplementedError