Coverage for python / lsst / ctrl / bps / htcondor / handlers.py: 24%
139 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:23 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Handlers interpreting HTCondor job ClassAds.

Every handler looks at a single job event ClassAd (given as a dictionary)
and, if it recognizes the event, records how the job ended using the
attributes ``ExitBySignal`` plus either ``ExitCode`` or ``ExitSignal``.
"""

# Names exported by ``from ... import *``.
__all__ = [
    "HTC_JOB_AD_HANDLERS",
    "Chain",
    "Handler",
    "JobAbortedByPeriodicRemoveHandler",
    "JobAbortedByUserHandler",
    "JobCompletedWithExecTicketHandler",
    "JobCompletedWithoutExecTicketHandler",
    "JobHeldByOtherHandler",
    "JobHeldBySignalHandler",
    "JobHeldByUserHandler",
]
44import abc
45import logging
46import re
47from collections.abc import Sequence
48from typing import Any, overload
# Logger for this module; the handlers below report refusals at DEBUG level.
_LOG = logging.getLogger(name=__name__)
53class Handler(abc.ABC):
54 """Abstract base class defining Handler interface."""
56 @abc.abstractmethod
57 def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
58 """Handle a ClassAd.
60 Parameters
61 ----------
62 ad : `dict[`str`, Any]`
63 The dictionary representing ClassAd that need to be processed.
65 Returns
66 -------
67 ad : `dict[`str`, Any]` | None
68 The dictionary representing ClassAd after processing and ``None``
69 if the handler was not able to process the ad.
71 Notes
72 -----
73 To optimize the memory usage, the implementation of this method may
74 modify the ClassAd in place. In such a case, the ClassAd returned by
75 the method will be the same object that was passed to it as
76 the argument, but including any modifications that were made.
77 """
class Chain(Sequence):
    """Ordered sequence of handlers tried one after another.

    Parameters
    ----------
    handlers : `~collections.abc.Sequence` [`Handler`], optional
        Handlers forming the initial chain, in the order they will be tried.
    """

    def __init__(self, handlers: Sequence[Handler] | None = None) -> None:
        self._handlers: list[Handler] = []
        initial = () if handlers is None else handlers
        for item in initial:
            # Go through append() so that every element is type-checked.
            self.append(item)

    @overload
    def __getitem__(self, index: int) -> Handler: ...

    @overload
    def __getitem__(self, index: slice) -> Sequence[Handler]: ...

    def __getitem__(self, index):
        # Delegate to the underlying list; a slice therefore yields a list.
        return self._handlers[index]

    def __len__(self) -> int:
        return len(self._handlers)

    def append(self, handler: Handler) -> None:
        """Add a handler at the end of the chain.

        Parameters
        ----------
        handler : `Handler`
            The handler to add.

        Raises
        ------
        TypeError
            Raised if ``handler`` is not a ``Handler``.
        """
        if isinstance(handler, Handler):
            self._handlers.append(handler)
            return
        raise TypeError(f"append() argument must be a 'Handler', not a '{type(handler)}'")

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Pass a ClassAd through the chain until some handler accepts it.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            Dictionary representation of the ClassAd to be handled.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The result of the first handler that returned something other
            than `None`, or `None` if no handler in the chain did.

        Notes
        -----
        An exception raised by a handler is logged as a warning and the ad is
        then offered to the next handler.
        """
        for handler in self:
            try:
                result = handler.handle(ad)
            except Exception as exc:
                # Best effort: one faulty handler must not stop the others.
                _LOG.warning(
                    "Handler '%s' raised an exception while processing the ad: %s. "
                    "Proceeding to the next handler (if any).",
                    type(handler).__name__,
                    repr(exc),
                )
                continue
            if result is not None:
                return result
        return None
