Coverage for tests / test_lssthtc.py: 19%

570 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:05 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27"""Unit tests for classes and functions in lssthtc.py.""" 

28 

29import io 

30import logging 

31import os 

32import pathlib 

33import stat 

34import sys 

35import tempfile 

36import unittest 

37from shutil import copy2, copytree, ignore_patterns, rmtree, which 

38 

39import htcondor 

40 

41from lsst.ctrl.bps import BpsConfig 

42from lsst.ctrl.bps.htcondor import dagman_configurator, htcondor_config, lssthtc 

43from lsst.daf.butler import Config 

44from lsst.utils.tests import temporaryDirectory 

45 

46logger = logging.getLogger("lsst.ctrl.bps.htcondor") 

47TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

48 

49 

class TestLsstHtc(unittest.TestCase):
    """Test basic usage of small lssthtc helpers."""

    def testHtcEscapeInt(self):
        # Non-string values are returned unchanged.
        self.assertEqual(lssthtc.htc_escape(100), 100)

    def testHtcEscapeDouble(self):
        # Double quotes are doubled for the HTCondor submit language.
        self.assertEqual(lssthtc.htc_escape('"double"'), '""double""')

    def testHtcEscapeSingle(self):
        # Single quotes are doubled as well.
        self.assertEqual(lssthtc.htc_escape("'single'"), "''single''")

    def testHtcEscapeNoSideEffect(self):
        # Escaping must not mutate the caller's string.
        original = "'val'"
        escaped = lssthtc.htc_escape(original)
        self.assertEqual(escaped, "''val''")
        self.assertEqual(original, "'val'")

    def testHtcEscapeQuot(self):
        # The &quot; entity is converted back to a literal double quote.
        self.assertEqual(lssthtc.htc_escape("&quot;val&quot;"), '"val"')

    def testHtcVersion(self):
        # The reported version should look like a dotted triple.
        version = lssthtc.htc_version()
        self.assertRegex(version, r"^\d+\.\d+\.\d+$")

73 

74 

