Coverage for tests / test_handlers.py: 24%

286 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:05 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for job ClassAd handlers.""" 

29 

30import logging 

31import unittest 

32from typing import Any 

33 

34from lsst.ctrl.bps.htcondor.handlers import ( 

35 Chain, 

36 Handler, 

37 JobAbortedByPeriodicRemoveHandler, 

38 JobAbortedByUserHandler, 

39 JobCompletedWithExecTicketHandler, 

40 JobCompletedWithoutExecTicketHandler, 

41 JobHeldByOtherHandler, 

42 JobHeldBySignalHandler, 

43 JobHeldByUserHandler, 

44) 

45 

46logger = logging.getLogger("lsst.ctrl.bps.htcondor") 

47 

48 

49class DummyHandler(Handler): 

50 """A concrete handler that does nothing.""" 

51 

52 def handle(self, ad: dict[str, Any]) -> dict[str, Any]: 

53 pass 

54 

55 

56class RaisingHandler(Handler): 

57 """A concrete handler that raises KeyError exception.""" 

58 

59 def handle(self, ad: dict[str, Any]) -> dict[str, Any]: 

60 raise KeyError("foo") 

61 

62 

63class ChainTestCase(unittest.TestCase): 

64 """Test the Chain class.""" 

65 

66 def setUp(self): 

67 pass 

68 

69 def tearDown(self): 

70 pass 

71 

72 def testDefaultInitialization(self): 

73 chain = Chain() 

74 self.assertEqual(len(chain), 0) 

75 

76 def testCustomInitialization(self): 

77 handler = DummyHandler() 

78 chain = Chain(handlers=[handler]) 

79 self.assertEqual(len(chain), 1) 

80 self.assertIs(chain[0], handler) 

81 

82 def testAppendingHandler(self): 

83 first = DummyHandler() 

84 second = DummyHandler() 

85 chain = Chain(handlers=[first]) 

86 chain.append(second) 

87 self.assertEqual(len(chain), 2) 

88 self.assertIs(chain[0], first) 

89 self.assertIs(chain[1], second) 

90 

91 def testAppendingNonHandler(self): 

92 handler = "foo" 

93 chain = Chain() 

94 with self.assertRaises(TypeError): 

95 chain.append(handler) 

96 

97 

98class JobCompletedWithExecTicketHandlerTestCase(unittest.TestCase): 

99 """Test the handler for a completed job with the ticket of execution.""" 

100 

101 def setUp(self): 

102 self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobTerminatedEvent"} 

103 self.handler = JobCompletedWithExecTicketHandler() 

104 

105 def tearDown(self): 

106 pass 

107 

108 def testNormalTermination(self): 

109 ad = self.ad | {"ToE": {"ExitBySignal": False, "ExitCode": 0}} 

110 result = self.handler.handle(ad) 

111 self.assertIsNotNone(result) 

112 self.assertIn("ExitBySignal", result) 

113 self.assertFalse(result["ExitBySignal"]) 

114 self.assertIn("ExitCode", result) 

115 self.assertEqual(result["ExitCode"], 0) 

116 

117 def testAbnormalTermination(self): 

118 ad = self.ad | {"ToE": {"ExitBySignal": True, "ExitSignal": 9}} 

119 result = self.handler.handle(ad) 

120 self.assertIsNotNone(result) 

121 self.assertIn("ExitBySignal", result) 

122 self.assertTrue(result["ExitBySignal"]) 

123 self.assertIn("ExitSignal", result) 

124 self.assertEqual(result["ExitSignal"], 9) 

125 

126 def testNotHandlingMissingExecTicket(self): 

127 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

128 result = self.handler.handle(self.ad) 

129 self.assertIsNone(result) 

130 self.assertIn("ticket of execution", cm.output[0]) 

131 self.assertIn("missing", cm.output[0]) 

132 

133 def testNotHandlingJobNotCompleted(self): 

134 ad = self.ad | {"MyType": "foo"} 

135 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

136 result = self.handler.handle(ad) 

137 self.assertIsNone(result) 

138 self.assertIn("job not completed", cm.output[0]) 

139 

140 

141class JobCompletedWithoutExecTicketHandlerTestCase(unittest.TestCase): 

142 """Test the handler for a completed job w/o the ticket of execution.""" 

143 

144 def setUp(self): 

145 self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobTerminatedEvent"} 

146 self.handler = JobCompletedWithoutExecTicketHandler() 

147 

148 def tearDown(self): 

149 pass 

150 

151 def testNormalTermination(self): 

152 ad = self.ad | {"TerminatedNormally": True, "ReturnValue": 0} 

153 result = self.handler.handle(ad) 

154 self.assertIsNotNone(result) 

155 self.assertIn("ExitBySignal", result) 

156 self.assertFalse(result["ExitBySignal"]) 

157 self.assertIn("ExitCode", result) 

158 self.assertEqual(result["ExitCode"], 0) 

159 

160 def testAbnormalTermination(self): 

161 ad = self.ad | {"TerminatedNormally": False, "TerminatedBySignal": 9} 

162 result = self.handler.handle(ad) 

163 self.assertIsNotNone(result) 

164 self.assertIn("ExitBySignal", result) 

165 self.assertTrue(result["ExitBySignal"]) 

166 self.assertIn("ExitSignal", result) 

167 self.assertEqual(result["ExitSignal"], 9) 

168 

169 def testNotHandlingExecTicketExists(self): 

170 ad = self.ad | {"ToE": {"ExitBySignal": False, "ExitCode": 0}} 

171 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

172 result = self.handler.handle(ad) 

173 self.assertIsNone(result) 

174 self.assertIn("ticket of execution", cm.output[0]) 

175 self.assertIn("found", cm.output[0]) 

176 

177 def testNotHandlingJobNotCompleted(self): 

178 ad = self.ad | {"MyType": "foo"} 

179 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

180 result = self.handler.handle(ad) 

181 self.assertIsNone(result) 

182 self.assertIn("job not completed", cm.output[0]) 

183 

184 

185class JobHeldOtherTestCase(unittest.TestCase): 

186 """Test the handler for a held job.""" 

187 

188 def setUp(self): 

189 self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobHeldEvent"} 

190 self.handler = JobHeldByOtherHandler() 

191 

192 def tearDown(self): 

193 pass 

194 

195 def testHeld(self): 

196 ad = self.ad | {"HoldReasonCode": 42} 

197 result = self.handler.handle(ad) 

198 self.assertIsNotNone(result) 

199 self.assertIn("ExitBySignal", result) 

200 self.assertFalse(result["ExitBySignal"]) 

201 self.assertIn("ExitCode", result) 

202 self.assertEqual(result["ExitCode"], 42) 

203 

204 def testHeldBySignal(self): 

205 ad = self.ad | {"HoldReasonCode": 3} 

206 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

207 result = self.handler.handle(ad) 

208 self.assertIsNone(result) 

209 self.assertIn("invalid hold reason code", cm.output[0]) 

210 self.assertIn("HoldReasonCode = 3", cm.output[0]) 

211 

212 def testHeldByUser(self): 

213 ad = self.ad | {"HoldReasonCode": 1} 

214 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

215 result = self.handler.handle(ad) 

216 self.assertIsNone(result) 

217 self.assertIn("invalid hold reason code", cm.output[0]) 

218 self.assertIn("HoldReasonCode = 1", cm.output[0]) 

219 

220 def testNotHandlingJobNotHeld(self): 

221 ad = self.ad | {"MyType": "foo"} 

222 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

223 result = self.handler.handle(ad) 

224 self.assertIsNone(result) 

225 self.assertIn("job not held", cm.output[0]) 

226 

227 

228class JobHeldBySignalHandlerTestCase(unittest.TestCase): 

229 """Test the handler for a job held by a signal.""" 

230 

231 def setUp(self): 

232 self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobHeldEvent"} 

233 self.handler = JobHeldBySignalHandler() 

234 

235 def tearDown(self): 

236 pass 

237 

238 def testSignalAvailable(self): 

239 ad = self.ad | {"HoldReasonCode": 3, "HoldReason": "Job raised a signal 9."} 

240 result = self.handler.handle(ad) 

241 self.assertIsNotNone(ad) 

242 self.assertIn("ExitBySignal", result) 

243 self.assertTrue(result["ExitBySignal"]) 

244 self.assertIn("ExitSignal", result) 

245 self.assertEqual(int(result["ExitSignal"]), 9) 

246 

247 def testSignalNotAvailable(self): 

248 ad = self.ad | {"HoldReasonCode": 3, "HoldReason": "foo"} 

249 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

250 result = self.handler.handle(ad) 

251 self.assertIsNone(result) 

252 self.assertIn("signal not found", cm.output[0]) 

253 

254 def testNotHandlingInvalidHoldReasonCode(self): 

255 ad = self.ad | {"HoldReasonCode": 1, "HoldReason": "via condor_hold (by user foo)"} 

256 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

257 result = self.handler.handle(ad) 

258 self.assertIsNone(result) 

259 self.assertIn("not held by a signal", cm.output[0]) 

260 

261 def testNotHandlingJobNotHeld(self): 

262 ad = self.ad | {"MyType": "foo"} 

263 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

264 result = self.handler.handle(ad) 

265 self.assertIsNone(result) 

266 self.assertIn("job not held", cm.output[0]) 

267 

268 

269class JobHeldByUserHandlerTestCase(unittest.TestCase): 

270 """Test the handler for a job held by the user.""" 

271 

272 def setUp(self): 

273 self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobHeldEvent"} 

274 self.handler = JobHeldByUserHandler() 

275 

276 def tearDown(self): 

277 pass 

278 

279 def testHandling(self): 

280 ad = self.ad | {"HoldReasonCode": 1} 

281 result = self.handler.handle(ad) 

282 self.assertIsNotNone(result) 

283 self.assertIn("ExitBySignal", result) 

284 self.assertFalse(result["ExitBySignal"]) 

285 self.assertIn("ExitCode", result) 

286 self.assertEqual(result["ExitCode"], 0) 

287 

288 def testNotHandlingInvalidHoldReaconCode(self): 

289 ad = self.ad | {"HoldReasonCode": 3, "HoldReason": "Job raised a signal 9."} 

290 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

291 result = self.handler.handle(ad) 

292 self.assertIsNone(result) 

293 self.assertIn("not held by the user", cm.output[0]) 

294 

295 def testNotHandlingJobNotHeld(self): 

296 ad = self.ad | {"MyType": "foo"} 

297 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

298 result = self.handler.handle(ad) 

299 self.assertIsNone(result) 

300 self.assertIn("job not held", cm.output[0]) 

301 

302 

303class JobAbortedByPeriodicRemoveHandlerTestCase(unittest.TestCase): 

304 """Test the handler for jobs deleted by periodic removal policy.""" 

305 

306 def setUp(self): 

307 self.ad = { 

308 "ClusterId": 1, 

309 "ProcId": 0, 

310 "MyType": "JobAbortedEvent", 

311 "Reason": "The job attribute PeriodicRemove expression 'foo' evaluated to TRUE", 

312 } 

313 self.handler = JobAbortedByPeriodicRemoveHandler() 

314 

315 def tearDown(self): 

316 pass 

317 

318 def testHandling(self): 

319 self.ad |= {"HoldReason": "Job raised a signal 9."} 

320 result = self.handler.handle(self.ad) 

321 self.assertIn("ExitBySignal", result) 

322 self.assertTrue(result["ExitBySignal"]) 

323 self.assertIn("ExitSignal", result) 

324 self.assertEqual(result["ExitSignal"], 9) 

325 

326 def testHandlingWithHoldReasonNoExitSignal(self): 

327 self.ad |= {"HoldReason": "Job raised a signal."} 

328 result = self.handler.handle(self.ad) 

329 self.assertIn("ExitBySignal", result) 

330 self.assertTrue(result["ExitBySignal"]) 

331 self.assertIn("ExitSignal", result) 

332 self.assertEqual(result["ExitSignal"], -1) 

333 

334 def testHandlingWithoutHoldReason(self): 

335 result = self.handler.handle(self.ad) 

336 self.assertIn("ExitBySignal", result) 

337 self.assertTrue(result["ExitBySignal"]) 

338 self.assertIn("ExitSignal", result) 

339 self.assertEqual(result["ExitSignal"], -1) 

340 

341 def testNotHandlingJobNotRemoved(self): 

342 self.ad["MyType"] = "foo" 

343 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

344 result = self.handler.handle(self.ad) 

345 self.assertIsNone(result) 

346 self.assertIn("job not removed", cm.output[0]) 

347 

348 def testNotHandlingJobNotRemovedByPeriodicRemoval(self): 

349 self.ad["Reason"] = "DAG Abort" 

350 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

351 result = self.handler.handle(self.ad) 

352 self.assertIsNone(result) 

353 self.assertIn("not removed by the periodic removal policy", cm.output[0]) 

354 

355 def testNotHandlingNoReason(self): 

356 del self.ad["Reason"] 

357 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

358 result = self.handler.handle(self.ad) 

359 self.assertIsNone(result) 

360 self.assertIn("unable to determine the reason", cm.output[0]) 

361 

362 

363class JobAbortedByUserHandlerTestCase(unittest.TestCase): 

364 """Test the handler for jobs deleted by the user.""" 

365 

366 def setUp(self): 

367 self.ad = { 

368 "ClusterId": 1, 

369 "ProcId": 0, 

370 "MyType": "JobAbortedEvent", 

371 } 

372 self.handler = JobAbortedByUserHandler() 

373 

374 def tearDown(self): 

375 pass 

376 

377 def testHandlingAbortedDagmanJob(self): 

378 self.ad |= {"Reason": "Python-initiated action"} 

379 result = self.handler.handle(self.ad) 

380 self.assertIn("ExitBySignal", result) 

381 self.assertFalse(result["ExitBySignal"]) 

382 self.assertIn("ExitCode", result) 

383 self.assertEqual(result["ExitCode"], 0) 

384 

385 def testHandlingAbortedPayloadJob(self): 

386 self.ad |= {"Reason": "DAG Removed"} 

387 result = self.handler.handle(self.ad) 

388 self.assertIn("ExitBySignal", result) 

389 self.assertFalse(result["ExitBySignal"]) 

390 self.assertIn("ExitCode", result) 

391 self.assertEqual(result["ExitCode"], 0) 

392 

393 def testHandlingAbortedSubdagJob(self): 

394 self.ad |= {"Reason": "OtherJobRemoveRequirements = DAGManJobId =?= 78"} 

395 result = self.handler.handle(self.ad) 

396 self.assertIn("ExitBySignal", result) 

397 self.assertFalse(result["ExitBySignal"]) 

398 self.assertIn("ExitCode", result) 

399 self.assertEqual(result["ExitCode"], 0) 

400 

401 def testNotHandlingJobNotRemoved(self): 

402 self.ad["MyType"] = "foo" 

403 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

404 result = self.handler.handle(self.ad) 

405 self.assertIsNone(result) 

406 self.assertIn("job not removed", cm.output[0]) 

407 

408 def testNotHandlingJobNotRemovedByUser(self): 

409 self.ad |= {"Reason": "The job attribute PeriodicRemove expression 'foo' evaluated to TRUE"} 

410 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

411 result = self.handler.handle(self.ad) 

412 self.assertIsNone(result) 

413 self.assertIn("job not removed", cm.output[0]) 

414 

415 def testNotHandlingNoReason(self): 

416 with self.assertLogs(logger=logger, level="DEBUG") as cm: 

417 result = self.handler.handle(self.ad) 

418 self.assertIsNone(result) 

419 self.assertIn("unable to determine the reason", cm.output[0])