Coverage for tests / test_htcondor_service.py: 35%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:49 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for the HTCondor WMS service class and related functions.""" 

29 

30import logging 

31import tempfile 

32import unittest 

33from pathlib import Path 

34 

35import htcondor 

36 

37from lsst.ctrl.bps import BpsConfig, WmsStates 

38from lsst.ctrl.bps.htcondor import htcondor_service 

39from lsst.ctrl.bps.htcondor.htcondor_config import HTC_DEFAULTS_URI 

40from lsst.ctrl.bps.tests.gw_test_utils import make_3_label_workflow 

41from lsst.daf.butler import Config 

42 

# Logger obtained under the "lsst.ctrl.bps.htcondor" name rather than this
# test module's own name.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")

44 

# Canned text used as the return value of the patched
# htcondor.Collector.locate in the ping tests of this module.
LOCATE_SUCCESS = """[
 CondorPlatform = "$CondorPlatform: X86_64-CentOS_7.9 $";
 MyType = "Scheduler";
 Machine = "testmachine";
 Name = "testmachine";
 CondorVersion = "$CondorVersion: 23.0.3 2024-04-04 $";
 MyAddress = "<127.0.0.1:9618?addrs=127.0.0.1-9618+snip>"
 ]
"""

54 

# Canned text used as the return value of the patched htcondor.SecMan.ping
# in testPingSuccess.
PING_SUCCESS = """[
 AuthCommand = 60011;
 AuthMethods = "FS_REMOTE";
 Command = 60040;
 AuthorizationSucceeded = true;
 ValidCommands = "60002,60003,60011,60014,60045,60046,60047,60048,60049,60050,60052,523";
 TriedAuthentication = true;
 RemoteVersion = "$CondorVersion: 10.9.0 2023-09-28 BuildID: 678228 PackageID: 10.9.0-1 $";
 MyRemoteUserName = "testuser@testmachine";
 Authentication = "YES";
 ]
