Coverage for python / lsst / ctrl / bps / htcondor / handlers.py: 24%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:01 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Definitions of handlers of HTCondor job ClassAds.""" 

29 

30__all__ = [ 

31 "HTC_JOB_AD_HANDLERS", 

32 "Chain", 

33 "Handler", 

34 "JobAbortedByPeriodicRemoveHandler", 

35 "JobAbortedByUserHandler", 

36 "JobCompletedWithExecTicketHandler", 

37 "JobCompletedWithoutExecTicketHandler", 

38 "JobHeldByOtherHandler", 

39 "JobHeldBySignalHandler", 

40 "JobHeldByUserHandler", 

41] 

42 

43 

44import abc 

45import logging 

46import re 

47from collections.abc import Sequence 

48from typing import Any, overload 

49 

50_LOG = logging.getLogger(__name__) 

51 

52 

53class Handler(abc.ABC): 

54 """Abstract base class defining Handler interface.""" 

55 

56 @abc.abstractmethod 

57 def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None: 

58 """Handle a ClassAd. 

59 

60 Parameters 

61 ---------- 

62 ad : `dict[`str`, Any]` 

63 The dictionary representing ClassAd that need to be processed. 

64 

65 Returns 

66 ------- 

67 ad : `dict[`str`, Any]` | None 

68 The dictionary representing ClassAd after processing and ``None`` 

69 if the handler was not able to process the ad. 

70 

71 Notes 

72 ----- 

73 To optimize the memory usage, the implementation of this method may 

74 modify the ClassAd in place. In such a case, the ClassAd returned by 

75 the method will be the same object that was passed to it as 

76 the argument, but including any modifications that were made. 

77 """ 

78 

79 

class Chain(Sequence):
    """Ordered collection of handlers that are tried one after another.

    Parameters
    ----------
    handlers : `Sequence` [`Handler`], optional
        Handlers used to initialize the chain, in the order in which they
        should be tried. If `None` (default), the chain starts empty.

    Raises
    ------
    TypeError
        Raised if any of the supplied objects is not a ``Handler``.
    """

    def __init__(self, handlers: Sequence[Handler] | None = None) -> None:
        self._handlers: list[Handler] = []
        if handlers is not None:
            # Go through append() so every element is type-checked.
            for handler in handlers:
                self.append(handler)

    @overload
    def __getitem__(self, index: int) -> Handler: ...
    @overload
    def __getitem__(self, index: slice) -> Sequence[Handler]: ...
    def __getitem__(self, index):
        return self._handlers[index]

    def __len__(self) -> int:
        return len(self._handlers)

    def append(self, handler: Handler) -> None:
        """Append a handler to the end of the chain.

        Parameters
        ----------
        handler : `Handler`
            The handler to add to the chain.

        Raises
        ------
        TypeError
            Raised if the passed object is not a ``Handler``.
        """
        if not isinstance(handler, Handler):
            # Report the bare type name; interpolating the type object itself
            # would nest its "<class '...'>" repr inside the quotes.
            raise TypeError(
                f"append() argument must be a 'Handler', not a '{type(handler).__name__}'"
            )
        self._handlers.append(handler)

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Pass a ClassAd to the handlers until one of them processes it.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to handle.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The ClassAd returned by the first handler that was able to
            process it, `None` if no handler in the chain could.

        Notes
        -----
        A handler that raises an exception is treated as unable to process
        the ad: the exception is logged as a warning and the next handler
        in the chain (if any) is tried.
        """
        for handler in self:
            try:
                new_ad = handler.handle(ad)
            except Exception as exc:
                _LOG.warning(
                    "Handler '%s' raised an exception while processing the ad: %s. "
                    "Proceeding to the next handler (if any).",
                    type(handler).__name__,
                    repr(exc),
                )
                continue
            if new_ad is not None:
                # First handler to produce a result wins.
                return new_ad
        return None

151 

152 

