Coverage for python / lsst / ctrl / bps / htcondor / common_utils.py: 11%
139 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:23 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-24 08:23 +0000
1# This file is part of ctrl_bps_htcondor.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Utility functions used by multiple functions in ctrl_bps_htcondor."""
30import logging
31from enum import IntEnum, auto
32from pathlib import Path
34import htcondor
36from lsst.ctrl.bps import (
37 WmsStates,
38)
40from .lssthtc import (
41 NodeStatus,
42 condor_history,
43 condor_q,
44 read_dag_info,
45)
47_LOG = logging.getLogger(__name__)
class WmsIdType(IntEnum):
    """Kinds of identifiers that can be supplied as a WMS id."""

    UNKNOWN = auto()
    """The kind of the id could not be established."""

    LOCAL = auto()
    """HTCondor job ClusterId, optionally followed by '.ProcId'."""

    GLOBAL = auto()
    """HTCondor global job id."""

    PATH = auto()
    """Path to a submission directory."""
def _htc_status_to_wms_state(job):
    """Translate an HTCondor job record into a generic WMS state.

    The job status is consulted first; the node status is used only as a
    fallback when the job status is absent or yields ``WmsStates.MISFIT``.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor job information.

    Returns
    -------
    wms_state : `WmsStates`
        The WMS state corresponding to the given job's status.
    """
    state = _htc_job_status_to_wms_state(job) if "JobStatus" in job else WmsStates.MISFIT

    # Fall back to DAG node information only when the job status was not
    # conclusive.
    if state == WmsStates.MISFIT and "NodeStatus" in job:
        state = _htc_node_status_to_wms_state(job)
    return state
def _htc_job_status_to_wms_state(job):
    """Convert HTCondor job status to generic wms state.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor job information. ``JobStatus`` selects the state;
        ``ExitBySignal``, ``ExitSignal``, ``ExitCode`` and ``DAG_Status``
        (all optional) distinguish failure from deletion or success.

    Returns
    -------
    wms_state : `lsst.ctrl.bps.WmsStates`
        The equivalent WmsState to given job's status. ``WmsStates.MISFIT``
        is returned when ``JobStatus`` is missing, falsy, or not one of the
        statuses handled below.
    """
    # Use .get() so that a record lacking "ClusterId" or "JobStatus" does not
    # raise KeyError from the log call before the guard below can handle it.
    raw_status = job.get("JobStatus")
    _LOG.debug(
        "htc_job_status_to_wms_state: %s=%s, %s", job.get("ClusterId"), raw_status, type(raw_status)
    )
    wms_state = WmsStates.MISFIT
    if raw_status:
        job_status = int(raw_status)

        _LOG.debug("htc_job_status_to_wms_state: job_status = %s", job_status)
        if job_status == htcondor.JobStatus.IDLE:
            wms_state = WmsStates.PENDING
        elif job_status == htcondor.JobStatus.RUNNING:
            wms_state = WmsStates.RUNNING
        elif job_status == htcondor.JobStatus.REMOVED:
            # A removed job that exited on a signal or with a non-zero exit
            # code is reported as failed rather than deleted.
            if (job.get("ExitBySignal", False) and job.get("ExitSignal", 0)) or job.get("ExitCode", 0):
                wms_state = WmsStates.FAILED
            else:
                wms_state = WmsStates.DELETED
        elif job_status == htcondor.JobStatus.COMPLETED:
            # Completed jobs additionally count as failed when DAG_Status is
            # non-zero.
            if (
                (job.get("ExitBySignal", False) and job.get("ExitSignal", 0))
                or job.get("ExitCode", 0)
                or job.get("DAG_Status", 0)
            ):
                wms_state = WmsStates.FAILED
            else:
                wms_state = WmsStates.SUCCEEDED
        elif job_status == htcondor.JobStatus.HELD:
            wms_state = WmsStates.HELD

    return wms_state
def _htc_node_status_to_wms_state(job):
    """Translate an HTCondor DAG node status into a generic WMS state.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor job information. Must contain ``NodeStatus``; depending on
        its value ``JobProcsHeld``, ``StatusDetails`` and ``JobProcsQueued``
        are read as well.

    Returns
    -------
    wms_state : `lsst.ctrl.bps.WmsStates`
        The WMS state corresponding to the node's status.
        ``WmsStates.MISFIT`` is returned for any status without a more
        specific mapping.
    """
    node_status = job["NodeStatus"]

    if node_status == NodeStatus.NOT_READY:
        return WmsStates.UNREADY
    if node_status == NodeStatus.READY:
        return WmsStates.READY
    if node_status == NodeStatus.PRERUN:
        return WmsStates.MISFIT
    if node_status == NodeStatus.SUBMITTED:
        # Held takes precedence over running, which takes precedence over
        # queued.
        if job["JobProcsHeld"]:
            return WmsStates.HELD
        if job["StatusDetails"] == "not_idle":
            return WmsStates.RUNNING
        if job["JobProcsQueued"]:
            return WmsStates.PENDING
        return WmsStates.MISFIT
    if node_status == NodeStatus.POSTRUN:
        return WmsStates.MISFIT
    if node_status == NodeStatus.DONE:
        return WmsStates.SUCCEEDED
    if node_status == NodeStatus.ERROR:
        # Use job exit status instead of post script exit status.
        details = job["StatusDetails"]
        if "DAGMAN error 0" in details:
            return WmsStates.SUCCEEDED
        if "ULOG_JOB_ABORTED" in details:
            return WmsStates.DELETED
        return WmsStates.FAILED
    if node_status == NodeStatus.FUTILE:
        return WmsStates.PRUNED
    return WmsStates.MISFIT