class HtcTweakJobInfoTestCase(unittest.TestCase):
    """Test the function responsible for massaging job information."""

    def setUp(self):
        # Minimal job ad; individual tests override "MyType" to select
        # which log event is being massaged.
        self.log_dir = tempfile.TemporaryDirectory()
        self.log_dirname = pathlib.Path(self.log_dir.name)
        self.job = {
            "Cluster": 1,
            "Proc": 0,
            "Iwd": str(self.log_dirname),
            "Owner": self.log_dirname.owner(),
            "MyType": None,
            "TerminatedNormally": True,
        }

    def tearDown(self):
        self.log_dir.cleanup()

    def testDirectAssignments(self):
        # Fields copied/kept verbatim by the tweak function.
        lssthtc.htc_tweak_log_info(self.log_dirname, self.job)
        self.assertEqual(self.job["ClusterId"], self.job["Cluster"])
        self.assertEqual(self.job["ProcId"], self.job["Proc"])
        self.assertEqual(self.job["Iwd"], str(self.log_dirname))
        self.assertEqual(self.job["Owner"], self.log_dirname.owner())

    def testIncompatibleAdPassThru(self):
        # Passing a job ad with insufficient information should be a no-op.
        expected = {"foo": "bar"}
        result = expected.copy()
        lssthtc.htc_tweak_log_info(self.log_dirname, result)
        self.assertEqual(result, expected)

    def testJobStatusAssignmentJobAbortedEvent(self):
        info = {**self.job, "MyType": "JobAbortedEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.REMOVED)

    def testJobStatusAssignmentExecuteEvent(self):
        info = {**self.job, "MyType": "ExecuteEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.RUNNING)

    def testJobStatusAssignmentSubmitEvent(self):
        info = {**self.job, "MyType": "SubmitEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.IDLE)

    def testJobStatusAssignmentJobHeldEvent(self):
        info = {**self.job, "MyType": "JobHeldEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.HELD)

    def testJobStatusAssignmentJobTerminatedEvent(self):
        info = {**self.job, "MyType": "JobTerminatedEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.COMPLETED)

    def testJobStatusAssignmentPostScriptTerminatedEvent(self):
        info = {**self.job, "MyType": "PostScriptTerminatedEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.COMPLETED)

    def testJobStatusAssignmentReleaseEventMainDagJob(self):
        # No DAGNodeName means this is the main DAG job itself.
        info = {**self.job, "MyType": "JobReleaseEvent"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertEqual(info["JobStatus"], htcondor.JobStatus.RUNNING)

    def testJobStatusAssignmentReleaseEventForNodeJob(self):
        # A release event for a payload node does not set a status.
        info = {**self.job, "MyType": "JobReleaseEvent", "DAGNodeName": "test_payload_job"}
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("JobStatus", info)
        self.assertIsNone(info["JobStatus"])

    def testAddingExitStatusSuccess(self):
        # Exit status is lifted out of the ToE classad.
        info = {
            **self.job,
            "MyType": "JobTerminatedEvent",
            "ToE": {"ExitBySignal": False, "ExitCode": 1},
        }
        lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("ExitBySignal", info)
        self.assertIs(info["ExitBySignal"], False)
        self.assertIn("ExitCode", info)
        self.assertEqual(info["ExitCode"], 1)

    def testAddingExitStatusFailure(self):
        # A held job without a ToE ad cannot yield an exit status.
        info = {**self.job, "MyType": "JobHeldEvent"}
        with self.assertLogs(logger=logger, level="ERROR") as cm:
            lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("Could not determine exit status", cm.output[0])

    def testLoggingUnknownLogEvent(self):
        info = {**self.job, "MyType": "Foo"}
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertIn("Unknown log event", cm.output[1])

    def testMissingKey(self):
        # Removing a mandatory key should surface as a KeyError.
        info = self.job
        del info["Cluster"]
        with self.assertRaises(KeyError) as cm:
            lssthtc.htc_tweak_log_info(self.log_dirname, info)
        self.assertEqual(str(cm.exception), "'Cluster'")

186 

187 

class HtcCheckDagmanOutputTestCase(unittest.TestCase):
    """Test htc_check_dagman_output function."""

    def test_missing_output_file(self):
        # An empty submit directory has no dagman output file at all.
        with temporaryDirectory() as tmp_dir:
            with self.assertRaises(FileNotFoundError):
                _ = lssthtc.htc_check_dagman_output(tmp_dir)

    def test_permissions_output_file(self):
        # An unreadable dagman output file is reported in the results
        # string instead of raising.
        # NOTE(review): write-only permissions do not block reads when
        # running as root — confirm CI never runs this test as root.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/test_tmpdir_abort.dag.dagman.out", tmp_dir)
            outfile = f"{tmp_dir}/test_tmpdir_abort.dag.dagman.out"
            os.chmod(outfile, 0o200)
            try:
                results = lssthtc.htc_check_dagman_output(tmp_dir)
            finally:
                # Restore read permission even if the call above raises so
                # the temporary directory can always be cleaned up.
                os.chmod(outfile, 0o600)
            self.assertIn("Could not read dagman output file", results)

    def test_submit_failure(self):
        # Output containing job submission errors produces a warning.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/bad_submit.dag.dagman.out", tmp_dir)
            results = lssthtc.htc_check_dagman_output(tmp_dir)
            self.assertIn("Warn: Job submission issues (last: ", results)

    def test_tmpdir_abort(self):
        # DAGMan aborting because it was submitted from /tmp is reported.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/test_tmpdir_abort.dag.dagman.out", tmp_dir)
            results = lssthtc.htc_check_dagman_output(tmp_dir)
            self.assertIn("Cannot submit from /tmp", results)

    def test_no_messages(self):
        # A clean dagman output file yields an empty results string.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/test_no_messages.dag.dagman.out", tmp_dir)
            results = lssthtc.htc_check_dagman_output(tmp_dir)
            self.assertEqual("", results)

222 

223 

class SummarizeDagTestCase(unittest.TestCase):
    """Test summarize_dag function.

    Each test copies a canned DAG file (or an entire submit directory) into
    a temporary directory and checks the three values summarize_dag
    returns: a ``label:count`` summary string, a mapping of DAG node name
    to job label, and a mapping of DAG node name to WmsNodeType.
    """

    def test_no_dag_file(self):
        # With no *.dag file present the function returns empty results
        # rather than raising.
        with temporaryDirectory() as tmp_dir:
            summary, job_name_to_pipetask, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertFalse(len(job_name_to_pipetask))
            self.assertFalse(len(job_name_to_type))
            self.assertFalse(summary)

    def test_success(self):
        # Simple linear DAG: pipetaskInit, three payload labels, finalJob.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/good.dag", tmp_dir)
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertEqual(summary, "pipetaskInit:1;label1:1;label2:1;label3:1;finalJob:1")
            self.assertEqual(
                job_name_to_label,
                {
                    "pipetaskInit": "pipetaskInit",
                    "0682f8f9-12f0-40a5-971e-8b30c7231e5c_label1_val1_val2": "label1",
                    "d0305e2d-f164-4a85-bd24-06afe6c84ed9_label2_val1_val2": "label2",
                    "2806ecc9-1bba-4362-8fff-ab4e6abb9f83_label3_val1_val2": "label3",
                    "finalJob": "finalJob",
                },
            )
            self.assertEqual(
                job_name_to_type,
                {
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "0682f8f9-12f0-40a5-971e-8b30c7231e5c_label1_val1_val2": lssthtc.WmsNodeType.PAYLOAD,
                    "d0305e2d-f164-4a85-bd24-06afe6c84ed9_label2_val1_val2": lssthtc.WmsNodeType.PAYLOAD,
                    "2806ecc9-1bba-4362-8fff-ab4e6abb9f83_label3_val1_val2": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                },
            )

    def test_service(self):
        # DAG with a provisioningJob service node; note the service node
        # does not appear in the label:count summary string.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_problems/tiny_problems.dag", tmp_dir)
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            self.assertEqual(summary, "pipetaskInit:1;label1:2;label2:2;finalJob:1")
            self.assertEqual(
                job_name_to_label,
                {
                    "pipetaskInit": "pipetaskInit",
                    "057c8caf-66f6-4612-abf7-cdea5b666b1b_label1_val1a_val2b": "label1",
                    "4a7f478b-2e9b-435c-a730-afac3f621658_label1_val1a_val2a": "label1",
                    "40040b97-606d-4997-98d3-e0493055fe7e_label2_val1a_val2b": "label2",
                    "696ee50d-e711-40d6-9caf-ee29ae4a656d_label2_val1a_val2a": "label2",
                    "finalJob": "finalJob",
                    "provisioningJob": "provisioningJob",
                },
            )
            self.assertEqual(
                job_name_to_type,
                {
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "057c8caf-66f6-4612-abf7-cdea5b666b1b_label1_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "4a7f478b-2e9b-435c-a730-afac3f621658_label1_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "40040b97-606d-4997-98d3-e0493055fe7e_label2_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "696ee50d-e711-40d6-9caf-ee29ae4a656d_label2_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                    "provisioningJob": lssthtc.WmsNodeType.SERVICE,
                },
            )

    def test_noop(self):
        # DAG containing wms_noop_* ordering nodes; NOOP nodes map to
        # their order label but are excluded from the summary counts.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/noop_running_1/noop_running_1.dag", tmp_dir)
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(tmp_dir)
            # Compare as a set because the label order in the summary
            # string is not significant here.
            self.assertEqual(
                set(summary.split(";")),
                {"pipetaskInit:1", "label1:6", "label2:6", "label3:6", "label4:6", "label5:6", "finalJob:1"},
            )
            self.assertEqual(
                job_name_to_label,
                {
                    "label1_val1a_val2a": "label1",
                    "label1_val1a_val2b": "label1",
                    "label1_val1b_val2a": "label1",
                    "label1_val1b_val2b": "label1",
                    "label1_val1c_val2a": "label1",
                    "label1_val1c_val2b": "label1",
                    "label2_val1a_val2a": "label2",
                    "label2_val1a_val2b": "label2",
                    "label2_val1b_val2a": "label2",
                    "label2_val1b_val2b": "label2",
                    "label2_val1c_val2a": "label2",
                    "label2_val1c_val2b": "label2",
                    "label3_val1a_val2a": "label3",
                    "label3_val1a_val2b": "label3",
                    "label3_val1b_val2a": "label3",
                    "label3_val1b_val2b": "label3",
                    "label3_val1c_val2a": "label3",
                    "label3_val1c_val2b": "label3",
                    "label4_val1a_val2a": "label4",
                    "label4_val1a_val2b": "label4",
                    "label4_val1b_val2a": "label4",
                    "label4_val1b_val2b": "label4",
                    "label4_val1c_val2a": "label4",
                    "label4_val1c_val2b": "label4",
                    "label5_val1a_val2a": "label5",
                    "label5_val1a_val2b": "label5",
                    "label5_val1b_val2a": "label5",
                    "label5_val1b_val2b": "label5",
                    "label5_val1c_val2a": "label5",
                    "label5_val1c_val2b": "label5",
                    "finalJob": "finalJob",
                    "pipetaskInit": "pipetaskInit",
                    "wms_noop_order1_val1a": "order1",
                    "wms_noop_order1_val1b": "order1",
                },
            )
            self.assertEqual(
                job_name_to_type,
                {
                    "label1_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "wms_noop_order1_val1a": lssthtc.WmsNodeType.NOOP,
                    "wms_noop_order1_val1b": lssthtc.WmsNodeType.NOOP,
                },
            )

    def test_subdags(self):
        # DAG with wms_group_* SUBDAG nodes and their matching
        # wms_check_status_* SUBDAG_CHECK nodes; payload counts in the
        # summary include jobs inside the subdags.
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "group_running_1")
            copytree(f"{TESTDIR}/data/group_running_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            summary, job_name_to_label, job_name_to_type = lssthtc.summarize_dag(submit_dir)
            self.assertEqual(
                set(summary.split(";")),
                {"pipetaskInit:1", "label1:6", "label2:6", "label3:6", "label4:6", "label5:6", "finalJob:1"},
            )

            self.assertEqual(
                job_name_to_label,
                {
                    "pipetaskInit": "pipetaskInit",
                    "label1_val1b_val2a": "label1",
                    "label1_val1c_val2a": "label1",
                    "label1_val1a_val2b": "label1",
                    "label1_val1b_val2b": "label1",
                    "label1_val1c_val2b": "label1",
                    "label1_val1a_val2a": "label1",
                    "label2_val1a_val2b": "label2",
                    "label2_val1a_val2a": "label2",
                    "label2_val1b_val2a": "label2",
                    "label2_val1b_val2b": "label2",
                    "label2_val1c_val2a": "label2",
                    "label2_val1c_val2b": "label2",
                    "label3_val1b_val2a": "label3",
                    "label3_val1c_val2a": "label3",
                    "label3_val1a_val2b": "label3",
                    "label3_val1b_val2b": "label3",
                    "label3_val1c_val2b": "label3",
                    "label3_val1a_val2a": "label3",
                    "label4_val1a_val2b": "label4",
                    "label4_val1a_val2a": "label4",
                    "label4_val1b_val2a": "label4",
                    "label4_val1b_val2b": "label4",
                    "label4_val1c_val2a": "label4",
                    "label4_val1c_val2b": "label4",
                    "label5_val1a_val2b": "label5",
                    "label5_val1a_val2a": "label5",
                    "label5_val1b_val2a": "label5",
                    "label5_val1b_val2b": "label5",
                    "label5_val1c_val2a": "label5",
                    "label5_val1c_val2b": "label5",
                    "finalJob": "finalJob",
                    "provisioningJob": "provisioningJob",
                    "wms_group_order1_val1a": "order1",
                    "wms_group_order1_val1b": "order1",
                    "wms_group_order1_val1c": "order1",
                    "wms_check_status_wms_group_order1_val1a": "order1",
                    "wms_check_status_wms_group_order1_val1b": "order1",
                    "wms_check_status_wms_group_order1_val1c": "order1",
                },
            )

            self.assertEqual(
                job_name_to_type,
                {
                    "pipetaskInit": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label1_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label2_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label3_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label4_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1a_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1b_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2a": lssthtc.WmsNodeType.PAYLOAD,
                    "label5_val1c_val2b": lssthtc.WmsNodeType.PAYLOAD,
                    "finalJob": lssthtc.WmsNodeType.FINAL,
                    "provisioningJob": lssthtc.WmsNodeType.SERVICE,
                    "wms_group_order1_val1a": lssthtc.WmsNodeType.SUBDAG,
                    "wms_group_order1_val1b": lssthtc.WmsNodeType.SUBDAG,
                    "wms_group_order1_val1c": lssthtc.WmsNodeType.SUBDAG,
                    "wms_check_status_wms_group_order1_val1a": lssthtc.WmsNodeType.SUBDAG_CHECK,
                    "wms_check_status_wms_group_order1_val1b": lssthtc.WmsNodeType.SUBDAG_CHECK,
                    "wms_check_status_wms_group_order1_val1c": lssthtc.WmsNodeType.SUBDAG_CHECK,
                },
            )

476 

477 

class ReadDagNodesLogTestCase(unittest.TestCase):
    """Test read_dag_nodes_log function."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir, ignore_errors=True)

    def testFileMissing(self):
        # An empty directory has no *.dag.nodes.log file.
        with self.assertRaisesRegex(FileNotFoundError, "DAGMan node log not found in"):
            _ = lssthtc.read_dag_nodes_log(self.tmpdir)

    def testRegular(self):
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "tiny_problems")
            copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            node_info = lssthtc.read_dag_nodes_log(submit_dir)
            self.assertEqual(len(node_info), 6)
            job = node_info["9231.0"]
            self.assertEqual(job["Cluster"], 9231)
            self.assertEqual(job["Proc"], 0)
            self.assertEqual(job["ToE"]["ExitCode"], 1)

    def testSubdags(self):
        """Making sure it gets data from subdag dirs and doesn't
        fail if some subdags haven't started running yet.
        """
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "group_running_1")
            copytree(f"{TESTDIR}/data/group_running_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            node_info = lssthtc.read_dag_nodes_log(submit_dir)
            # 10094 comes from the main dag, the others from subdags.
            for job_id, cluster in (("10094.0", 10094), ("10112.0", 10112), ("10116.0", 10116)):
                self.assertEqual(node_info[job_id]["Cluster"], cluster)

514 

515 

class ReadNodeStatusTestCase(unittest.TestCase):
    """Test read_node_status function."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        rmtree(self.tmpdir, ignore_errors=True)

    @staticmethod
    def _index_jobs(jobs):
        """Index job information by node name and by node type.

        Job ids are not guaranteed to be the same between runs, so the
        tests look up jobs by their DAG node names instead.

        Parameters
        ----------
        jobs : `dict`
            Mapping of job id to job information as returned by
            ``lssthtc.read_node_status``.

        Returns
        -------
        job_name_to_id : `dict`
            Mapping of DAG node name to job id.
        job_type_to_names : `dict`
            Mapping of wms node type to the set of DAG node names with
            that type.
        """
        job_name_to_id = {}
        job_type_to_names = {}
        for id_, info in jobs.items():
            name = info.get("DAGNodeName", id_)
            job_name_to_id[name] = id_
            job_type_to_names.setdefault(
                info.get("wms_node_type", lssthtc.WmsNodeType.UNKNOWN), set()
            ).add(name)
        return job_name_to_id, job_type_to_names

    def testServiceJobNotSubmitted(self):
        # tiny_prov_no_submit files have successful workflow
        # but provisioningJob could not submit.
        for name in (
            "tiny_prov_no_submit.dag.nodes.log",
            "tiny_prov_no_submit.dag.dagman.log",
            "tiny_prov_no_submit.node_status",
            "tiny_prov_no_submit.dag",
        ):
            copy2(f"{TESTDIR}/data/tiny_prov_no_submit/{name}", self.tmpdir)

        jobs = lssthtc.read_node_status(self.tmpdir)
        _, job_type_to_names = self._index_jobs(jobs)
        job_name_to_id, _ = self._index_jobs(jobs)
        found = [
            id_
            for id_ in jobs
            if jobs[id_].get("wms_node_type", lssthtc.WmsNodeType.UNKNOWN) == lssthtc.WmsNodeType.SERVICE
        ]
        self.assertEqual(len(found), 1)
        self.assertEqual(jobs[found[0]]["DAGNodeName"], "provisioningJob")
        self.assertEqual(jobs[found[0]]["NodeStatus"], lssthtc.NodeStatus.NOT_READY)

    def testMissingStatusFile(self):
        # Status should still be derivable from the logs when the
        # *.node_status file is absent.
        for name in (
            "tiny_problems.dag.nodes.log",
            "tiny_problems.dag.dagman.log",
            "tiny_problems.dag",
        ):
            copy2(f"{TESTDIR}/data/tiny_problems/{name}", self.tmpdir)

        jobs = lssthtc.read_node_status(self.tmpdir)
        self.assertEqual(len(jobs), 7)
        self.assertEqual(jobs["9230.0"]["DAGNodeName"], "pipetaskInit")
        self.assertEqual(jobs["9230.0"]["wms_node_type"], lssthtc.WmsNodeType.PAYLOAD)
        found = [
            id_
            for id_ in jobs
            if jobs[id_].get("wms_node_type", lssthtc.WmsNodeType.UNKNOWN) == lssthtc.WmsNodeType.SERVICE
        ]
        self.assertEqual(len(found), 1)
        self.assertEqual(jobs[found[0]]["DAGNodeName"], "provisioningJob")

    def testSubdagsRunning(self):
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_running_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            jobs = lssthtc.read_node_status(submit_dir)
            self.assertEqual(len(jobs), 39)  # includes non-payload jobs
            # not guaranteed ids are same, so use names instead
            job_name_to_id, job_type_to_names = self._index_jobs(jobs)

            # check counts
            self.assertNotIn(lssthtc.WmsNodeType.NOOP, job_type_to_names)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.PAYLOAD]), 31)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.FINAL]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SERVICE]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG]), 3)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG_CHECK]), 3)

            # spot check some statuses
            self.assertEqual(
                jobs[job_name_to_id["label3_val1a_val2b"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_group_order1_val1a"]]["NodeStatus"], lssthtc.NodeStatus.SUBMITTED
            )
            self.assertEqual(
                jobs[job_name_to_id["label5_val1a_val2a"]]["NodeStatus"], lssthtc.NodeStatus.NOT_READY
            )
            self.assertEqual(
                jobs[job_name_to_id["label2_val1a_val2a"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )

    def testSubdagsFailed(self):
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_failed_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            jobs = lssthtc.read_node_status(submit_dir)
            self.assertEqual(len(jobs), 39)
            # not guaranteed ids are same, so use names instead
            job_name_to_id, job_type_to_names = self._index_jobs(jobs)

            # check counts
            self.assertNotIn(lssthtc.WmsNodeType.NOOP, job_type_to_names)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.PAYLOAD]), 31)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.FINAL]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SERVICE]), 1)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG]), 3)
            self.assertEqual(len(job_type_to_names[lssthtc.WmsNodeType.SUBDAG_CHECK]), 3)

            # spot check some statuses
            self.assertEqual(
                jobs[job_name_to_id["label3_val1a_val2b"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_group_order1_val1a"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["label5_val1a_val2a"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )

            # Nodes downstream of the failed subdag check are FUTILE.
            self.assertEqual(
                jobs[job_name_to_id["label5_val1b_val2a"]]["NodeStatus"], lssthtc.NodeStatus.FUTILE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_group_order1_val1b"]]["NodeStatus"], lssthtc.NodeStatus.DONE
            )
            self.assertEqual(
                jobs[job_name_to_id["wms_check_status_wms_group_order1_val1b"]]["NodeStatus"],
                lssthtc.NodeStatus.ERROR,
            )

645 

646 

class HTCJobTestCase(unittest.TestCase):
    """Test HTCJob methods."""

    def testWriteDagCommandsPayload(self):
        """A payload job produces a JOB line with its DIR prefixed."""
        payload = lssthtc.HTCJob(
            "job1",
            "label1",
            {"executable": "/bin/sleep", "arguments": "60", "log": "job1.log"},
            {"dir": "jobs/label1"},
        )
        payload.subfile = "job1.sub"

        stream = io.StringIO()
        payload.write_dag_commands(stream, "../..")
        self.assertIn('JOB job1 "job1.sub" DIR "../../jobs/label1"', stream.getvalue())

    def testWriteDagCommandsNotJob(self):
        # Testing giving command_name, no dag_rel_path and no dir
        final = lssthtc.HTCJob(
            "finalJob",
            "finalJob",
            {"executable": "/bin/sleep", "arguments": "60", "log": "job1.log"},
        )
        final.subfile = "jobs/finalJob/finalJob.sub"
        stream = io.StringIO()
        final.write_dag_commands(stream, "", "FINAL")
        self.assertIn('FINAL finalJob "jobs/finalJob/finalJob.sub"', stream.getvalue())

    def testWriteDagCommandsNoop(self):
        """A job flagged noop gets the NOOP keyword in its JOB line."""
        noop = lssthtc.HTCJob("wms_noop_job1", "label1", {}, {"noop": True})
        noop.subfile = "notthere.sub"
        stream = io.StringIO()
        noop.write_dag_commands(stream, "")
        self.assertIn("NOOP", stream.getvalue())

    def testWriteSubmitFile(self):
        job = lssthtc.HTCJob(
            "job1",
            "label1",
            {"executable": "/bin/sleep", "arguments": "60", "log": "job1.log"},
        )
        with temporaryDirectory() as tmp_dir:
            subfile = pathlib.Path(tmp_dir) / "label1/job1.sub"
            job.write_submit_file(subfile.parent)
            self.assertTrue(subfile.exists())
            # Try to make Submit object from file to find any syntax issues
            _ = lssthtc.htc_create_submit_from_file(subfile)

    def testWriteSubmitFileExists(self):
        job = lssthtc.HTCJob(
            "job1",
            "label1",
            {"executable": "/bin/sleep", "arguments": "60", "log": "job1.log"},
        )
        with temporaryDirectory() as tmp_dir:
            subfile = pathlib.Path(tmp_dir) / "job1.sub"
            job.subfile = subfile
            subfile.touch()  # make empty file
            job.write_submit_file(subfile.parent)
            # make sure didn't overwrite file
            self.assertEqual(subfile.stat().st_size, 0, "Incorrectly overwrote existing file")

709 

710 

class HtcWriteJobCommands(unittest.TestCase):
    """Test _htc_write_job_commands function.

    The expected DAG text in each test (``truth``) is compared exactly, so
    these strings must match the generated output byte for byte.
    """

    def testAllCommands(self):
        # Exercise every supported DAG command for a regular node.
        dag_cmds = {
            "pre": {
                "defer": {"status": 1, "time": 120},
                "debug": {"filename": "debug_pre.txt", "type": "ALL"},
                "executable": "exec1",
                "arguments": "arg1 arg2",
            },
            "post": {
                "defer": {"status": 2, "time": 180},
                "debug": {"filename": "debug_post.txt", "type": "ALL"},
                "executable": "exec2",
                "arguments": "arg3 arg4",
            },
            "vars": {"num": 8, "spaces": "a space"},
            "pre_skip": "1",
            "retry": 3,
            "retry_unless_exit": 1,
            "abort_dag_on": {"node_exit": 100, "abort_exit": 4},
            "priority": 123,
        }

        truth = """SCRIPT DEFER 1 120 DEBUG debug_pre.txt ALL PRE job1 exec1 arg1 arg2
SCRIPT DEFER 2 180 DEBUG debug_post.txt ALL POST job1 exec2 arg3 arg4
VARS job1 num="8"
VARS job1 spaces="a space"
PRE_SKIP job1 1
RETRY job1 3 UNLESS-EXIT 1
ABORT-DAG-ON job1 100 RETURN 4
PRIORITY job1 123
"""
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "job1", dag_cmds)
        self.assertEqual(mockfh.getvalue(), truth)

    def testPartialCommands(self):
        # Trigger skipping the inner if clauses.
        dag_cmds = {
            "pre": {
                "executable": "exec1",
            },
            "post": {
                "executable": "exec2",
            },
            "vars": {"num": 8, "spaces": "a space"},
            "pre_skip": "1",
            "retry": 3,
        }

        truth = """SCRIPT PRE job1 exec1
SCRIPT POST job1 exec2
VARS job1 num="8"
VARS job1 spaces="a space"
PRE_SKIP job1 1
RETRY job1 3
"""
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "job1", dag_cmds)
        self.assertEqual(mockfh.getvalue(), truth)

    def testNoCommands(self):
        # An empty command dict should write nothing at all.
        dag_cmds = {}
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "job2", dag_cmds)
        self.assertEqual(mockfh.getvalue(), "")

    def testFinal(self):
        # For a FINAL node the RETRY, ABORT-DAG-ON, and PRIORITY commands
        # are omitted from the output even though they appear in dag_cmds.
        self.maxDiff = None
        dag_cmds = {
            "pre": {
                "defer": {"status": 1, "time": 120},
                "debug": {"filename": "debug_pre.txt", "type": "ALL"},
                "executable": "exec1",
                "arguments": "arg1 arg2",
            },
            "post": {
                "defer": {"status": 2, "time": 180},
                "debug": {"filename": "debug_post.txt", "type": "ALL"},
                "executable": "exec2",
                "arguments": "arg3 arg4",
            },
            "vars": {"num": 8, "spaces": "a space"},
            "pre_skip": "1",
            "retry": 3,
            "retry_unless_exit": 1,
            "abort_dag_on": {"node_exit": 100, "abort_exit": 4},
            "priority": 123,
        }

        truth = """SCRIPT DEFER 1 120 DEBUG debug_pre.txt ALL PRE finalJob exec1 arg1 arg2
SCRIPT DEFER 2 180 DEBUG debug_post.txt ALL POST finalJob exec2 arg3 arg4
VARS finalJob num="8"
VARS finalJob spaces="a space"
PRE_SKIP finalJob 1
"""
        mockfh = io.StringIO()
        lssthtc._htc_write_job_commands(mockfh, "finalJob", dag_cmds, "FINAL")
        self.assertEqual(mockfh.getvalue(), truth)

