Coverage for python / lsst / ctrl / bps / htcondor / common_utils.py: 11%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:24 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Utility functions used by multiple functions in ctrl_bps_htcondor.""" 

29 

30import logging 

31from enum import IntEnum, auto 

32from pathlib import Path 

33 

34import htcondor 

35 

36from lsst.ctrl.bps import ( 

37 WmsStates, 

38) 

39 

40from .lssthtc import ( 

41 NodeStatus, 

42 condor_history, 

43 condor_q, 

44 read_dag_info, 

45) 

46 

47_LOG = logging.getLogger(__name__) 

48 

49 

class WmsIdType(IntEnum):
    """Kinds of identifiers that can be supplied as a WMS id."""

    UNKNOWN = auto()
    """The type of the id could not be determined."""

    LOCAL = auto()
    """The id is an HTCondor job's ClusterId (with optional '.ProcId')."""

    GLOBAL = auto()
    """The id is an HTCondor global job id."""

    PATH = auto()
    """The id is a submission path."""

68 

69 

def _htc_status_to_wms_state(job):
    """Translate an HTCondor job record into a generic WMS state.

    The job-level status ("JobStatus") is consulted first.  The DAG node
    status ("NodeStatus") is only used when the job-level lookup is not
    possible or yields ``WmsStates.MISFIT``.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor job information.

    Returns
    -------
    wms_state : `lsst.ctrl.bps.WmsStates`
        The WMS state equivalent to the status recorded for the job.
    """
    state = _htc_job_status_to_wms_state(job) if "JobStatus" in job else WmsStates.MISFIT

    # Fall back to the node status only when the job status told us nothing.
    if state == WmsStates.MISFIT and "NodeStatus" in job:
        return _htc_node_status_to_wms_state(job)
    return state

90 

91 

def _htc_job_status_to_wms_state(job):
    """Convert HTCondor job status to generic wms state.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor job information.

    Returns
    -------
    wms_state : `lsst.ctrl.bps.WmsStates`
        The equivalent WmsState to given job's status.  It is
        ``WmsStates.MISFIT`` when "JobStatus" is missing, falsy, or is a
        status this function has no mapping for.
    """
    # Read the keys with .get() so that a record lacking "ClusterId" or
    # "JobStatus" cannot make the debug message itself raise KeyError.
    raw_status = job.get("JobStatus")
    _LOG.debug(
        "htc_job_status_to_wms_state: %s=%s, %s", job.get("ClusterId"), raw_status, type(raw_status)
    )
    if not raw_status:
        return WmsStates.MISFIT

    job_status = int(raw_status)
    _LOG.debug("htc_job_status_to_wms_state: job_status = %s", job_status)

    wms_state = WmsStates.MISFIT
    if job_status == htcondor.JobStatus.IDLE:
        wms_state = WmsStates.PENDING
    elif job_status == htcondor.JobStatus.RUNNING:
        wms_state = WmsStates.RUNNING
    elif job_status == htcondor.JobStatus.REMOVED:
        # A removed job that still reports a non-zero signal or exit code is
        # treated as a failure rather than as a deletion.
        if (job.get("ExitBySignal", False) and job.get("ExitSignal", 0)) or job.get("ExitCode", 0):
            wms_state = WmsStates.FAILED
        else:
            wms_state = WmsStates.DELETED
    elif job_status == htcondor.JobStatus.COMPLETED:
        # A completed job counts as failed if it reports a non-zero signal,
        # exit code, or "DAG_Status".
        if (
            (job.get("ExitBySignal", False) and job.get("ExitSignal", 0))
            or job.get("ExitCode", 0)
            or job.get("DAG_Status", 0)
        ):
            wms_state = WmsStates.FAILED
        else:
            wms_state = WmsStates.SUCCEEDED
    elif job_status == htcondor.JobStatus.HELD:
        wms_state = WmsStates.HELD

    return wms_state

135 

136 

def _htc_node_status_to_wms_state(job):
    """Translate an HTCondor DAG node status into a generic WMS state.

    Parameters
    ----------
    job : `dict` [`str`, `~typing.Any`]
        HTCondor job information.  "NodeStatus" must be present;
        "JobProcsHeld", "StatusDetails", and "JobProcsQueued" are read for
        submitted nodes and "StatusDetails" for nodes in error.

    Returns
    -------
    wms_state : `lsst.ctrl.bps.WmsStates`
        The WMS state equivalent to the node's status, or
        ``WmsStates.MISFIT`` when no mapping applies.
    """
    node_status = job["NodeStatus"]

    if node_status == NodeStatus.NOT_READY:
        return WmsStates.UNREADY

    if node_status == NodeStatus.READY:
        return WmsStates.READY

    if node_status == NodeStatus.PRERUN:
        return WmsStates.MISFIT

    if node_status == NodeStatus.SUBMITTED:
        # Checks are ordered: held wins over running, running over queued.
        if job["JobProcsHeld"]:
            return WmsStates.HELD
        if job["StatusDetails"] == "not_idle":
            return WmsStates.RUNNING
        if job["JobProcsQueued"]:
            return WmsStates.PENDING
        return WmsStates.MISFIT

    if node_status == NodeStatus.POSTRUN:
        return WmsStates.MISFIT

    if node_status == NodeStatus.DONE:
        return WmsStates.SUCCEEDED

    if node_status == NodeStatus.ERROR:
        # Use job exit status instead of post script exit status.
        if "DAGMAN error 0" in job["StatusDetails"]:
            return WmsStates.SUCCEEDED
        if "ULOG_JOB_ABORTED" in job["StatusDetails"]:
            return WmsStates.DELETED
        return WmsStates.FAILED

    if node_status == NodeStatus.FUTILE:
        return WmsStates.PRUNED

    return WmsStates.MISFIT