def _wms_id_type(wms_id):
    """Determine the type of the WMS id.

    Parameters
    ----------
    wms_id : `str`
        WMS id identifying a job.

    Returns
    -------
    id_type : `lsst.ctrl.bps.htcondor.WmsIdType`
        Type of WMS id:

        * ``LOCAL`` if the id converts to a finite number,
        * ``PATH`` if it is not numeric and names an existing directory,
        * ``GLOBAL`` if it is not numeric and is not an existing directory,
        * ``UNKNOWN`` if its Python type can be interpreted neither as a
          number nor as a path.
    """
    try:
        int(float(wms_id))
    except TypeError:
        return WmsIdType.UNKNOWN
    except (ValueError, OverflowError):
        # ValueError: not a number (or NaN). OverflowError: the value parsed
        # as an infinity (e.g. "inf", "Infinity"), which int() cannot
        # represent. In both cases the id is not a cluster id, so fall
        # through and treat it as a path or a global job id.
        pass
    else:
        return WmsIdType.LOCAL

    try:
        is_submit_dir = Path(wms_id).is_dir()
    except TypeError:
        # Non-string, non-path values (e.g. a float NaN or infinity) cannot
        # be turned into a path; report them as unknown instead of raising.
        return WmsIdType.UNKNOWN
    return WmsIdType.PATH if is_submit_dir else WmsIdType.GLOBAL
def _wms_id_to_cluster(wms_id):
    """Convert WMS id to cluster id.

    Parameters
    ----------
    wms_id : `int` or `float` or `str`
        HTCondor job id or path.

    Returns
    -------
    schedd_ad : `classad.ClassAd`
        ClassAd describing the scheduler managing the job with the given id.
        `None` if the id could not be resolved.
    cluster_id : `int`
        HTCondor cluster id. `None` if the id could not be resolved.
    id_type : `lsst.ctrl.bps.htcondor.WmsIdType`
        The type of the provided id.
    """
    coll = htcondor.Collector()

    schedd_ad = None
    cluster_id = None
    id_type = _wms_id_type(wms_id)
    if id_type == WmsIdType.LOCAL:
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        cluster_id = int(float(wms_id))
    elif id_type == WmsIdType.GLOBAL:
        # A global id does not say which scheduler owns the job, so query
        # all the schedulers known to the collector.
        constraint = f'GlobalJobId == "{wms_id}"'
        schedd_ads = {ad["Name"]: ad for ad in coll.locateAll(htcondor.DaemonTypes.Schedd)}
        schedds = {name: htcondor.Schedd(ad) for name, ad in schedd_ads.items()}
        job_info = condor_q(constraint=constraint, schedds=schedds)
        if job_info:
            schedd_name, job_rec = job_info.popitem()
            job_id, _ = job_rec.popitem()
            schedd_ad = schedd_ads[schedd_name]
            cluster_id = int(float(job_id))
    elif id_type == WmsIdType.PATH:
        try:
            job_info = read_dag_info(wms_id)
        except OSError:
            # Covers FileNotFoundError and PermissionError too; a missing or
            # unreadable DAG info leaves the id unresolved.
            job_info = None
        # Guard against an empty result the same way the GLOBAL branch does;
        # calling popitem() on an empty mapping would raise KeyError.
        if job_info:
            schedd_name, job_rec = job_info.popitem()
            job_id, _ = job_rec.popitem()
            schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, schedd_name)
            cluster_id = int(float(job_id))
    return schedd_ad, cluster_id, id_type
def _wms_id_to_dir(wms_id):
    """Map a WMS id onto a candidate submit directory.

    No check is made that the directory exists or that it is a valid BPS
    submit directory.

    Parameters
    ----------
    wms_id : `int` or `float` or `str`
        HTCondor job id or path.

    Returns
    -------
    wms_path : `pathlib.Path` or None
        Candidate submit directory for the run with the given job id, or
        None when no directory can be associated with the id.
    id_type : `lsst.ctrl.bps.htcondor.WmsIdType`
        The type of the provided id.

    Raises
    ------
    TypeError
        Raised if provided WMS id has invalid type.
    """
    collector = htcondor.Collector()
    id_type = _wms_id_type(wms_id)

    if id_type == WmsIdType.LOCAL:
        query = f"ClusterId == {int(float(wms_id))}"
        ads = [collector.locate(htcondor.DaemonTypes.Schedd)]
    elif id_type == WmsIdType.GLOBAL:
        query = f'GlobalJobId == "{wms_id}"'
        ads = list(collector.locateAll(htcondor.DaemonTypes.Schedd))
    elif id_type == WmsIdType.PATH:
        # A path id needs no scheduler lookup.
        return Path(wms_id).resolve(), id_type
    elif id_type == WmsIdType.UNKNOWN:
        raise TypeError(f"Invalid job id type: {wms_id}")
    else:
        return None, id_type

    # Look the job up in the history of the selected scheduler(s) and use
    # its initial working directory ("Iwd") as the candidate.
    schedds = {ad["name"]: htcondor.Schedd(ad) for ad in ads}
    history = condor_history(constraint=query, schedds=schedds, projection=["Iwd"])
    if not history:
        return None, id_type
    _, records = history.popitem()
    _, job_ad = records.popitem()
    return Path(job_ad["Iwd"]), id_type