812 

813 

814class HTCBackupFilesSinglePathTestCase(unittest.TestCase): 

815 """Test htc_backup_files_single_path function.""" 

816 

817 def testSrcDestSame(self): 

818 with temporaryDirectory() as tmp_dir: 

819 with self.assertRaisesRegex( 

820 RuntimeError, "Destination directory is same as the source directory" 

821 ): 

822 lssthtc.htc_backup_files_single_path(tmp_dir, tmp_dir) 

823 

824 def testSuccess(self): 

825 with temporaryDirectory() as tmp_dir: 

826 test_tmp_dir = pathlib.Path(tmp_dir) 

827 submit_dir = test_tmp_dir / "the_src_dir" 

828 copytree(f"{TESTDIR}/data/tiny_success", submit_dir, ignore=ignore_patterns("*~", ".???*")) 

829 backup_dir = test_tmp_dir / "the_dest_dir" 

830 backup_dir.mkdir() 

831 lssthtc.htc_backup_files_single_path(submit_dir, backup_dir) 

832 result_submit = [] 

833 for root, _, files in os.walk(submit_dir): 

834 result_submit.extend([str(os.path.join(os.path.relpath(root, submit_dir), f)) for f in files]) 

835 self.assertEqual( 

836 set(result_submit), 

837 { 

838 "./tiny_success.dag.dagman.log", 

839 "./tiny_success.dag.dagman.out", 

840 "./tiny_success.dag", 

841 }, 

842 ) 

