Coverage for python / lsst / ctrl / execute / allocator.py: 18%

220 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:53 +0000

1# This file is part of ctrl_execute. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

import logging
import os
import pwd
import subprocess
import sys
from datetime import datetime
from string import Template

from lsst.ctrl.execute.allocationConfig import AllocationConfig
from lsst.ctrl.execute.condorInfoConfig import CondorInfoConfig
from lsst.ctrl.execute.templateWriter import TemplateWriter
from lsst.resources import ResourcePath, ResourcePathExpression

39 

# Module-level logger, named after this module.
_LOG: logging.Logger = logging.getLogger(name=__name__)

41 

42 

class Allocator:
    """Consolidate allocation ``pex_config`` information with override
    information (obtained from the command line) and produce batch-system
    submit files and HTCondor configuration files from templates.

    Values are kept in two dictionaries: ``defaults`` (from configuration
    files) and ``commandLineDefaults`` (from ``opts``). A command-line value
    that is not `None` takes precedence over the corresponding default.

    Parameters
    ----------
    platform : `str`
        The name of the platform to execute on.
    opts : `Config`
        Config object containing options (command-line overrides).
    configuration : `Config`
        Platform configuration; its ``platform`` attribute supplies
        ``collector``, ``peakcpus``, ``peakmemory``, ``localScratch`` and
        ``scheduler``.
    condorInfoFileName : `lsst.resources.ResourcePathExpression`
        Name of the file containing per-platform user information.

    Raises
    ------
    RuntimeError
        If the condor info file does not specify a user name, home or
        scratch directory for ``platform`` (scratch may instead come from
        the ``SCRATCH`` environment variable).
    TypeError
        If the condorInfoFileName is the wrong type (presumably raised while
        loading the condor info configuration; not raised directly here).
    """

    def __init__(
        self,
        platform: str,
        opts,
        configuration,
        condorInfoFileName: "ResourcePathExpression",
    ):
        """Load the user's condor info and resolve command-line defaults.

        Parameters
        ----------
        platform : `str`
            Target platform for batch submission.
        opts : `Config`
            Options that override configured values.
        configuration : `Config`
            Platform configuration.
        condorInfoFileName : `lsst.resources.ResourcePathExpression`
            File holding per-platform user information.
        """
        self.opts = opts
        self.defaults = {}
        self.configuration = configuration

        condorInfoConfig = CondorInfoConfig()
        condorInfoConfig.load(condorInfoFileName)

        self.platform = platform

        # Look up the user's name, home and scratch directory in the
        # condor info file (e.g. $HOME/.lsst/condor-info.py).
        user_name = None
        user_home = None
        user_scratch = None
        if self.platform in condorInfoConfig.platform:
            user = condorInfoConfig.platform[self.platform].user
            user_name = user.name
            user_home = user.home
            user_scratch = user.scratch
        if user_scratch is None:
            user_scratch = os.environ.get("SCRATCH")
        for label, value in (("name", user_name), ("home", user_home), ("scratch", user_scratch)):
            if value is None:
                raise RuntimeError(
                    f"error: {condorInfoFileName} does not specify user {label} "
                    f"for platform == {self.platform}"
                )
        self.defaults["USER_NAME"] = user_name
        self.defaults["USER_HOME"] = user_home
        self.defaults["USER_SCRATCH"] = user_scratch

        platformConfig = self.configuration.platform
        self.commandLineDefaults = {}
        self.commandLineDefaults["NODE_COUNT"] = self.opts.nodeCount
        # A collector given on the command line wins over the configured one.
        if platformConfig.collector:
            self.commandLineDefaults["COLLECTOR"] = platformConfig.collector
        if self.opts.collector:
            self.commandLineDefaults["COLLECTOR"] = self.opts.collector
        self.commandLineDefaults["CPORT"] = self.opts.collectorport
        # Fall back to built-in limits when the platform leaves them unset.
        peakcpus = platformConfig.peakcpus or 256
        self.commandLineDefaults["PEAKCPUS"] = peakcpus
        self.commandLineDefaults["PEAKMEMORY"] = platformConfig.peakmemory or 1000000
        # Exclusive glideins take the whole node; otherwise cap the request
        # at the peak. The resolved peak is used so an unset
        # ``platform.peakcpus`` neither yields CPUS=None nor makes the
        # comparison fail.
        if self.opts.exclusive:
            self.commandLineDefaults["CPUS"] = peakcpus
        else:
            self.commandLineDefaults["CPUS"] = min(self.opts.cpus, peakcpus)
        self.commandLineDefaults["WALL_CLOCK"] = self.opts.maximumWallClock
        self.commandLineDefaults["ACCOUNT"] = self.opts.account
        self.commandLineDefaults["MEMPERCORE"] = self.opts.mempercore
        self.commandLineDefaults["ALLOWEDAUTO"] = 500
        self.commandLineDefaults["AUTOCPUS"] = 16
        self.commandLineDefaults["MINAUTOCPUS"] = 15
        self.commandLineDefaults["QUEUE"] = self.opts.queue
        self.commandLineDefaults["NODESET"] = self.opts.nodeset
        self.load()

    def createUniqueIdentifier(self):
        """Create a file identifier from the effective user's login name and
        the time at which this method is invoked.

        Also stores the date part (``YYYY_MMDD``) in ``defaults["DATE_STRING"]``.

        Returns
        -------
        ident : `str`
            The new identifier, ``<user>_YYYY_MMDD_HHMMSS``. The resolution
            is one second, so two calls within the same second by the same
            user produce the same identifier.
        """
        # This naming scheme follows the conventions used for creating
        # RUNID names. We've found this allows these files to be more
        # easily located and shared with other users when debugging.
        # tempfile.mkstemp restricts the file to the user only, and does
        # not produce a file name that can be easily identified.
        now = datetime.now()
        dateString = f"{now.year:02d}_{now.month:02d}{now.day:02d}"
        self.defaults["DATE_STRING"] = dateString
        username = pwd.getpwuid(os.geteuid()).pw_name
        return f"{username}_{dateString}_{now.hour:02d}{now.minute:02d}{now.second:02d}"

    def load(self):
        """Load values from the platform configuration into data structures
        suitable for use by the TemplateWriter object.

        ``platform.localScratch`` is treated as a `string.Template`, and
        ``$USER_SCRATCH`` is expanded to ``defaults["USER_SCRATCH"]``.
        Any other placeholder makes ``Template.substitute`` raise `KeyError`.
        """
        platformConfig = self.configuration.platform
        tempLocalScratch = Template(platformConfig.localScratch)
        self.defaults["LOCAL_SCRATCH"] = tempLocalScratch.substitute(
            USER_SCRATCH=self.defaults["USER_SCRATCH"]
        )
        self.defaults["SCHEDULER"] = platformConfig.scheduler

    def loadAllocationConfig(self, name: "ResourcePathExpression", suffix):
        """Load values from an allocation config file and command-line
        overrides into data structures suitable for use by the
        TemplateWriter object.

        Also creates a fresh unique identifier and derives from it the
        configuration directory, the submit file name and the HTCondor
        config file name.

        Parameters
        ----------
        name : `lsst.resources.ResourcePathExpression`
            Location of the allocation config file.
        suffix : `str`
            File extension used for the generated submit file.

        Returns
        -------
        allocationConfig : `AllocationConfig`
            The loaded allocation configuration.

        Raises
        ------
        RuntimeError
            If ``name`` does not exist.
        """
        if not (name_ := ResourcePath(name)).exists():
            raise RuntimeError(f"{name_} was not found.")
        allocationConfig = AllocationConfig()
        allocationConfig.load(name_)
        platformConfig = allocationConfig.platform

        self.defaults["QUEUE"] = platformConfig.queue
        self.defaults["EMAIL_NOTIFICATION"] = platformConfig.email
        self.defaults["HOST_NAME"] = platformConfig.loginHostName
        self.defaults["UTILITY_PATH"] = platformConfig.utilityPath

        # Command-line values, when given, take precedence.
        glideinShutdown = self.opts.glideinShutdown
        if glideinShutdown is None:
            glideinShutdown = platformConfig.glideinShutdown
        self.defaults["GLIDEIN_SHUTDOWN"] = str(glideinShutdown)
        outputLog = self.opts.outputLog
        self.defaults["OUTPUT_LOG"] = outputLog if outputLog is not None else "glide.out"
        errorLog = self.opts.errorLog
        self.defaults["ERROR_LOG"] = errorLog if errorLog is not None else "glide.err"

        # This is the TOTAL number of cores in the job, not just the total
        # of the cores you intend to use. In other words, the total available
        # on a machine, times the number of machines.
        totalCoresPerNode = platformConfig.totalCoresPerNode
        self.commandLineDefaults["TOTAL_CORE_COUNT"] = self.opts.nodeCount * totalCoresPerNode

        self.uniqueIdentifier = self.createUniqueIdentifier()

        # Write the submit and config files to
        # {LOCAL_SCRATCH}/{DATE_STRING}/{uniqueIdentifier}/configs.
        self.configDir = os.path.join(
            self.defaults["LOCAL_SCRATCH"],
            self.defaults["DATE_STRING"],
            self.uniqueIdentifier,
            "configs",
        )
        self.submitFileName = os.path.join(self.configDir, f"alloc_{self.uniqueIdentifier}.{suffix}")
        self.condorConfigFileName = os.path.join(self.configDir, f"condor_{self.uniqueIdentifier}.config")

        self.defaults["GENERATED_CONFIG"] = os.path.basename(self.condorConfigFileName)
        self.defaults["CONFIGURATION_ID"] = self.uniqueIdentifier
        return allocationConfig

    def createSubmitFile(self, inputFile):
        """Create a batch submit file using ``inputFile`` as a template.

        The configuration directory is created if needed.

        Returns
        -------
        outfile : `str`
            The newly created file name.
        """
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(self.configDir, exist_ok=True)
        outfile = self.createFile(inputFile, self.submitFileName)
        _LOG.debug("Wrote new submit file to %s", outfile)
        return outfile

    def createCondorConfigFile(self, input):
        """Create an HTCondor config file using ``input`` as a template.

        The configuration directory is created if needed, so this may be
        called before `createSubmitFile`.

        Returns
        -------
        outfile : `str`
            The newly created file name.
        """
        os.makedirs(self.configDir, exist_ok=True)
        outfile = self.createFile(input, self.condorConfigFileName)
        _LOG.debug("Wrote new condor configuration file to %s", outfile)
        return outfile

    def createFile(self, input: "ResourcePathExpression", output: "ResourcePathExpression"):
        """Create a new file, using ``input`` as a template, and write the
        new file to ``output``.

        Substitution values come from ``defaults``. Each command-line value
        that is not `None` overrides the corresponding default.

        Returns
        -------
        outfile : `str`
            The newly created file name.
        """
        _LOG.debug("Creating file from template using %s", input)
        template = TemplateWriter()
        substitutes = self.defaults.copy()
        substitutes.update(
            (key, val) for key, val in self.commandLineDefaults.items() if val is not None
        )
        template.rewrite(input, output, substitutes)
        return output

    def isVerbose(self):
        """Return the status of the verbose flag (``opts.verbose``)."""
        return self.opts.verbose

    def isAuto(self):
        """Return the status of the auto flag (``opts.auto``)."""
        return self.opts.auto

    def getUserName(self):
        """Return the value of ``USER_NAME``."""
        return self.getParameter("USER_NAME")

    def getUserHome(self):
        """Return the value of ``USER_HOME``."""
        return self.getParameter("USER_HOME")

    def getUserScratch(self):
        """Return the value of ``USER_SCRATCH``."""
        return self.getParameter("USER_SCRATCH")

    def getHostName(self):
        """Return the value of ``HOST_NAME``."""
        return self.getParameter("HOST_NAME")

    def getUtilityPath(self):
        """Return the value of ``UTILITY_PATH``."""
        return self.getParameter("UTILITY_PATH")

    def getScratchDirectory(self):
        """Return the value of ``SCRATCH_DIR``.

        NOTE(review): this class never sets ``SCRATCH_DIR``, so this returns
        `None` unless it is populated elsewhere (e.g. by a subclass).
        """
        return self.getParameter("SCRATCH_DIR")

    def getLocalScratchDirectory(self):
        """Return the value of ``LOCAL_SCRATCH``."""
        return self.getParameter("LOCAL_SCRATCH")

    def getNodeSetName(self):
        """Return the node set name.

        Uses ``NODE_SET`` when it is set; otherwise returns ``NODESET``,
        which the constructor fills from ``opts.nodeset``.
        """
        nodeSetName = self.getParameter("NODE_SET")
        if nodeSetName is None:
            nodeSetName = self.getParameter("NODESET")
        return nodeSetName

    def getNodes(self):
        """Return the value of ``NODE_COUNT``."""
        return self.getParameter("NODE_COUNT")

    def getMemoryPerCore(self):
        """Return the value of ``MEMPERCORE``."""
        return self.getParameter("MEMPERCORE")

    def getAllowedAutoGlideins(self):
        """Return the value of ``ALLOWEDAUTO``."""
        return self.getParameter("ALLOWEDAUTO")

    def getQOS(self):
        """Return the value of ``QOS``.

        NOTE(review): ``QOS`` is not set by this class.
        """
        return self.getParameter("QOS")

    def getCPUs(self):
        """Return the value of ``CPUS``."""
        return self.getParameter("CPUS")

    def getPeakcpus(self):
        """Return the value of ``PEAKCPUS``."""
        return self.getParameter("PEAKCPUS")

    def getPeakmemory(self):
        """Return the value of ``PEAKMEMORY``.

        For the ``torino`` queue, 1.5 times the value is returned
        (truncated to `int`).
        """
        peakmemory = self.getParameter("PEAKMEMORY")
        if self.opts.queue == "torino":
            peakmemory = int(3 * peakmemory / 2)
        return peakmemory

    def getAutoCPUs(self):
        """Return the size of standard glideins for allocateNodes auto.

        When ``EXCLUSIVE`` is truthy, this is the resolved ``PEAKCPUS``.
        Otherwise it is ``AUTOCPUS``.
        """
        if self.getParameter("EXCLUSIVE"):
            return self.getPeakcpus()
        return self.getParameter("AUTOCPUS")

    def getMinAutoCPUs(self):
        """Return the minimum size of standard glideins for allocateNodes
        auto (``MINAUTOCPUS``).
        """
        return self.getParameter("MINAUTOCPUS")

    def getCollector(self):
        """Return the value of ``COLLECTOR``."""
        return self.getParameter("COLLECTOR")

    def getWallClock(self):
        """Return the value of ``WALL_CLOCK``."""
        return self.getParameter("WALL_CLOCK")

    def getScheduler(self):
        """Return the value of ``SCHEDULER``."""
        return self.getParameter("SCHEDULER")

    def getReservation(self):
        """Return the value of ``RESERVATION``.

        NOTE(review): ``RESERVATION`` is not set by this class.
        """
        return self.getParameter("RESERVATION")

    def getExclusive(self):
        """Return the value of ``EXCLUSIVE``.

        NOTE(review): ``EXCLUSIVE`` is not set by this class (``opts.exclusive``
        is read directly in the constructor).
        """
        return self.getParameter("EXCLUSIVE")

    def getExcluser(self):
        """Return the value of ``EXCLUSER``.

        NOTE(review): ``EXCLUSER`` is not set by this class.
        """
        return self.getParameter("EXCLUSER")

    def getNodeset(self):
        """Return the value of ``NODESET``."""
        return self.getParameter("NODESET")

    def getParameter(self, value):
        """Return the effective value of parameter ``value``.

        The command-line override is used if it is set (not `None`).
        Otherwise the default Config value is used. This matches how
        `createFile` merges the two dictionaries.

        Returns
        -------
        result : `object` or `None`
            `None` if the parameter is set in neither dictionary.
        """
        override = self.commandLineDefaults.get(value)
        if override is not None:
            return override
        return self.defaults.get(value)

    def printNodeSetInfo(self):
        """Print a summary of the glideins that will be allocated."""
        nodes = self.getNodes()
        cpus = self.getCPUs()
        wallClock = self.getWallClock()
        nodeString = "s" if int(nodes) > 1 else ""

        if self.opts.dynamic is None or self.opts.dynamic == "__default__":
            print(
                f"{nodes} glidein{nodeString} will be allocated on {self.platform} "
                "using default dynamic slots configuration."
            )
            print(f"There will be {cpus} cores per glidein and a maximum time limit of {wallClock}")
        else:
            print(
                f"{nodes} node{nodeString} will be allocated on {self.platform} "
                f"using dynamic slot block specified in '{self.opts.dynamic}'"
            )
            print(f"There will be {cpus} cores per node and maximum time limit of {wallClock}")
        print("Node set name:")
        print(self.getNodeSetName())

    def runCommand(self, cmd, verbose):
        """Run ``cmd`` and wait for it to finish.

        Parameters
        ----------
        cmd : `str`
            Command line. It is split on whitespace; no shell quoting is
            honoured.
        verbose : `bool`
            If `False`, the command's stdin, stdout and stderr are attached
            to the null device.

        Returns
        -------
        exitCode : `int`
            The command's exit status. It is 127 if the command could not be
            executed, and ``128 + N`` if the command was killed by signal ``N``.

        Raises
        ------
        ValueError
            If ``cmd`` contains no words.
        """
        # Methods of file transfer and login may produce different output,
        # depending on how the "gsi" utilities are used. The user can either
        # use grid proxies or ssh, and gsiscp/gsissh does the right thing.
        # Since the output differs in either case, anything parsing this
        # output (like drpRun) would have to take extra steps to deal with
        # it, and those steps end up not being useful. So we optionally
        # discard the i/o of the executing command.
        cmd_split = cmd.split()
        if not cmd_split:
            raise ValueError("runCommand requires a non-empty command")
        # Flush our own buffers so earlier output is not interleaved after
        # the child's output when both share the terminal.
        sys.stdout.flush()
        sys.stderr.flush()
        stream = None if verbose else subprocess.DEVNULL
        try:
            completed = subprocess.run(
                cmd_split, stdin=stream, stdout=stream, stderr=stream, check=False
            )
        except OSError:
            # Command missing or not executable: use the shell's status.
            return 127
        if completed.returncode < 0:
            # subprocess reports death by signal N as -N.
            return 128 - completed.returncode
        return completed.returncode

    def submit(self):
        """Submit the glidein jobs to the Batch system."""
        raise NotImplementedError