Coverage for tests/test_handlers.py: 24%
286 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:05 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Unit tests for job ClassAd handlers."""
30import logging
31import unittest
32from typing import Any
34from lsst.ctrl.bps.htcondor.handlers import (
35 Chain,
36 Handler,
37 JobAbortedByPeriodicRemoveHandler,
38 JobAbortedByUserHandler,
39 JobCompletedWithExecTicketHandler,
40 JobCompletedWithoutExecTicketHandler,
41 JobHeldByOtherHandler,
42 JobHeldBySignalHandler,
43 JobHeldByUserHandler,
44)
# Logger named after the ``lsst.ctrl.bps.htcondor`` package; the test cases
# below pass it to ``assertLogs`` to capture the handlers' DEBUG messages.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")
class DummyHandler(Handler):
    """Minimal concrete ``Handler`` whose ``handle`` performs no work."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any]:
        # Nothing is inspected or produced; the ClassAd is ignored.
        return None
class RaisingHandler(Handler):
    """Concrete ``Handler`` whose ``handle`` always raises ``KeyError``."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any]:
        # Fail unconditionally, whatever ClassAd is passed in.
        failure = KeyError("foo")
        raise failure
class ChainTestCase(unittest.TestCase):
    """Exercise construction of ``Chain`` and appending items to it."""

    def setUp(self):
        # No shared fixture is required by these tests.
        pass

    def tearDown(self):
        # Nothing to clean up.
        pass

    def testDefaultInitialization(self):
        # A chain created without arguments has length zero.
        empty_chain = Chain()
        self.assertEqual(len(empty_chain), 0)

    def testCustomInitialization(self):
        # A handler supplied at construction is stored as the sole element.
        only_handler = DummyHandler()
        chain = Chain(handlers=[only_handler])
        self.assertEqual(len(chain), 1)
        self.assertIs(chain[0], only_handler)

    def testAppendingHandler(self):
        # An appended handler lands after the one given at construction.
        first, second = DummyHandler(), DummyHandler()
        chain = Chain(handlers=[first])
        chain.append(second)
        self.assertEqual(len(chain), 2)
        self.assertIs(chain[0], first)
        self.assertIs(chain[1], second)

    def testAppendingNonHandler(self):
        # Appending a plain string is expected to raise TypeError.
        not_a_handler = "foo"
        chain = Chain()
        with self.assertRaises(TypeError):
            chain.append(not_a_handler)
class JobCompletedWithExecTicketHandlerTestCase(unittest.TestCase):
    """Check ``JobCompletedWithExecTicketHandler`` on job-terminated ads.

    The ticket of execution is supplied to the handler under the ``"ToE"``
    key of the ClassAd.
    """

    def setUp(self):
        # Base ad for a terminated job; tests extend or override it.
        self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobTerminatedEvent"}
        self.handler = JobCompletedWithExecTicketHandler()

    def tearDown(self):
        # Nothing to clean up.
        pass

    def testNormalTermination(self):
        # Ticket reports a regular exit with code 0.
        job_ad = {**self.ad, "ToE": {"ExitBySignal": False, "ExitCode": 0}}
        outcome = self.handler.handle(job_ad)
        self.assertIsNotNone(outcome)
        self.assertIn("ExitBySignal", outcome)
        self.assertFalse(outcome["ExitBySignal"])
        self.assertIn("ExitCode", outcome)
        self.assertEqual(outcome["ExitCode"], 0)

    def testAbnormalTermination(self):
        # Ticket reports termination by signal 9.
        job_ad = {**self.ad, "ToE": {"ExitBySignal": True, "ExitSignal": 9}}
        outcome = self.handler.handle(job_ad)
        self.assertIsNotNone(outcome)
        self.assertIn("ExitBySignal", outcome)
        self.assertTrue(outcome["ExitBySignal"])
        self.assertIn("ExitSignal", outcome)
        self.assertEqual(outcome["ExitSignal"], 9)

    def testNotHandlingMissingExecTicket(self):
        # Without a "ToE" entry the handler declines and logs the reason.
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        first_message = captured.output[0]
        self.assertIn("ticket of execution", first_message)
        self.assertIn("missing", first_message)

    def testNotHandlingJobNotCompleted(self):
        # An ad of a different event type is declined.
        job_ad = {**self.ad, "MyType": "foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(job_ad)
        self.assertIsNone(outcome)
        self.assertIn("job not completed", captured.output[0])
class JobCompletedWithoutExecTicketHandlerTestCase(unittest.TestCase):
    """Check ``JobCompletedWithoutExecTicketHandler`` on job-terminated ads.

    These ads carry the termination details at the top level rather than
    in a ``"ToE"`` (ticket of execution) entry.
    """

    def setUp(self):
        # Base ad for a terminated job; tests extend or override it.
        self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobTerminatedEvent"}
        self.handler = JobCompletedWithoutExecTicketHandler()

    def tearDown(self):
        # Nothing to clean up.
        pass

    def testNormalTermination(self):
        # Normal termination with return value 0 maps to exit code 0.
        job_ad = {**self.ad, "TerminatedNormally": True, "ReturnValue": 0}
        outcome = self.handler.handle(job_ad)
        self.assertIsNotNone(outcome)
        self.assertIn("ExitBySignal", outcome)
        self.assertFalse(outcome["ExitBySignal"])
        self.assertIn("ExitCode", outcome)
        self.assertEqual(outcome["ExitCode"], 0)

    def testAbnormalTermination(self):
        # Abnormal termination by signal 9 is reported as such.
        job_ad = {**self.ad, "TerminatedNormally": False, "TerminatedBySignal": 9}
        outcome = self.handler.handle(job_ad)
        self.assertIsNotNone(outcome)
        self.assertIn("ExitBySignal", outcome)
        self.assertTrue(outcome["ExitBySignal"])
        self.assertIn("ExitSignal", outcome)
        self.assertEqual(outcome["ExitSignal"], 9)

    def testNotHandlingExecTicketExists(self):
        # When a "ToE" entry is present the handler declines and says so.
        job_ad = {**self.ad, "ToE": {"ExitBySignal": False, "ExitCode": 0}}
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(job_ad)
        self.assertIsNone(outcome)
        first_message = captured.output[0]
        self.assertIn("ticket of execution", first_message)
        self.assertIn("found", first_message)

    def testNotHandlingJobNotCompleted(self):
        # An ad of a different event type is declined.
        job_ad = {**self.ad, "MyType": "foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(job_ad)
        self.assertIsNone(outcome)
        self.assertIn("job not completed", captured.output[0])
class JobHeldOtherTestCase(unittest.TestCase):
    """Check ``JobHeldByOtherHandler`` on job-held ads.

    Hold reason codes 1 and 3 are expected to be rejected by this handler;
    another code (42 here) is expected to be reported as the exit code.
    """

    def setUp(self):
        # Base ad for a held job; tests extend or override it.
        self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobHeldEvent"}
        self.handler = JobHeldByOtherHandler()

    def tearDown(self):
        # Nothing to clean up.
        pass

    def testHeld(self):
        # The hold reason code is passed through as the exit code.
        job_ad = {**self.ad, "HoldReasonCode": 42}
        outcome = self.handler.handle(job_ad)
        self.assertIsNotNone(outcome)
        self.assertIn("ExitBySignal", outcome)
        self.assertFalse(outcome["ExitBySignal"])
        self.assertIn("ExitCode", outcome)
        self.assertEqual(outcome["ExitCode"], 42)

    def testHeldBySignal(self):
        # Code 3 is declined with a message naming the code.
        job_ad = {**self.ad, "HoldReasonCode": 3}
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(job_ad)
        self.assertIsNone(outcome)
        first_message = captured.output[0]
        self.assertIn("invalid hold reason code", first_message)
        self.assertIn("HoldReasonCode = 3", first_message)

    def testHeldByUser(self):
        # Code 1 is declined with a message naming the code.
        job_ad = {**self.ad, "HoldReasonCode": 1}
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(job_ad)
        self.assertIsNone(outcome)
        first_message = captured.output[0]
        self.assertIn("invalid hold reason code", first_message)
        self.assertIn("HoldReasonCode = 1", first_message)

    def testNotHandlingJobNotHeld(self):
        # An ad of a different event type is declined.
        job_ad = {**self.ad, "MyType": "foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(job_ad)
        self.assertIsNone(outcome)
        self.assertIn("job not held", captured.output[0])
class JobHeldBySignalHandlerTestCase(unittest.TestCase):
    """Test the handler for a job held by a signal.

    The tests use hold reason code 3 for a job held because of a signal
    and expect the signal number to be taken from the ``HoldReason`` text.
    """

    def setUp(self):
        # Base ad for a held job; individual tests extend or override it.
        self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobHeldEvent"}
        self.handler = JobHeldBySignalHandler()

    def tearDown(self):
        pass

    def testSignalAvailable(self):
        # The signal number embedded in the hold reason must be reported.
        ad = self.ad | {"HoldReasonCode": 3, "HoldReason": "Job raised a signal 9."}
        result = self.handler.handle(ad)
        # Check the handler's return value (not the input ad, which can
        # never be None) so an unhandled ad fails with a clear message.
        self.assertIsNotNone(result)
        self.assertIn("ExitBySignal", result)
        self.assertTrue(result["ExitBySignal"])
        self.assertIn("ExitSignal", result)
        # int() makes the comparison independent of whether the handler
        # returns the signal as a string or as a number.
        self.assertEqual(int(result["ExitSignal"]), 9)

    def testSignalNotAvailable(self):
        # A hold reason without a signal number is declined.
        ad = self.ad | {"HoldReasonCode": 3, "HoldReason": "foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            result = self.handler.handle(ad)
        self.assertIsNone(result)
        self.assertIn("signal not found", cm.output[0])

    def testNotHandlingInvalidHoldReasonCode(self):
        # A job held with a different reason code is declined.
        ad = self.ad | {"HoldReasonCode": 1, "HoldReason": "via condor_hold (by user foo)"}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            result = self.handler.handle(ad)
        self.assertIsNone(result)
        self.assertIn("not held by a signal", cm.output[0])

    def testNotHandlingJobNotHeld(self):
        # An ad of a different event type is declined.
        ad = self.ad | {"MyType": "foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            result = self.handler.handle(ad)
        self.assertIsNone(result)
        self.assertIn("job not held", cm.output[0])
class JobHeldByUserHandlerTestCase(unittest.TestCase):
    """Test the handler for a job held by the user.

    The tests use hold reason code 1 for a job held by the user and
    expect such a job to be reported with exit code 0.
    """

    def setUp(self):
        # Base ad for a held job; individual tests extend or override it.
        self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobHeldEvent"}
        self.handler = JobHeldByUserHandler()

    def tearDown(self):
        pass

    def testHandling(self):
        # A user-held job is reported as a regular exit with code 0.
        ad = self.ad | {"HoldReasonCode": 1}
        result = self.handler.handle(ad)
        self.assertIsNotNone(result)
        self.assertIn("ExitBySignal", result)
        self.assertFalse(result["ExitBySignal"])
        self.assertIn("ExitCode", result)
        self.assertEqual(result["ExitCode"], 0)

    def testNotHandlingInvalidHoldReasonCode(self):
        # A job held with a different reason code is declined.
        ad = self.ad | {"HoldReasonCode": 3, "HoldReason": "Job raised a signal 9."}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            result = self.handler.handle(ad)
        self.assertIsNone(result)
        self.assertIn("not held by the user", cm.output[0])

    def testNotHandlingJobNotHeld(self):
        # An ad of a different event type is declined.
        ad = self.ad | {"MyType": "foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            result = self.handler.handle(ad)
        self.assertIsNone(result)
        self.assertIn("job not held", cm.output[0])
class JobAbortedByPeriodicRemoveHandlerTestCase(unittest.TestCase):
    """Check ``JobAbortedByPeriodicRemoveHandler`` on job-aborted ads.

    The fixture's ``Reason`` text describes a removal triggered by the
    ``PeriodicRemove`` expression; tests alter the ad to probe each path.
    """

    def setUp(self):
        periodic_remove_reason = "The job attribute PeriodicRemove expression 'foo' evaluated to TRUE"
        self.ad = {
            "ClusterId": 1,
            "ProcId": 0,
            "MyType": "JobAbortedEvent",
            "Reason": periodic_remove_reason,
        }
        self.handler = JobAbortedByPeriodicRemoveHandler()

    def tearDown(self):
        # Nothing to clean up.
        pass

    def testHandling(self):
        # The signal number in the hold reason is reported as ExitSignal.
        self.ad["HoldReason"] = "Job raised a signal 9."
        outcome = self.handler.handle(self.ad)
        self.assertIn("ExitBySignal", outcome)
        self.assertTrue(outcome["ExitBySignal"])
        self.assertIn("ExitSignal", outcome)
        self.assertEqual(outcome["ExitSignal"], 9)

    def testHandlingWithHoldReasonNoExitSignal(self):
        # A hold reason lacking a signal number yields ExitSignal -1.
        self.ad["HoldReason"] = "Job raised a signal."
        outcome = self.handler.handle(self.ad)
        self.assertIn("ExitBySignal", outcome)
        self.assertTrue(outcome["ExitBySignal"])
        self.assertIn("ExitSignal", outcome)
        self.assertEqual(outcome["ExitSignal"], -1)

    def testHandlingWithoutHoldReason(self):
        # No hold reason at all also yields ExitSignal -1.
        outcome = self.handler.handle(self.ad)
        self.assertIn("ExitBySignal", outcome)
        self.assertTrue(outcome["ExitBySignal"])
        self.assertIn("ExitSignal", outcome)
        self.assertEqual(outcome["ExitSignal"], -1)

    def testNotHandlingJobNotRemoved(self):
        # An ad of a different event type is declined.
        self.ad.update({"MyType": "foo"})
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        self.assertIn("job not removed", captured.output[0])

    def testNotHandlingJobNotRemovedByPeriodicRemoval(self):
        # A removal with an unrelated reason is declined.
        self.ad.update({"Reason": "DAG Abort"})
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        self.assertIn("not removed by the periodic removal policy", captured.output[0])

    def testNotHandlingNoReason(self):
        # With the reason stripped from the ad the handler declines.
        self.ad.pop("Reason")
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        self.assertIn("unable to determine the reason", captured.output[0])
class JobAbortedByUserHandlerTestCase(unittest.TestCase):
    """Check ``JobAbortedByUserHandler`` on job-aborted ads.

    The fixture carries no ``Reason``; each test adds the removal reason
    it wants to probe.
    """

    def setUp(self):
        self.ad = {"ClusterId": 1, "ProcId": 0, "MyType": "JobAbortedEvent"}
        self.handler = JobAbortedByUserHandler()

    def tearDown(self):
        # Nothing to clean up.
        pass

    def testHandlingAbortedDagmanJob(self):
        # Reason "Python-initiated action" is reported as exit code 0.
        self.ad["Reason"] = "Python-initiated action"
        outcome = self.handler.handle(self.ad)
        self.assertIn("ExitBySignal", outcome)
        self.assertFalse(outcome["ExitBySignal"])
        self.assertIn("ExitCode", outcome)
        self.assertEqual(outcome["ExitCode"], 0)

    def testHandlingAbortedPayloadJob(self):
        # Reason "DAG Removed" is reported as exit code 0.
        self.ad["Reason"] = "DAG Removed"
        outcome = self.handler.handle(self.ad)
        self.assertIn("ExitBySignal", outcome)
        self.assertFalse(outcome["ExitBySignal"])
        self.assertIn("ExitCode", outcome)
        self.assertEqual(outcome["ExitCode"], 0)

    def testHandlingAbortedSubdagJob(self):
        # Reason mentioning OtherJobRemoveRequirements gives exit code 0.
        self.ad["Reason"] = "OtherJobRemoveRequirements = DAGManJobId =?= 78"
        outcome = self.handler.handle(self.ad)
        self.assertIn("ExitBySignal", outcome)
        self.assertFalse(outcome["ExitBySignal"])
        self.assertIn("ExitCode", outcome)
        self.assertEqual(outcome["ExitCode"], 0)

    def testNotHandlingJobNotRemoved(self):
        # An ad of a different event type is declined.
        self.ad.update({"MyType": "foo"})
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        self.assertIn("job not removed", captured.output[0])

    def testNotHandlingJobNotRemovedByUser(self):
        # A removal caused by the PeriodicRemove expression is declined.
        self.ad["Reason"] = "The job attribute PeriodicRemove expression 'foo' evaluated to TRUE"
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        self.assertIn("job not removed", captured.output[0])

    def testNotHandlingNoReason(self):
        # Without any "Reason" in the ad the handler declines.
        with self.assertLogs(logger=logger, level="DEBUG") as captured:
            outcome = self.handler.handle(self.ad)
        self.assertIsNone(outcome)
        self.assertIn("unable to determine the reason", captured.output[0])