843 result_backup = [] 

844 for root, _, files in os.walk(backup_dir): 

845 result_backup.extend([str(os.path.join(os.path.relpath(root, backup_dir), f)) for f in files]) 

846 self.assertEqual( 

847 set(result_backup), 

848 { 

849 "./tiny_success.info.json", 

850 "./tiny_success.dag.metrics", 

851 "./tiny_success.dag.nodes.log", 

852 "./tiny_success.node_status", 

853 }, 

854 ) 

855 

856 

857class HTCBackupFilesTestCase(unittest.TestCase): 

858 """Test htc_backup_files function.""" 

859 

860 def testDirectoryNotFound(self): 

861 with temporaryDirectory() as tmp_dir: 

862 test_tmp_dir = pathlib.Path(tmp_dir) 

863 submit_dir = test_tmp_dir / "submit" 

864 with self.assertRaises(FileNotFoundError): 

865 _ = lssthtc.htc_backup_files(submit_dir) 

866 

867 def testSuccess(self): 

868 with temporaryDirectory() as tmp_dir: 

869 test_tmp_dir = pathlib.Path(tmp_dir) 

870 submit_dir = test_tmp_dir / "submit" 

871 copytree(f"{TESTDIR}/data/tiny_success", submit_dir, ignore=ignore_patterns("*~", ".???*")) 