180 

181 

def _wms_id_type(wms_id):
    """Determine the type of the WMS id.

    An id that converts to an integer (via ``float``) is a local id.  An id
    that does not is a path if it names an existing directory and a global
    id otherwise.  An id whose type cannot be converted at all is unknown.

    Parameters
    ----------
    wms_id : `str`
        WMS id identifying a job.

    Returns
    -------
    id_type : `lsst.ctrl.bps.htcondor.WmsIdType`
        Type of WMS id.
    """
    try:
        int(float(wms_id))
    except (ValueError, OverflowError):
        # ValueError: the id is not numeric (or is "nan").
        # OverflowError: the id parses as an infinite float (e.g. "inf",
        # "Infinity", "1e999"), which int() refuses to convert.  Such an id
        # cannot be a cluster id, so classify it like any other
        # non-numeric id instead of letting the exception escape.
        if Path(wms_id).is_dir():
            id_type = WmsIdType.PATH
        else:
            id_type = WmsIdType.GLOBAL
    except TypeError:
        id_type = WmsIdType.UNKNOWN
    else:
        id_type = WmsIdType.LOCAL
    return id_type

208 

209 

def _wms_id_to_cluster(wms_id):
    """Convert WMS id to cluster id.

    Parameters
    ----------
    wms_id : `int` or `float` or `str`
        HTCondor job id or path.

    Returns
    -------
    schedd_ad : `classad.ClassAd` or `None`
        ClassAd describing the scheduler managing the job with the given id.
        `None` if no scheduler could be associated with the id.
    cluster_id : `int` or `None`
        HTCondor cluster id.  `None` if it could not be determined.
    id_type : `WmsIdType`
        The type of the provided id.
    """
    coll = htcondor.Collector()

    schedd_ad = None
    cluster_id = None
    id_type = _wms_id_type(wms_id)
    if id_type == WmsIdType.LOCAL:
        # A local id is looked up on the default scheduler.
        schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd)
        cluster_id = int(float(wms_id))
    elif id_type == WmsIdType.GLOBAL:
        # A global id may belong to any scheduler, so query all of them.
        constraint = f'GlobalJobId == "{wms_id}"'
        schedd_ads = {ad["Name"]: ad for ad in coll.locateAll(htcondor.DaemonTypes.Schedd)}
        schedds = {name: htcondor.Schedd(ad) for name, ad in schedd_ads.items()}
        job_info = condor_q(constraint=constraint, schedds=schedds)
        if job_info:
            schedd_name, job_rec = job_info.popitem()
            job_id, _ = job_rec.popitem()
            schedd_ad = schedd_ads[schedd_name]
            cluster_id = int(float(job_id))
    elif id_type == WmsIdType.PATH:
        try:
            job_info = read_dag_info(wms_id)
        except OSError as exc:
            # FileNotFoundError and PermissionError are OSError subclasses.
            # Failing to read the DAG info is tolerated (both results stay
            # None), but leave a trace for debugging.
            _LOG.debug("Unable to read DAG info from '%s': %s", wms_id, exc)
        else:
            # Guard against an empty result the same way the GLOBAL branch
            # does; popitem() on an empty mapping raises KeyError.
            if job_info:
                schedd_name, job_rec = job_info.popitem()
                job_id, _ = job_rec.popitem()
                schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, schedd_name)
                cluster_id = int(float(job_id))
    return schedd_ad, cluster_id, id_type

258 

259 

def _wms_id_to_dir(wms_id):
    """Find a submit directory candidate for the given WMS id.

    No check is made that the directory exists or that it is a valid BPS
    submit directory.

    Parameters
    ----------
    wms_id : `int` or `float` or `str`
        HTCondor job id or path.

    Returns
    -------
    wms_path : `pathlib.Path` or None
        Submit directory candidate for the run with the given job id.  It is
        None when no directory can be associated with the provided WMS id.
    id_type : `WmsIdType`
        The type of the provided id.

    Raises
    ------
    TypeError
        Raised if provided WMS id has invalid type.
    """
    collector = htcondor.Collector()
    id_type = _wms_id_type(wms_id)

    # A path id is its own answer; an unknown id cannot be resolved at all.
    if id_type == WmsIdType.PATH:
        return Path(wms_id).resolve(), id_type
    if id_type == WmsIdType.UNKNOWN:
        raise TypeError(f"Invalid job id type: {wms_id}")

    # For job ids, decide which schedulers to ask and what to ask them.
    if id_type == WmsIdType.LOCAL:
        query = f"ClusterId == {int(float(wms_id))}"
        ads = [collector.locate(htcondor.DaemonTypes.Schedd)]
    elif id_type == WmsIdType.GLOBAL:
        query = f'GlobalJobId == "{wms_id}"'
        ads = list(collector.locateAll(htcondor.DaemonTypes.Schedd))
    else:
        return None, id_type

    # The job's "Iwd" attribute from the history query is used as the
    # submit directory candidate.
    schedds = {ad["name"]: htcondor.Schedd(ad) for ad in ads}
    history = condor_history(constraint=query, schedds=schedds, projection=["Iwd"])
    if not history:
        return None, id_type

    _, records = history.popitem()
    _, job_ad = records.popitem()
    return Path(job_ad["Iwd"]), id_type

309 return wms_path, id_type