"""

67 

68 

69class HTCondorServiceTestCase(unittest.TestCase): 

70 """Test selected methods of the HTCondor WMS service class.""" 

71 

def setUp(self):
    """Build the HTCondorService under test from an otherwise empty BpsConfig."""
    self.service = htcondor_service.HTCondorService(
        BpsConfig({}, wms_service_class_fqn="lsst.ctrl.bps.htcondor.HTCondorService")
    )

75 

def tearDown(self):
    """Intentionally a no-op; kept as the explicit counterpart of setUp."""
    return None

78 

def testDefaults(self):
    """The service defaults carry the expected ``memoryLimit`` value."""
    memory_limit = self.service.defaults["memoryLimit"]
    self.assertEqual(memory_limit, 491520)

81 

def testDefaultsPath(self):
    """``defaults_uri`` equals HTC_DEFAULTS_URI and is not a directory."""
    service = self.service
    self.assertEqual(service.defaults_uri, HTC_DEFAULTS_URI)
    is_directory = service.defaults_uri.isdir()
    self.assertFalse(is_directory)

85 

@unittest.mock.patch.object(htcondor.SecMan, "ping", return_value=PING_SUCCESS)
@unittest.mock.patch.object(htcondor.Collector, "locate", return_value=LOCATE_SUCCESS)
def testPingSuccess(self, mock_locate, mock_ping):
    """With locate and ping both patched to succeed, ping returns (0, "")."""
    code, text = self.service.ping(None)
    self.assertEqual(code, 0)
    self.assertEqual(text, "")

92 

def testPingFailure(self):
    """A locate error is reported as status 1 with the 'could not locate' message."""
    # Configure the failure when the patch is created rather than afterwards.
    failing_locate = unittest.mock.patch(
        "htcondor.Collector.locate",
        side_effect=htcondor.HTCondorLocateError(),
    )
    with failing_locate:
        code, text = self.service.ping(None)
        self.assertEqual(code, 1)
        self.assertEqual(text, "Could not locate Schedd service.")

99 

@unittest.mock.patch.object(htcondor.Collector, "locate", return_value=LOCATE_SUCCESS)
def testPingPermission(self, mock_locate):
    """An I/O error from SecMan.ping is reported as a permission problem."""
    # locate succeeds (decorator); only the ping call itself is made to fail.
    failing_ping = unittest.mock.patch(
        "htcondor.SecMan.ping",
        side_effect=htcondor.HTCondorIOError(),
    )
    with failing_ping:
        code, text = self.service.ping(None)
        self.assertEqual(code, 1)
        self.assertEqual(text, "Permission problem with Schedd service.")

107 

@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._get_status_from_id")
@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._locate_schedds")
@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._wms_id_type")
def testGetStatusLocal(self, mock_type, mock_locate, mock_status):
    """For a LOCAL id, schedds are located with locate_all=False.

    The status lookup is expected to receive the id, the value 1 and the
    located schedds, and its result is returned unchanged.
    """
    job_id = "100"
    mock_type.return_value = htcondor_service.WmsIdType.LOCAL
    mock_locate.return_value = {}
    mock_status.return_value = (WmsStates.RUNNING, "")

    state, message = self.service.get_status(job_id)

    # How the helpers were driven.
    mock_type.assert_called_once_with(job_id)
    mock_locate.assert_called_once_with(locate_all=False)
    mock_status.assert_called_once_with(job_id, 1, schedds={})

    # What came back to the caller.
    self.assertEqual(state, WmsStates.RUNNING)
    self.assertEqual(message, "")

125 

@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._get_status_from_id")
@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._locate_schedds")
@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._wms_id_type")
def testGetStatusGlobal(self, mock_type, mock_locate, mock_status):
    """For a GLOBAL id, schedds are located with locate_all=True.

    The second positional argument given to get_status (2 here) is expected
    to be forwarded to the status lookup along with the located schedds.
    """
    job_id = "100"
    expected_message = ""
    mock_type.return_value = htcondor_service.WmsIdType.GLOBAL
    mock_locate.return_value = {}
    mock_status.return_value = (WmsStates.RUNNING, expected_message)

    state, message = self.service.get_status(job_id, 2)

    # How the helpers were driven.
    mock_type.assert_called_once_with(job_id)
    mock_locate.assert_called_once_with(locate_all=True)
    mock_status.assert_called_once_with(job_id, 2, schedds={})

    # What came back to the caller.
    self.assertEqual(state, WmsStates.RUNNING)
    self.assertEqual(message, expected_message)

144 

@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._get_status_from_path")
@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._wms_id_type")
def testGetStatusPath(self, mock_type, mock_status):
    """For a PATH id, the path-based lookup is called with just the id."""
    run_path = "/fake/path"
    expected_message = "fake message"
    mock_type.return_value = htcondor_service.WmsIdType.PATH
    mock_status.return_value = (WmsStates.FAILED, expected_message)

    state, message = self.service.get_status(run_path)

    mock_type.assert_called_once_with(run_path)
    mock_status.assert_called_once_with(run_path)

    self.assertEqual(state, WmsStates.FAILED)
    self.assertEqual(message, expected_message)

160 

@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_service._wms_id_type")
def testGetStatusUnknownType(self, mock_type):
    """An id classified as UNKNOWN yields WmsStates.UNKNOWN and 'Invalid job id'."""
    job_id = "100.0"
    mock_type.return_value = htcondor_service.WmsIdType.UNKNOWN

    state, message = self.service.get_status(job_id)

    mock_type.assert_called_once_with(job_id)

    self.assertEqual(state, WmsStates.UNKNOWN)
    self.assertEqual(message, "Invalid job id")

172 

@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_workflow.HTCondorWorkflow.write")
def testPrepare(self, mock_write):
    """prepare writes the workflow once and produces a 19-node DAG."""
    generic_workflow = make_3_label_workflow("test1", True)
    settings = {
        "bpsUseShared": True,
        "overwriteJobFiles": False,
        "memoryLimit": 491520,
        "profile": {},
        "attrs": {},
        "nodeset": "set1",
    }
    config = BpsConfig(settings)

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        htc_workflow = self.service.prepare(config, generic_workflow, tmpdir)
        mock_write.assert_called_once()
        # 3 visit * 2 detectors * 3 labels + init
        node_count = len(htc_workflow.dag)
        self.assertEqual(node_count, 19)

191 

@unittest.mock.patch("lsst.ctrl.bps.htcondor.htcondor_workflow.HTCondorWorkflow.write")
def testPrepareProvision(self, mock_write):
    """prepare with provisionResources set uses the timestamp as the nodeset."""
    # Leaves testing provisioning code to test_provisioner.py.
    # Just checking HTCondorService.prepare bits (like nodeset).
    timestamp = "20260130T211713Z"
    generic_workflow = make_3_label_workflow("test1", True)
    overrides = {
        "bpsUseShared": True,
        "overwriteJobFiles": False,
        "profile": {"requirements": "dummy_val == 3"},
        "attrs": {},
        "nodeset": "set1",  # this shouldn't be used with auto-provisioning
        "provisionResources": True,
        "provisioning": {"provisioningMaxWallTime": 1200},
        "bps_defined": {"timestamp": timestamp},
    }
    config = BpsConfig(overrides, defaults=Config(HTC_DEFAULTS_URI))

    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmpdir:
        workdir = Path(tmpdir)
        config[".provisioning.provisioningScriptConfigPath"] = str(workdir / "condor-info.py")
        config[".provisioning.provisioningScriptConfig"] = "foo"

        htc_workflow = self.service.prepare(config, generic_workflow, tmpdir)
        mock_write.assert_called_once()
        self.assertEqual(config[".bps_defined.nodeset"], timestamp)
        # 3 visit * 2 dets * 3 labels + init
        self.assertEqual(len(htc_workflow.dag), 19)
        self.assertIsNotNone(htc_workflow.dag.graph["service_job"])

        # The provisioning script must exist while the temporary directory is
        # still alive, and must pass the timestamp as its nodeset.
        prov_script = workdir / "provisioningJob.bash"
        self.assertTrue(prov_script.is_file())
        self.assertIn(f"--nodeset {timestamp}", prov_script.read_text())