872 result_rescue = lssthtc.htc_backup_files(submit_dir) 

873 self.assertIsNone(result_rescue) 

874 result_submit = [] 

875 for root, _, files in os.walk(submit_dir): 

876 result_submit.extend([str(os.path.join(os.path.relpath(root, submit_dir), f)) for f in files]) 

877 self.assertEqual( 

878 set(result_submit), 

879 { 

880 "./tiny_success.dag.dagman.log", 

881 "./tiny_success.dag.dagman.out", 

882 "./tiny_success.dag", 

883 "000/tiny_success.info.json", 

884 "000/tiny_success.dag.metrics", 

885 "000/tiny_success.dag.nodes.log", 

886 "000/tiny_success.node_status", 

887 }, 

888 ) 

889 

890 def testDestNotInSubmitDir(self): 

891 with temporaryDirectory() as tmp_dir: 

892 test_tmp_dir = pathlib.Path(tmp_dir) 

893 submit_dir = test_tmp_dir / "submit" 

894 copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*")) 

895 with self.assertLogs("lsst.ctrl.bps.htcondor", level="WARNING") as cm: 

896 result_rescue = lssthtc.htc_backup_files(submit_dir, test_tmp_dir / "backup") 

897 self.assertIn("Invalid backup location:", cm.output[-1]) 

898 result_rescue = lssthtc.htc_backup_files(submit_dir) 

899 self.assertTrue((submit_dir / "tiny_problems.dag.rescue001").samefile(result_rescue)) 

900 result_submit = [] 

901 for root, _, files in os.walk(submit_dir): 

902 result_submit.extend([str(os.path.join(os.path.relpath(root, submit_dir), f)) for f in files]) 

903 self.assertEqual( 

904 set(result_submit), 

905 { 

906 "./tiny_problems.dag.dagman.log", 

907 "./tiny_problems.dag.dagman.out", 

908 "./tiny_problems.dag", 

909 "./tiny_problems.dag.rescue001", 

910 "001/tiny_problems.info.json", 

911 "001/tiny_problems.dag.metrics", 

912 "001/tiny_problems.dag.nodes.log", 

913 "001/tiny_problems.node_status", 

914 }, 

915 ) 

916 

917 def testDestInSubmitDir(self): 

918 with temporaryDirectory() as tmp_dir: 

919 test_tmp_dir = pathlib.Path(tmp_dir) 

920 submit_dir = test_tmp_dir / "submit" 

