Coverage for tests / test_common_utils.py: 34%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:49 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for the common utility functions.""" 

29 

import logging
import os
import unittest
import unittest.mock
from pathlib import Path

import htcondor

from lsst.ctrl.bps import (
    WmsStates,
)
from lsst.ctrl.bps.htcondor import common_utils
from lsst.utils.tests import temporaryDirectory

42 

# Module-level logger bound to the name of the package under test.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")

44 

45 

class HtcNodeStatusToWmsStateTestCase(unittest.TestCase):
    """Test assigning a WMS state based on the HTCondor node status."""

    def setUp(self):
        # No fixtures are needed; the hook is kept as an explicit no-op.
        pass

    def tearDown(self):
        # Nothing to clean up.
        pass

    def _assert_node_state(self, node_ad, expected):
        """Convert ``node_ad`` and check the resulting WMS state.

        Parameters
        ----------
        node_ad : `dict`
            Job information handed to
            ``common_utils._htc_node_status_to_wms_state``.
        expected : `lsst.ctrl.bps.WmsStates`
            State the conversion is required to produce.
        """
        actual = common_utils._htc_node_status_to_wms_state(node_ad)
        self.assertEqual(actual, expected)

    def testNotReady(self):
        """NOT_READY is reported as UNREADY."""
        node_ad = {"NodeStatus": common_utils.NodeStatus.NOT_READY}
        self._assert_node_state(node_ad, WmsStates.UNREADY)

    def testReady(self):
        """READY is reported as READY."""
        node_ad = {"NodeStatus": common_utils.NodeStatus.READY}
        self._assert_node_state(node_ad, WmsStates.READY)

    def testPrerun(self):
        """PRERUN is reported as MISFIT."""
        node_ad = {"NodeStatus": common_utils.NodeStatus.PRERUN}
        self._assert_node_state(node_ad, WmsStates.MISFIT)

    def testSubmittedHeld(self):
        """SUBMITTED with a held proc is reported as HELD."""
        node_ad = {
            "NodeStatus": common_utils.NodeStatus.SUBMITTED,
            "JobProcsHeld": 1,
            "StatusDetails": "",
            "JobProcsQueued": 0,
        }
        self._assert_node_state(node_ad, WmsStates.HELD)

    def testSubmittedRunning(self):
        """SUBMITTED with "not_idle" status details is reported as RUNNING."""
        node_ad = {
            "NodeStatus": common_utils.NodeStatus.SUBMITTED,
            "JobProcsHeld": 0,
            "StatusDetails": "not_idle",
            "JobProcsQueued": 0,
        }
        self._assert_node_state(node_ad, WmsStates.RUNNING)

    def testSubmittedPending(self):
        """SUBMITTED with a queued proc and no details is PENDING."""
        node_ad = {
            "NodeStatus": common_utils.NodeStatus.SUBMITTED,
            "JobProcsHeld": 0,
            "StatusDetails": "",
            "JobProcsQueued": 1,
        }
        self._assert_node_state(node_ad, WmsStates.PENDING)

    def testPostrun(self):
        """POSTRUN is reported as MISFIT."""
        node_ad = {"NodeStatus": common_utils.NodeStatus.POSTRUN}
        self._assert_node_state(node_ad, WmsStates.MISFIT)

    def testDone(self):
        """DONE is reported as SUCCEEDED."""
        node_ad = {"NodeStatus": common_utils.NodeStatus.DONE}
        self._assert_node_state(node_ad, WmsStates.SUCCEEDED)

    def testErrorDagmanSuccess(self):
        """ERROR with "DAGMAN error 0" details is reported as SUCCEEDED."""
        node_ad = {
            "NodeStatus": common_utils.NodeStatus.ERROR,
            "StatusDetails": "DAGMAN error 0",
        }
        self._assert_node_state(node_ad, WmsStates.SUCCEEDED)

    def testErrorDagmanFailure(self):
        """ERROR with "DAGMAN error 1" details is reported as FAILED."""
        node_ad = {
            "NodeStatus": common_utils.NodeStatus.ERROR,
            "StatusDetails": "DAGMAN error 1",
        }
        self._assert_node_state(node_ad, WmsStates.FAILED)

    def testFutile(self):
        """FUTILE is reported as PRUNED."""
        node_ad = {"NodeStatus": common_utils.NodeStatus.FUTILE}
        self._assert_node_state(node_ad, WmsStates.PRUNED)

    def testDeletedJob(self):
        """ERROR caused by a ULOG_JOB_ABORTED event is reported as DELETED."""
        node_ad = {
            "NodeStatus": common_utils.NodeStatus.ERROR,
            "StatusDetails": "HTCondor reported ULOG_JOB_ABORTED event for job proc (1.0.0)",
            "JobProcsQueued": 0,
        }
        self._assert_node_state(node_ad, WmsStates.DELETED)

133 

134 

class HtcStatusToWmsStateTestCase(unittest.TestCase):
    """Test assigning a WMS state based on the HTCondor status."""

    def testJobStatus(self):
        """An idle job carrying only a JobStatus is reported as PENDING."""
        job_ad = {
            "ClusterId": 1,
            "JobStatus": htcondor.JobStatus.IDLE,
            "bps_job_label": "foo",
        }
        self.assertEqual(common_utils._htc_status_to_wms_state(job_ad), WmsStates.PENDING)

    def testNodeStatus(self):
        """A job whose JobStatus is None is judged by its NodeStatus fields."""
        # Hold/Release test case
        job_ad = {
            "ClusterId": 1,
            "JobStatus": None,
            "NodeStatus": common_utils.NodeStatus.SUBMITTED,
            "JobProcsHeld": 0,
            "StatusDetails": "",
            "JobProcsQueued": 1,
        }
        self.assertEqual(common_utils._htc_status_to_wms_state(job_ad), WmsStates.PENDING)

    def testNeitherStatus(self):
        """A job with neither JobStatus nor NodeStatus is reported as MISFIT."""
        job_ad = {"ClusterId": 1}
        self.assertEqual(common_utils._htc_status_to_wms_state(job_ad), WmsStates.MISFIT)

    def testRetrySuccess(self):
        """A completed job is SUCCEEDED even with leftover hold information."""
        # The same exit fields appear both nested under "ToE" and at top level.
        exit_fields = {"ExitBySignal": False, "ExitCode": 0}
        job_ad = {
            "NodeStatus": 5,
            "Node": "8e62c569-ae2e-44e8-be36-d1aee333a129_isr_903342_10",
            "RetryCount": 0,
            "ClusterId": 851,
            "ProcId": 0,
            "MyType": "JobTerminatedEvent",
            "EventTypeNumber": 5,
            "HoldReasonCode": 3,
            "HoldReason": "Job raised a signal 9. Handling signal as if job has gone over memory limit.",
            "HoldReasonSubCode": 34,
            "ToE": dict(exit_fields),
            "JobStatus": htcondor.JobStatus.COMPLETED,
            **exit_fields,
        }
        self.assertEqual(common_utils._htc_status_to_wms_state(job_ad), WmsStates.SUCCEEDED)

187 

188 

class WmsIdToDirTestCase(unittest.TestCase):
    """Test _wms_id_to_dir function.

    Every test patches ``common_utils._wms_id_type`` so that the id type
    seen by ``_wms_id_to_dir`` is fixed by the test itself.
    """

    @unittest.mock.patch("lsst.ctrl.bps.htcondor.common_utils._wms_id_type")
    def testInvalidIdType(self, _wms_id_type_mock):
        """An UNKNOWN id type must raise TypeError with a clear message."""
        _wms_id_type_mock.return_value = common_utils.WmsIdType.UNKNOWN
        with self.assertRaises(TypeError) as cm:
            _, _ = common_utils._wms_id_to_dir("not_used")
        # Checked after the context manager exits: the exception is only
        # available on ``cm`` once the ``with`` block has finished.
        self.assertIn("Invalid job id type", str(cm.exception))

    @unittest.mock.patch("lsst.ctrl.bps.htcondor.common_utils._wms_id_type")
    def testAbsPathId(self, mock_wms_id_type):
        """An absolute path id is returned as its resolved path."""
        mock_wms_id_type.return_value = common_utils.WmsIdType.PATH
        with temporaryDirectory() as tmp_dir:
            wms_path, id_type = common_utils._wms_id_to_dir(tmp_dir)
            self.assertEqual(id_type, common_utils.WmsIdType.PATH)
            self.assertEqual(Path(tmp_dir).resolve(), wms_path)

    @unittest.mock.patch("lsst.ctrl.bps.htcondor.common_utils._wms_id_type")
    def testRelPathId(self, _wms_id_type_mock):
        """A relative path id is resolved against the current directory."""
        _wms_id_type_mock.return_value = common_utils.WmsIdType.PATH
        orig_dir = Path.cwd()
        with temporaryDirectory() as tmp_dir:
            os.chdir(tmp_dir)
            try:
                abs_path = Path(tmp_dir) / "newdir"
                abs_path.mkdir()
                wms_path, id_type = common_utils._wms_id_to_dir("newdir")
                self.assertEqual(id_type, common_utils.WmsIdType.PATH)
                self.assertEqual(abs_path.resolve(), wms_path)
            finally:
                # Always leave the temporary directory, even when a step
                # above fails, so later tests do not inherit a changed
                # working directory.
                os.chdir(orig_dir)

219 

220 

class WmsIdTypeTestCase(unittest.TestCase):
    """Test _wms_id_type function."""

    def testIntId(self):
        """A string holding an integer is classified as LOCAL."""
        self.assertEqual(common_utils._wms_id_type("4"), common_utils.WmsIdType.LOCAL)

    def testPathId(self):
        """The path of an existing directory is classified as PATH."""
        with temporaryDirectory() as tmp_dir:
            self.assertEqual(
                common_utils._wms_id_type(str(tmp_dir)),
                common_utils.WmsIdType.PATH,
            )

    def testGlobalId(self):
        """A "name#cluster.proc#number" style id is classified as GLOBAL."""
        self.assertEqual(
            common_utils._wms_id_type("testmachine#5044.0#1757720957"),
            common_utils.WmsIdType.GLOBAL,
        )

    def testUnknownType(self):
        """A list argument is classified as UNKNOWN."""
        self.assertEqual(
            common_utils._wms_id_type(["bad param"]),
            common_utils.WmsIdType.UNKNOWN,
        )

240 

241 

if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()