class JobCompletedWithExecTicketHandler(Handler):
    """Handler of ClassAds for completed jobs with the ticket of execution.

    Usually, the entry in the event log for a completed job contains the ticket
    of execution -- a record describing how and when the job was terminated.
    If it exists, this handler will use it to add the attributes describing
    job's exit status.
    """

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Add exit status attributes taken from the ticket of execution.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` and either ``ExitSignal``
            or ``ExitCode`` copied from the ``ToE`` record, or `None` if the
            ad is not a termination event or has no ``ToE`` record.
        """
        if not ad["MyType"].endswith("TerminatedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not completed",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "ToE" not in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "ticket of execution missing",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        toe = ad["ToE"]
        # Read everything needed from the ticket before touching the ad, so
        # that a malformed ticket (missing key) leaves the ad unmodified for
        # any handler that runs after this one.
        exit_by_signal = toe["ExitBySignal"]
        key = "ExitSignal" if exit_by_signal else "ExitCode"
        value = toe[key]
        ad["ExitBySignal"] = exit_by_signal
        ad[key] = value
        return ad
class JobCompletedWithoutExecTicketHandler(Handler):
    """Handler of ClassAds for completed jobs w/o the ticket of execution.

    The entry in the event log for some completed jobs (e.g. jobs that run
    ``condor_dagman``) do *not* contain the ticket of execution -- a record
    describing how and when the job was terminated. This handler will try
    to use other existing attributes to add the ones describing job's exit
    status.
    """

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Add exit status attributes derived from termination attributes.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` set to the negation of
            ``TerminatedNormally`` and either ``ExitSignal`` (from
            ``TerminatedBySignal``) or ``ExitCode`` (from ``ReturnValue``),
            or `None` if the ad is not a termination event or already has a
            ``ToE`` record.
        """
        if not ad["MyType"].endswith("TerminatedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not completed",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "ToE" in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "ticket of execution found",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        # Read all source attributes first so that a missing one leaves the
        # ad unmodified.
        exit_by_signal = not ad["TerminatedNormally"]
        if exit_by_signal:
            key, value = "ExitSignal", ad["TerminatedBySignal"]
        else:
            key, value = "ExitCode", ad["ReturnValue"]
        ad["ExitBySignal"] = exit_by_signal
        ad[key] = value
        return ad
class JobHeldByOtherHandler(Handler):
    """Handler of ClassAds for jobs put on hold."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Use the hold reason code of a held job as its exit code.

        Hold reason codes 1 and 3 are left to other handlers and refused
        here.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` set to `False` and
            ``ExitCode`` set to ``HoldReasonCode``, or `None` if the ad was
            refused.
        """
        if ad["MyType"].endswith("HeldEvent"):
            code = ad["HoldReasonCode"]
            if code in {1, 3}:
                _LOG.debug(
                    "Handler '%s': refusing to process the ad for the job '%s.%s': "
                    "invalid hold reason code: HoldReasonCode = %s",
                    self.__class__.__name__,
                    ad["ClusterId"],
                    ad["ProcId"],
                    ad["HoldReasonCode"],
                )
                return None
            ad["ExitBySignal"] = False
            ad["ExitCode"] = code
            return ad
        _LOG.debug(
            "Handler '%s': refusing to process the ad for the job '%s.%s': job not held",
            self.__class__.__name__,
            ad["ClusterId"],
            ad["ProcId"],
        )
        return None
class JobHeldBySignalHandler(Handler):
    """Handler of ClassAds for jobs put on hold by signals."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Set the exit status of a job held because it received a signal.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` set to `True` and
            ``ExitSignal`` set to the signal number (an `int`) parsed from
            ``HoldReason``, or `None` if the ad is not a hold event, its
            ``HoldReasonCode`` is not 3, or no ``signal <N>`` text is found in
            ``HoldReason``.
        """
        if not ad["MyType"].endswith("HeldEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not held",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if ad["HoldReasonCode"] != 3:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job not held by a signal: HoldReasonCode = %s, HoldReason = %s",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
                ad["HoldReasonCode"],
                ad["HoldReason"],
            )
            return None
        match = re.search(r"signal (\d+)", ad["HoldReason"])
        if match is None:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "signal not found: HoldReason = %s",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
                ad["HoldReason"],
            )
            return None
        ad["ExitBySignal"] = True
        # Store the signal number as an int, the same type that
        # JobAbortedByPeriodicRemoveHandler uses for ``ExitSignal``.
        ad["ExitSignal"] = int(match.group(1))
        return ad
class JobHeldByUserHandler(Handler):
    """Handler of ClassAds for jobs put on hold by the user."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Mark a job held by its user (hold reason code 1) as exited with 0.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` set to `False` and
            ``ExitCode`` set to 0, or `None` if the ad was refused.
        """
        if ad["MyType"].endswith("HeldEvent"):
            if ad["HoldReasonCode"] != 1:
                _LOG.debug(
                    "Handler '%s': refusing to process the ad for the job '%s.%s': "
                    "job not held by the user: HoldReasonCode = %s, HoldReason = %s",
                    self.__class__.__name__,
                    ad["ClusterId"],
                    ad["ProcId"],
                    ad["HoldReasonCode"],
                    ad["HoldReason"],
                )
                return None
            ad.update(ExitBySignal=False, ExitCode=0)
            return ad
        _LOG.debug(
            "Handler '%s': refusing to process the ad for the job '%s.%s': job not held",
            self.__class__.__name__,
            ad["ClusterId"],
            ad["ProcId"],
        )
        return None
class JobAbortedByPeriodicRemoveHandler(Handler):
    """Handler of ClassAds for jobs deleted by periodic remove policy."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Mark a job removed by the periodic remove policy as signaled.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` set to `True` and
            ``ExitSignal`` set to the number found as ``signal <N>`` in
            ``HoldReason`` (or -1 when none is found), or `None` if the ad
            was refused.
        """
        if not ad["MyType"].endswith("AbortedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not removed",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "Reason" not in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "unable to determine the reason for the removal.",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "PeriodicRemove" not in ad["Reason"]:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job was not removed by the periodic removal policy: Reason = %s",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
                ad["Reason"],
            )
            return None
        ad["ExitBySignal"] = True
        # Placeholder value used when the signal number cannot be recovered.
        ad["ExitSignal"] = -1
        if "HoldReason" in ad:
            found = re.search(r"signal (\d+)", ad["HoldReason"])
            if found is not None:
                ad["ExitSignal"] = int(found.group(1))
        return ad
class JobAbortedByUserHandler(Handler):
    """Handler of ClassAds for jobs deleted by the user."""

    def handle(self, ad: dict[str, Any]) -> dict[str, Any] | None:
        """Mark a job removed by its user as exited with 0.

        Parameters
        ----------
        ad : `dict` [`str`, `~typing.Any`]
            ClassAd of a job event. It is modified in place when accepted.

        Returns
        -------
        ad : `dict` [`str`, `~typing.Any`] or `None`
            The same ClassAd with ``ExitBySignal`` set to `False` and
            ``ExitCode`` set to 0, or `None` if the ad was refused.
        """
        if not ad["MyType"].endswith("AbortedEvent"):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': job not removed",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        if "Reason" not in ad:
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "unable to determine the reason for the removal.",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
            )
            return None
        # Substrings of ``Reason`` that identify a removal requested by the user.
        user_removal_markers = (
            "Python-initiated action",  # DAGMan job removed by the user
            "DAG Removed",  # payload job removed by the user
            "OtherJobRemoveRequirements",  # a subdag job removed by the user
        )
        reason = ad["Reason"]
        if not any(marker in reason for marker in user_removal_markers):
            _LOG.debug(
                "Handler '%s': refusing to process the ad for the job '%s.%s': "
                "job not removed by the user: Reason = %s",
                self.__class__.__name__,
                ad["ClusterId"],
                ad["ProcId"],
                ad["Reason"],
            )
            return None
        ad["ExitBySignal"] = False
        ad["ExitCode"] = 0
        return ad
# Handlers are tried in this order by Chain.handle; the first one returning
# something other than None determines the result.
_handlers = [
    JobAbortedByPeriodicRemoveHandler(),
    JobAbortedByUserHandler(),
    JobHeldByUserHandler(),
    JobHeldBySignalHandler(),
    JobHeldByOtherHandler(),
    JobCompletedWithExecTicketHandler(),
    JobCompletedWithoutExecTicketHandler(),
]
HTC_JOB_AD_HANDLERS = Chain(_handlers)