921 backup_dir = submit_dir / "subdir" 

922 copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*")) 

923 result_rescue = lssthtc.htc_backup_files(submit_dir, backup_dir) 

924 self.assertTrue((submit_dir / "tiny_problems.dag.rescue001").samefile(result_rescue)) 

925 result_submit = [] 

926 for root, _, files in os.walk(submit_dir): 

927 result_submit.extend([str(os.path.join(os.path.relpath(root, submit_dir), f)) for f in files]) 

928 self.assertEqual( 

929 set(result_submit), 

930 { 

931 "./tiny_problems.dag.dagman.log", 

932 "./tiny_problems.dag.dagman.out", 

933 "./tiny_problems.dag", 

934 "./tiny_problems.dag.rescue001", 

935 "subdir/001/tiny_problems.info.json", 

936 "subdir/001/tiny_problems.dag.metrics", 

937 "subdir/001/tiny_problems.dag.nodes.log", 

938 "subdir/001/tiny_problems.node_status", 

939 }, 

940 ) 

941 

942 def testRelativeSubdir(self): 

943 with temporaryDirectory() as tmp_dir: 

944 test_tmp_dir = pathlib.Path(tmp_dir) 

945 submit_dir = test_tmp_dir / "submit" 

946 copytree(f"{TESTDIR}/data/tiny_problems", submit_dir, ignore=ignore_patterns("*~", ".???*")) 

947 result_rescue = lssthtc.htc_backup_files(submit_dir, "reldir") 

948 self.assertTrue((submit_dir / "tiny_problems.dag.rescue001").samefile(result_rescue)) 

949 result_submit = [] 

950 for root, _, files in os.walk(submit_dir): 

951 result_submit.extend([str(os.path.join(os.path.relpath(root, submit_dir), f)) for f in files]) 

952 self.assertEqual( 

953 set(result_submit), 

954 { 

955 "./tiny_problems.dag.dagman.log", 

956 "./tiny_problems.dag.dagman.out", 

957 "./tiny_problems.dag", 

958 "./tiny_problems.dag.rescue001", 

959 "reldir/001/tiny_problems.info.json", 

960 "reldir/001/tiny_problems.dag.metrics", 

961 "reldir/001/tiny_problems.dag.nodes.log", 

962 "reldir/001/tiny_problems.node_status", 

963 }, 

964 ) 

965 

966 def testSubdags(self): 

967 with temporaryDirectory() as tmp_dir: 

968 test_tmp_dir = pathlib.Path(tmp_dir) 

969 submit_dir = test_tmp_dir / "submit" 

970 copytree(f"{TESTDIR}/data/group_failed_1", submit_dir, ignore=ignore_patterns("*~", ".???*")) 

971 result_rescue = lssthtc.htc_backup_files(submit_dir) 

972 self.assertTrue(result_rescue.samefile(submit_dir / "group_failed_1.dag.rescue001")) 

973 result_submit = [] 

974 for root, _, files in os.walk(submit_dir): 

975 result_submit.extend([str(os.path.join(os.path.relpath(root, submit_dir), f)) for f in files]) 

976 self.assertEqual( 

977 set(result_submit), 

978 { 

979 "./group_failed_1.dag", 

980 "./group_failed_1.dag.dagman.log", 

981 "./group_failed_1.dag.dagman.out", 

982 "./group_failed_1.dag.rescue001", 

983 "subdags/wms_group_order1_val1a/group_order1_val1a.dag", 

984 "subdags/wms_group_order1_val1a/group_order1_val1a.dag.dagman.log", 

985 "subdags/wms_group_order1_val1a/group_order1_val1a.dag.dagman.out", 

986 "subdags/wms_group_order1_val1b/group_order1_val1b.dag", 

987 "subdags/wms_group_order1_val1b/group_order1_val1b.dag.dagman.log", 

988 "subdags/wms_group_order1_val1b/group_order1_val1b.dag.dagman.out", 

989 "subdags/wms_group_order1_val1b/group_order1_val1b.dag.rescue001", 

990 "subdags/wms_group_order1_val1c/group_order1_val1c.dag", 

991 "subdags/wms_group_order1_val1c/group_order1_val1c.dag.dagman.log", 

992 "subdags/wms_group_order1_val1c/group_order1_val1c.dag.dagman.out", 

993 "001/group_failed_1.dag.nodes.log", 

994 "001/group_failed_1.info.json", 

995 "001/group_failed_1.node_status", 

996 "001/subdags/wms_group_order1_val1a/group_order1_val1a.dag.nodes.log", 

997 "001/subdags/wms_group_order1_val1a/group_order1_val1a.node_status", 

998 "001/subdags/wms_group_order1_val1a/wms_group_order1_val1a.dag.post.out", 

999 "001/subdags/wms_group_order1_val1a/wms_group_order1_val1a.status.txt", 

1000 "001/subdags/wms_group_order1_val1b/group_order1_val1b.dag.nodes.log", 

1001 "001/subdags/wms_group_order1_val1b/group_order1_val1b.node_status", 

1002 "001/subdags/wms_group_order1_val1b/wms_group_order1_val1b.status.txt", 

1003 "001/subdags/wms_group_order1_val1b/wms_group_order1_val1b.dag.post.out", 

1004 "001/subdags/wms_group_order1_val1c/group_order1_val1c.dag.nodes.log", 

1005 "001/subdags/wms_group_order1_val1c/group_order1_val1c.node_status", 

1006 "001/subdags/wms_group_order1_val1c/wms_group_order1_val1c.dag.post.out", 

1007 "001/subdags/wms_group_order1_val1c/wms_group_order1_val1c.status.txt", 

1008 }, 

1009 ) 

1010 

1011 

class UpdateRescueFileTestCase(unittest.TestCase):
    """Test _update_rescue_file function."""

    def testSuccess(self):
        """Check the rescue file contents after updating it in place."""
        self.maxDiff = None
        with temporaryDirectory() as tmp_dir:
            test_tmp_dir = pathlib.Path(tmp_dir)
            submit_dir = test_tmp_dir / "submit"
            copytree(f"{TESTDIR}/data/group_failed_1", submit_dir, ignore=ignore_patterns("*~", ".???*"))
            rescue_file = submit_dir / "group_failed_1.dag.rescue001"
            lssthtc._update_rescue_file(rescue_file)

            # Read the whole updated file back in one shot (the original
            # used readlines() + join, and also printed results/truth to
            # stderr for debugging; both removed).
            with open(rescue_file) as fh:
                results = fh.read()

            truth = """# Rescue DAG file, created after running
# the u_testuser_DM-46294_group_fail_20250310T160455Z.dag DAG file
# Created 3/10/2025 16:08:56 UTC
# Rescue DAG version: 2.0.1 (partial)
#
# Total number of Nodes: 26
# Nodes premarked DONE: 21
# Nodes that failed: 2
# wms_group_order1_val1b,finalJob,<ENDLIST>

DONE pipetaskInit
DONE label1_val1c_val2a
DONE label1_val1b_val2b
DONE label1_val1b_val2a
DONE label1_val1c_val2b
DONE label1_val1a_val2a
DONE label1_val1a_val2b
DONE label3_val1c_val2a
DONE label3_val1b_val2b
DONE label3_val1b_val2a
DONE label3_val1c_val2b
DONE label3_val1a_val2a
DONE label3_val1a_val2b
DONE wms_group_order1_val1a
DONE label5_val1a_val2a
DONE label5_val1a_val2b
DONE wms_group_order1_val1c
DONE label5_val1c_val2a
DONE label5_val1c_val2b
DONE wms_check_status_wms_group_order1_val1a
DONE wms_check_status_wms_group_order1_val1c
"""

            self.assertEqual(results, truth)