class JobCompletedWithExecTicketHandler(Handler):
    """Handler of ClassAds for completed jobs with the ticket of execution.

    Usually, the entry in the event log for a completed job contains the
    ticket of execution (``ToE``) -- a record describing how and when the job
    was terminated. If it exists, this handler uses it to add the attributes
    describing the job's exit status.
    """

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Copy the exit status from the ticket of execution into the ad.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` and either ``ExitSignal`` or
            ``ExitCode`` set, `None` if the ad does not describe a terminated
            job or has no ticket of execution.

        Raises
        ------
        KeyError
            Raised if the ad or its ticket of execution lacks a required
            attribute. The ad is left unmodified in that case.
        """
        if not ad["MyType"].endswith("TerminatedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not completed",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "ToE" not in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "ticket of execution missing",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        # Read everything from the ticket before touching the ad so that
        # a malformed ticket cannot leave the ad half-updated.
        toe = ad["ToE"]
        exit_by_signal = toe["ExitBySignal"]
        status_key = "ExitSignal" if exit_by_signal else "ExitCode"
        status_value = toe[status_key]

        ad["ExitBySignal"] = exit_by_signal
        ad[status_key] = status_value
        return ad

187 

188 

class JobCompletedWithoutExecTicketHandler(Handler):
    """Handler of ClassAds for completed jobs w/o the ticket of execution.

    The entries in the event log for some completed jobs (e.g. jobs that run
    ``condor_dagman``) do *not* contain the ticket of execution (``ToE``) --
    a record describing how and when the job was terminated. This handler
    uses other existing attributes to add the ones describing the job's exit
    status.
    """

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Derive the exit status from the termination attributes of the ad.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` and either ``ExitSignal`` or
            ``ExitCode`` set, `None` if the ad does not describe a terminated
            job or if it carries a ticket of execution.

        Raises
        ------
        KeyError
            Raised if the ad lacks a required attribute. The ad is left
            unmodified in that case.
        """
        if not ad["MyType"].endswith("TerminatedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not completed",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "ToE" in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "ticket of execution found",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        # Look up the source attributes first so that a missing one cannot
        # leave the ad half-updated.
        exit_by_signal = not ad["TerminatedNormally"]
        if exit_by_signal:
            status_key, status_value = "ExitSignal", ad["TerminatedBySignal"]
        else:
            status_key, status_value = "ExitCode", ad["ReturnValue"]

        ad["ExitBySignal"] = exit_by_signal
        ad[status_key] = status_value
        return ad

223 

224 

class JobHeldByOtherHandler(Handler):
    """Handler of ClassAds for held jobs whose hold reason code is
    neither 1 nor 3.
    """

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Report the hold reason code as the exit code of the job.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` set to `False` and
            ``ExitCode`` set to the value of ``HoldReasonCode``, `None` if
            the ad does not describe a held job or if its hold reason code
            is 1 or 3.
        """
        handler_name = type(self).__name__

        if not ad["MyType"].endswith("HeldEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not held",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        code = ad["HoldReasonCode"]
        if code in {1, 3}:
            # Codes 1 and 3 are the ones the user and signal hold handlers
            # in this module accept.
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "invalid hold reason code: HoldReasonCode = %s",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
                code,
            )
            return None

        ad["ExitBySignal"] = False
        ad["ExitCode"] = code
        return ad

251 

252 

class JobHeldBySignalHandler(Handler):
    """Handler of ClassAds for jobs put on hold by signals."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Extract the signal number from the hold reason of the job.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` set to `True` and
            ``ExitSignal`` set to the signal number (an `int`) found in
            ``HoldReason``, `None` if the ad does not describe a held job,
            its hold reason code is not 3, or no signal number could be
            found in the hold reason.
        """
        if not ad["MyType"].endswith("HeldEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not held",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        if ad["HoldReasonCode"] != 3:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job not held by a signal: HoldReasonCode = %s, HoldReason = %s",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
                ad["HoldReasonCode"],
                ad["HoldReason"],
            )
            return None

        match = re.search(r"signal (\d+)", ad["HoldReason"])
        if match is None:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "signal not found: HoldReason = %s",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
                ad["HoldReason"],
            )
            return None

        ad["ExitBySignal"] = True
        # Store the signal as an integer, as the periodic-remove handler does
        # for the same pattern, instead of leaving it as the matched text.
        ad["ExitSignal"] = int(match.group(1))
        return ad