1064 

1065 

class ReadDagStatusTestCase(unittest.TestCase):
    """Test read_dag_status function and read_single_dag_status."""

    def testFileMissing(self):
        """A run directory without a node status file raises."""
        with temporaryDirectory() as tmp_dir:
            with self.assertRaisesRegex(FileNotFoundError, "DAGMan node status not found"):
                _ = lssthtc.read_dag_status(tmp_dir)

    def testRegular(self):
        """Node counts are read correctly from a plain (subdag-free) run."""
        expected = {
            "JobProcsHeld": 0,
            "NodesPost": 0,
            "JobProcsIdle": 0,
            "NodesTotal": 6,
            "NodesFailed": 2,
            "NodesDone": 3,
            "NodesQueued": 0,
            "NodesPre": 0,
            "NodesFutile": 1,
            "NodesUnready": 0,
        }
        with temporaryDirectory() as tmp_dir:
            run_dir = os.path.join(tmp_dir, "tiny_problems")
            copytree(f"{TESTDIR}/data/tiny_problems", run_dir, ignore=ignore_patterns("*~", ".???*"))
            status = lssthtc.read_dag_status(run_dir)
            # Merging `expected` into `status` must be a no-op, i.e.
            # `expected` is a subset of whatever extra keys status carries.
            self.assertEqual(status, status | expected)

    def testSubdags(self):
        """Making sure it gets data from subdag dirs and doesn't
        fail if some subdags haven't started running yet.
        """
        self.maxDiff = None
        expected = {
            "JobProcsHeld": 0,
            "NodesPost": 0,
            "JobProcsIdle": 0,
            "NodesTotal": 34,
            "NodesFailed": 0,
            "NodesDone": 17,
            "NodesQueued": 3,
            "NodesPre": 0,
            "NodesFutile": 0,
            "NodesUnready": 14,
        }
        with temporaryDirectory() as tmp_dir:
            run_dir = os.path.join(tmp_dir, "submit")
            copytree(f"{TESTDIR}/data/group_running_1", run_dir, ignore=ignore_patterns("*~", ".???*"))
            status = lssthtc.read_dag_status(run_dir)
            self.assertEqual(status, status | expected)

1115 

1116 

class ReadDagInfoTestCase(unittest.TestCase):
    """Test read_dag_info function."""

    def testFileMissing(self):
        # No *.info.json present -> explicit FileNotFoundError.
        with temporaryDirectory() as tmp_dir:
            with self.assertRaisesRegex(FileNotFoundError, "File with DAGMan job information not found in "):
                _ = lssthtc.read_dag_info(tmp_dir)

    def testSuccess(self):
        """Check that the info file parses into the nested mapping below."""
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.info.json", tmp_dir)
            results = lssthtc.read_dag_info(tmp_dir)

            # Mapping keyed by schedd name ("test02"), then job id
            # ("9208.0"), then the job's attributes.
            truth = {
                "test02": {
                    "9208.0": {
                        "ClusterId": 9208,
                        "GlobalJobId": "test02#9208.0#1739465078",
                        "bps_wms_service": "lsst.ctrl.bps.htcondor.htcondor_service.HTCondorService",
                        "bps_project": "dev",
                        "bps_payload": "tiny",
                        "bps_operator": "testuser",
                        "bps_wms_workflow": "lsst.ctrl.bps.htcondor.htcondor_service.HTCondorWorkflow",
                        "bps_provisioning_job": "provisioningJob",
                        "bps_run_quanta": "label1:1;label2:1",
                        "bps_campaign": "quick",
                        "bps_runsite": "testpool",
                        "bps_job_summary": "pipetaskInit:1;label1:1;label2:1;finalJob:1",
                        "bps_run": "u_testuser_tiny_20250213T164427Z",
                        "bps_isjob": "True",
                    }
                }
            }

            self.assertEqual(results, truth)

    def testPermissionError(self):
        # An unreadable info file is logged and yields an empty mapping
        # instead of raising.
        # NOTE(review): uses unittest.mock, but the visible header only has
        # "import unittest" -- presumably unittest.mock is imported
        # transitively; confirm an explicit import exists.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.info.json", tmp_dir)
            with unittest.mock.patch("lsst.ctrl.bps.htcondor.lssthtc.open") as mocked_open:
                mocked_open.side_effect = PermissionError
                with self.assertLogs("lsst.ctrl.bps.htcondor", level="DEBUG") as cm:
                    results = lssthtc.read_dag_info(tmp_dir)
                self.assertIn("Retrieving DAGMan job information failed:", cm.output[-1])
                self.assertEqual({}, results)

1162 

1163 

class HtcWriteCondorFileTestCase(unittest.TestCase):
    """Test htc_write_condor_file function."""

    def testSuccess(self):
        """Check a written submit file line by line (order-independent)."""
        with temporaryDirectory() as tmp_dir:
            job_name = "job1"
            # Submit file path inside a per-label subdirectory.
            filename = pathlib.Path(tmp_dir) / f"label1/{job_name}.sub"
            job = {
                "executable": "$(CTRL_MPEXEC_DIR)/bin/pipetask",
                "arguments": "-a -b 2 -c",
                "request_memory": "2000",
                "environment": "one=1 two=\"2\" three='spacey 'quoted' value'",
                "log": f"{job_name}.log",
            }
            job_attrs = {
                "bps_job_name": job_name,
                "bps_job_label": "label1",
                "bps_job_quanta": "task1:8;task2:8",
            }
            # output/error do not appear in `job`; they are expected to be
            # filled in by the writer.  Attributes become +name = "value".
            expected = [
                "executable=$(CTRL_MPEXEC_DIR)/bin/pipetask\n",
                "arguments=-a -b 2 -c\n",
                "request_memory=2000\n",
                "environment=\"one=1 two=\"2\" three='spacey 'quoted' value'\"\n",
                f"output={job_name}.$(Cluster).out\n",
                f"error={job_name}.$(Cluster).out\n",
                f"log={job_name}.log\n",
                f'+bps_job_name = "{job_name}"\n',
                '+bps_job_label = "label1"\n',
                '+bps_job_quanta = "task1:8;task2:8"\n',
                "queue\n",
            ]

            lssthtc.htc_write_condor_file(filename, job_name, job, job_attrs)
            with open(filename, encoding="utf-8") as f:
                actual = f.readlines()

            # Sets: the order of lines in the submit file does not matter.
            self.assertEqual(set(actual), set(expected))
            self.assertTrue(filename.exists())
            # Try to make Submit object from file to find any syntax issues
            _ = lssthtc.htc_create_submit_from_file(filename)

1205 

1206 

class HtcCreateSubmitFromDagTestCase(unittest.TestCase):
    """Test htc_create_submit_from_dag function."""

    @classmethod
    def setUpClass(cls):
        cls.bindir = None
        # htcondor.Submit.from_dag requires condor_dagman executable in path.
        if not which("condor_dagman"):  # pragma: no cover
            # Provide a fake executable so from_dag can run without a full
            # HTCondor installation.
            cls.bindir = tempfile.TemporaryDirectory()
            fake_dagman_exec = pathlib.Path(cls.bindir.name) / "condor_dagman"
            with open(fake_dagman_exec, "w") as fh:
                print("#!/bin/bash", file=fh)
                print("echo fake_condor_dagman $@", file=fh)
                print("exit 0", file=fh)
            fake_dagman_exec.chmod(fake_dagman_exec.stat().st_mode | stat.S_IEXEC)
            os.environ["PATH"] = f"{os.environ['PATH']}:{cls.bindir.name}"

    @classmethod
    def tearDownClass(cls):
        # Remove the fake condor_dagman directory if one was created.
        # NOTE(review): the PATH modification made in setUpClass is not
        # undone here.
        if cls.bindir:
            cls.bindir.cleanup()

    @unittest.mock.patch.dict(os.environ, {"_CONDOR_DAGMAN_MAX_JOBS_IDLE": "42"})
    def testMaxIdleEnvVar(self):
        # The environment variable supplies MaxIdle when no explicit
        # submit option is given.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.dag", tmp_dir)
            dag_filename = pathlib.Path(tmp_dir) / "tiny_success.dag"
            submit = lssthtc.htc_create_submit_from_dag(str(dag_filename), {})
            self.assertIn("-MaxIdle 42", submit["arguments"])

    @unittest.mock.patch.dict(os.environ, {})
    def testMaxIdleGiven(self):
        # An explicit MaxIdle submit option is passed through.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.dag", tmp_dir)
            dag_filename = pathlib.Path(tmp_dir) / "tiny_success.dag"
            submit = lssthtc.htc_create_submit_from_dag(str(dag_filename), {"MaxIdle": 37})
            self.assertIn("-MaxIdle 37", submit["arguments"])

    @unittest.mock.patch.dict(os.environ, {})
    def testNoMaxJobsIdle(self):
        """Note: Since the produced arguments differ depending on
        HTCondor version when no MaxIdle passed to from_dag, not
        checking arguments string here. Instead just making sure
        lssthtc code doesn't pass MaxIdle value to from_dag.
        """
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/tiny_success/tiny_success.dag", tmp_dir)
            dag_filename = pathlib.Path(tmp_dir) / "tiny_success.dag"
            with unittest.mock.patch("htcondor.Submit.from_dag") as submit_mock:
                with unittest.mock.patch("htcondor.param") as mock_param:
                    # Pretend no relevant setting exists in htcondor.param.
                    mock_param.__contains__.return_value = False
                    _ = lssthtc.htc_create_submit_from_dag(str(dag_filename), {})
            # from_dag must be called with submit options untouched.
            submit_mock.assert_called_once_with(str(dag_filename), {})

1260 

1261 

class HtcDagTestCase(unittest.TestCase):
    """Test for HTCDag class."""

    def setUp(self):
        # Minimal single-job DAG shared by the write tests below.
        job = lssthtc.HTCJob(name="test_job")
        job.add_job_cmds(
            {
                "executable": "/usr/bin/echo",
                "arguments": "foo",
                "output": "test_job.$(Cluster).out",
                "error": "test_job.$(Cluster).out",
                "log": "test_job.$(Cluster).log",
            }
        )
        job.subfile = f"{job.name}.sub"

        self.dag = lssthtc.HTCDag(name="test_workflow")
        self.dag.add_job(job)

        # Submit file lines expected for the job defined above.
        self.subfile_expected = [
            "executable=/usr/bin/echo\n",
            "arguments=foo\n",
            "output=test_job.$(Cluster).out\n",
            "error=test_job.$(Cluster).out\n",
            "log=test_job.$(Cluster).log\n",
            "queue\n",
        ]

    def tearDown(self):
        # No per-test cleanup needed; tests write into temporary
        # directories that clean up themselves.
        pass

    def testWriteWithDagConfig(self):
        """A configured DAG gains CONFIG and SET_JOB_ATTR lines on write."""
        with temporaryDirectory() as tmp_dir:
            config = BpsConfig(Config(htcondor_config.HTC_DEFAULTS_URI))
            job = self.dag.nodes["test_job"]["data"]
            wms_config_filename = "dagman.conf"
            wms_configurator = dagman_configurator.DagmanConfigurator(config)
            wms_configurator.prepare(wms_config_filename, prefix=tmp_dir)
            wms_configurator.configure(self.dag)
            dagfile_expected = [
                f"CONFIG {wms_config_filename}\n",
                f'JOB {job.name} "{job.subfile}"\n',
                f"DOT {self.dag.name}.dot\n",
                f"NODE_STATUS_FILE {self.dag.name}.node_status\n",
                f'SET_JOB_ATTR bps_wms_config_path= "{wms_config_filename}"\n',
            ]

            self.dag.write(tmp_dir, "", "")

            # write() records where and under what name the DAG was written
            # in the graph attributes.
            self.assertIn("submit_path", self.dag.graph)
            self.assertEqual(self.dag.graph["submit_path"], tmp_dir)
            self.assertIn("dag_filename", self.dag.graph)
            self.assertEqual(self.dag.graph["dag_filename"], f"{self.dag.graph['name']}.dag")
            with open(os.path.join(tmp_dir, self.dag.graph["dag_filename"]), encoding="utf-8") as f:
                dagfile_actual = f.readlines()
            self.assertEqual(dagfile_actual, dagfile_expected)
            with open(os.path.join(tmp_dir, job.subfile), encoding="utf-8") as f:
                subfile_actual = f.readlines()
            self.assertEqual(subfile_actual, self.subfile_expected)

    def testWriteWithoutDagConfig(self):
        """Without a configurator, no CONFIG/SET_JOB_ATTR lines are written."""
        with temporaryDirectory() as tmp_dir:
            job = self.dag.nodes["test_job"]["data"]
            dagfile_expected = [
                f'JOB {job.name} "{job.subfile}"\n',
                f"DOT {self.dag.name}.dot\n",
                f"NODE_STATUS_FILE {self.dag.name}.node_status\n",
            ]

            self.dag.write(tmp_dir, "", "")

            self.assertIn("submit_path", self.dag.graph)
            self.assertEqual(self.dag.graph["submit_path"], tmp_dir)
            self.assertIn("dag_filename", self.dag.graph)
            self.assertEqual(self.dag.graph["dag_filename"], f"{self.dag.graph['name']}.dag")
            with open(os.path.join(tmp_dir, self.dag.graph["dag_filename"]), encoding="utf-8") as f:
                dagfile_actual = f.readlines()
            self.assertEqual(dagfile_actual, dagfile_expected)
            with open(os.path.join(tmp_dir, job.subfile), encoding="utf-8") as f:
                subfile_actual = f.readlines()
            self.assertEqual(subfile_actual, self.subfile_expected)

1343 

1344 

if __name__ == "__main__":
    # Support running this test module directly with the unittest runner.
    unittest.main()