292 

293 

class JobHeldByUserHandler(Handler):
    """Handler of ClassAds for jobs put on hold by the user."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Mark a job held by the user as exited with code 0.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` set to `False` and
            ``ExitCode`` set to 0, `None` if the ad does not describe a held
            job or if its hold reason code is not 1.
        """
        handler_name = type(self).__name__

        if not ad["MyType"].endswith("HeldEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not held",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        if ad["HoldReasonCode"] != 1:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job not held by the user: HoldReasonCode = %s, HoldReason = %s",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
                ad["HoldReasonCode"],
                ad["HoldReason"],
            )
            return None

        ad["ExitBySignal"] = False
        ad["ExitCode"] = 0
        return ad

321 

322 

class JobAbortedByPeriodicRemoveHandler(Handler):
    """Handler of ClassAds for jobs deleted by periodic remove policy."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Mark a job removed by the periodic remove policy as signaled.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` set to `True` and
            ``ExitSignal`` set to the signal number found in ``HoldReason``
            (or to -1 when none is available), `None` if the ad does not
            describe an aborted job, has no ``Reason``, or its ``Reason``
            does not mention ``PeriodicRemove``.
        """
        handler_name = type(self).__name__

        if not ad["MyType"].endswith("AbortedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not removed",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        if "Reason" not in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "unable to determine the reason for the removal.",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        if "PeriodicRemove" not in ad["Reason"]:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job was not removed by the periodic removal policy: Reason = %s",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
                ad["Reason"],
            )
            return None

        ad["ExitBySignal"] = True
        # -1 is the fallback used when no signal number can be recovered
        # from the hold reason.
        ad["ExitSignal"] = -1
        if "HoldReason" in ad:
            found = re.search(r"signal (\d+)", ad["HoldReason"])
            if found is not None:
                ad["ExitSignal"] = int(found.group(1))
        return ad

365 

366 

class JobAbortedByUserHandler(Handler):
    """Handler of ClassAds for jobs deleted by the user."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Mark a job removed by the user as exited with code 0.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to process. It is
            updated in place.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ad with ``ExitBySignal`` set to `False` and
            ``ExitCode`` set to 0, `None` if the ad does not describe an
            aborted job, has no ``Reason``, or its ``Reason`` matches none
            of the known user-removal patterns.
        """
        handler_name = type(self).__name__

        if not ad["MyType"].endswith("AbortedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not removed",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        if "Reason" not in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "unable to determine the reason for the removal.",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None

        # Substrings of the removal reason that identify a user action.
        patterns = (
            "Python-initiated action",  # DAGMan job removed by the user
            "DAG Removed",  # payload job removed by the user
            "OtherJobRemoveRequirements",  # a subdag job removed by the user
        )
        if not any(patt in ad["Reason"] for patt in patterns):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job not removed by the user: Reason = %s",
                handler_name,
                ad["ClusterId"],
                ad["ProcId"],
                ad["Reason"],
            )
            return None

        ad["ExitBySignal"] = False
        ad["ExitCode"] = 0
        return ad

410 

411 

# Default chain of job ClassAd handlers. Order matters: the chain returns the
# result of the first handler that is able to process a given ad.
_handlers = [
    handler_class()
    for handler_class in (
        JobAbortedByPeriodicRemoveHandler,
        JobAbortedByUserHandler,
        JobHeldByUserHandler,
        JobHeldBySignalHandler,
        JobHeldByOtherHandler,
        JobCompletedWithExecTicketHandler,
        JobCompletedWithoutExecTicketHandler,
    )
]
HTC_JOB_AD_HANDLERS = Chain(_handlers)