Coverage for tests / test_report_utils.py: 22%

319 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:49 +0000

1# This file is part of ctrl_bps_htcondor. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Unit tests for the report utilities.""" 

29 

import logging
import os
import unittest
import unittest.mock
from pathlib import Path
from shutil import copy2, copytree

import htcondor

from lsst.ctrl.bps import (
    WmsSpecificInfo,
    WmsStates,
)
from lsst.ctrl.bps.htcondor import lssthtc, report_utils
from lsst.utils.tests import temporaryDirectory

44 

# Logger matching the plugin's namespace so assertLogs() in the tests below
# can capture messages emitted by the code under test.
logger = logging.getLogger("lsst.ctrl.bps.htcondor")

# Absolute path of the directory holding this test file; test fixtures live
# under <TESTDIR>/data.
TESTDIR = os.path.abspath(os.path.dirname(__file__))

# Canned ClassAd text representing a successful schedd locate response.
# NOTE(review): not referenced in this chunk; presumably used elsewhere in
# the file — confirm before removing.
LOCATE_SUCCESS = """[
    CondorPlatform = "$CondorPlatform: X86_64-CentOS_7.9 $";
    MyType = "Scheduler";
    Machine = "testmachine";
    Name = "testmachine";
    CondorVersion = "$CondorVersion: 23.0.3 2024-04-04 $";
    MyAddress = "<127.0.0.1:9618?addrs=127.0.0.1-9618+snip>"
]
"""

58 

59 

class GetExitCodeSummaryTestCase(unittest.TestCase):
    """Test the function responsible for creating exit code summary."""

    def setUp(self):
        # One job per HTCondor job status the summary code must handle,
        # grouped by 'bps_job_label' the way the summary output is keyed.
        self.jobs = {
            "1.0": {
                "JobStatus": htcondor.JobStatus.IDLE,
                "bps_job_label": "foo",
            },
            "2.0": {
                "JobStatus": htcondor.JobStatus.RUNNING,
                "bps_job_label": "foo",
            },
            "3.0": {
                "JobStatus": htcondor.JobStatus.REMOVED,
                "bps_job_label": "foo",
            },
            "4.0": {
                "ExitCode": 0,
                "ExitBySignal": False,
                "JobStatus": htcondor.JobStatus.COMPLETED,
                "bps_job_label": "bar",
            },
            "5.0": {
                "ExitCode": 1,
                "ExitBySignal": False,
                "JobStatus": htcondor.JobStatus.COMPLETED,
                "bps_job_label": "bar",
            },
            "6.0": {
                "ExitBySignal": True,
                "ExitSignal": 11,
                "JobStatus": htcondor.JobStatus.HELD,
                "bps_job_label": "baz",
            },
            "7.0": {
                "ExitBySignal": False,
                "ExitCode": 42,
                "JobStatus": htcondor.JobStatus.HELD,
                "bps_job_label": "baz",
            },
            "8.0": {
                "JobStatus": htcondor.JobStatus.TRANSFERRING_OUTPUT,
                "bps_job_label": "qux",
            },
            "9.0": {
                "JobStatus": htcondor.JobStatus.SUSPENDED,
                "bps_job_label": "qux",
            },
        }

    def testMainScenario(self):
        actual = report_utils._get_exit_code_summary(self.jobs)
        # Only non-zero exit codes (or signals) of completed/held jobs are
        # reported; labels with no such failures map to empty lists.
        expected = {"foo": [], "bar": [1], "baz": [11, 42], "qux": []}
        self.assertEqual(actual, expected)

    def testUnknownStatus(self):
        # A job status the function does not recognize must be logged,
        # not fatal.
        jobs = {
            "1.0": {
                "JobStatus": -1,
                "bps_job_label": "foo",
            }
        }
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            report_utils._get_exit_code_summary(jobs)
        self.assertIn("lsst.ctrl.bps.htcondor", cm.records[0].name)
        self.assertIn("Unknown", cm.output[0])
        self.assertIn("JobStatus", cm.output[0])

    def testUnknownKey(self):
        # A completed job missing an expected exit attribute must be logged,
        # not fatal.
        jobs = {
            "1.0": {
                "JobStatus": htcondor.JobStatus.COMPLETED,
                "UnknownKey": None,
                "bps_job_label": "foo",
            }
        }
        with self.assertLogs(logger=logger, level="DEBUG") as cm:
            report_utils._get_exit_code_summary(jobs)
        self.assertIn("lsst.ctrl.bps.htcondor", cm.records[0].name)
        self.assertIn("Attribute", cm.output[0])
        self.assertIn("not found", cm.output[0])

145 

146 

class GetStateCountsFromDagJobTestCase(unittest.TestCase):
    """Test counting number of jobs per WMS state."""

    # Note: the no-op setUp/tearDown overrides were removed; unittest's
    # defaults already do nothing.

    def testCounts(self):
        # DAG ClassAd counters as maintained by DAGMan for a run in progress.
        job = {
            "DAG_NodesDone": 1,
            "DAG_JobsHeld": 2,
            "DAG_NodesFailed": 3,
            "DAG_NodesFutile": 4,
            "DAG_NodesQueued": 5,
            "DAG_NodesReady": 0,
            "DAG_NodesUnready": 7,
            "DAG_NodesTotal": 22,
        }

        # Expected mapping of the DAG node counters onto WMS states.
        truth = {
            WmsStates.SUCCEEDED: 1,
            WmsStates.HELD: 2,
            WmsStates.UNREADY: 7,
            WmsStates.READY: 0,
            WmsStates.FAILED: 3,
            WmsStates.PRUNED: 4,
            WmsStates.MISFIT: 0,
        }

        total, result = report_utils._get_state_counts_from_dag_job(job)
        self.assertEqual(total, 22)
        self.assertEqual(result, truth)

181 

182 

class GetInfoFromPathTestCase(unittest.TestCase):
    """Test _get_info_from_path function."""

    # Files that make up the canned, successfully finished test run.
    _RUN_FILES = (
        "test_pipelines_check_20240727T003507Z.dag",
        "test_pipelines_check_20240727T003507Z.dag.dagman.log",
        "test_pipelines_check_20240727T003507Z.dag.dagman.out",
        "test_pipelines_check_20240727T003507Z.dag.nodes.log",
        "test_pipelines_check_20240727T003507Z.node_status",
        "test_pipelines_check_20240727T003507Z.info.json",
    )

    def _copy_run_files(self, dest):
        """Copy the canned successful-run files into ``dest``."""
        for name in self._RUN_FILES:
            copy2(f"{TESTDIR}/data/{name}", dest)

    def test_tmpdir_abort(self):
        # A run submitted from /tmp is refused; expect the abort message.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/test_tmpdir_abort.dag.dagman.out", tmp_dir)
            wms_workflow_id, jobs, message = report_utils._get_info_from_path(tmp_dir)
            self.assertEqual(wms_workflow_id, lssthtc.MISSING_ID)
            self.assertEqual(jobs, {})
            self.assertIn("Cannot submit from /tmp", message)

    def test_no_dagman_messages(self):
        # A dagman.out without usable messages yields no id and no jobs.
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/test_no_messages.dag.dagman.out", tmp_dir)
            wms_workflow_id, jobs, message = report_utils._get_info_from_path(tmp_dir)
            self.assertEqual(wms_workflow_id, lssthtc.MISSING_ID)
            self.assertEqual(jobs, {})
            self.assertIn("Could not find HTCondor files", message)

    def test_successful_run(self):
        with temporaryDirectory() as tmp_dir:
            self._copy_run_files(tmp_dir)
            wms_workflow_id, jobs, message = report_utils._get_info_from_path(tmp_dir)
            self.assertEqual(wms_workflow_id, "1163.0")
            self.assertEqual(len(jobs), 6)  # dag, pipetaskInit, 3 science, finalJob
            self.assertEqual(message, "")

    def test_relative_path(self):
        orig_dir = Path.cwd()
        # Restore the working directory even if an assertion below fails;
        # otherwise a failure here would leave a bad cwd for later tests.
        self.addCleanup(os.chdir, orig_dir)
        with temporaryDirectory() as tmp_dir:
            os.chdir(tmp_dir)
            abs_path = Path(tmp_dir).resolve() / "subdir"
            abs_path.mkdir()
            self._copy_run_files(abs_path)
            wms_workflow_id, jobs, message = report_utils._get_info_from_path("subdir")
            self.assertEqual(wms_workflow_id, "1163.0")
            self.assertEqual(len(jobs), 6)  # dag, pipetaskInit, 3 science, finalJob
            self.assertEqual(message, "")
            # The returned working directory must be absolute even though a
            # relative path was passed in.
            self.assertEqual(jobs["1163.0"]["Iwd"], str(abs_path))
            os.chdir(orig_dir)

233 

234 

class AddServiceJobSpecificInfoTestCase(unittest.TestCase):
    """Test _add_service_job_specific_info function.

    Note: The job_ad's are hardcoded in these tests. The
    values in the dictionaries come from plugin code as
    well as HTCondor. Changes in either of those codes
    that produce data for the job_ad can break this
    function without breaking these unit tests.

    Also, since hold status/messages stick around, testing
    various cases with and without job being held just to
    ensure get right status in both cases.
    """

    def testNotSubmitted(self):
        # Service job not submitted yet or can't be submitted.
        # (Typically an plugin bug.)
        # At this function level, can't tell if not submitted
        # yet or problem so it never will.
        job_ad = {
            "ClusterId": -64,  # negative placeholder id: job never queued
            "DAGManJobID": "8997.0",
            "DAGNodeName": "provisioningJob",
            "NodeStatus": lssthtc.NodeStatus.NOT_READY,
            "ProcId": 0,
            "bps_job_label": "service_provisioningJob",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "UNREADY", "status_details": ""}
        )

    def testRunning(self):
        # DAG hasn't completed (Running or held),
        # Service job is running.
        job_ad = {
            "ClusterId": 8523,
            "ProcId": 0,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.RUNNING,
        }

        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "RUNNING", "status_details": ""}
        )

    def testDied(self):
        # DAG hasn't completed (Running or held),
        # Service job failed (completed non-zero exit code)
        job_ad = {
            "ClusterId": 8761,
            "ProcId": 0,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.COMPLETED,
            "ExitCode": 4,
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "FAILED", "status_details": ""}
        )

    def testDeleted(self):
        # Deleted by user (never held)
        job_ad = {
            "ClusterId": 9086,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.REMOVED,
            "ProcId": 0,
            "Reason": "via condor_rm (by user mgower)",
            "job_evicted_time": "2025-02-11T11:35:04",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "DELETED", "status_details": ""}
        )

    def testSucceedEarly(self):
        # DAG hasn't completed (Running or held),
        # Service job completed with exit code 0
        job_ad = {
            "ClusterId": 8761,
            "ProcId": 0,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.COMPLETED,
            "ExitCode": 0,
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context,
            {
                "job_name": "provisioningJob",
                "status": "SUCCEEDED",
                "status_details": "(Note: Finished before workflow.)",
            },
        )

    def testSucceedOldRemoveMessage(self):
        # DAG completed, job was in running state when removed.
        # Uses the older style DAGMan removal Reason text.
        job_ad = {
            "ClusterId": 8761,
            "ProcId": 0,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.REMOVED,
            "Reason": "Removed by DAGMan (by user mgower)",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "SUCCEEDED", "status_details": ""}
        )

    def testSucceed(self):
        # DAG completed; the service job was removed via the
        # OtherJobRemoveRequirements expression firing when the DAG job
        # itself was removed.
        job_ad = {
            "ClusterId": 8761,
            "ProcId": 0,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.REMOVED,
            "Reason": (
                "removed because <OtherJobRemoveRequirements = DAGManJobId =?= 8556>"
                " fired when job (8556.0) was removed"
            ),
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "SUCCEEDED", "status_details": ""}
        )

    def testUserHeldWhileRunning(self):
        # DAG hasn't completed (Running or held),
        # user put at least service job on hold
        job_ad = {
            "ClusterId": 8523,
            "ProcId": 0,
            "DAGNodeName": "provisioningJob",
            "JobStatus": htcondor.JobStatus.HELD,
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
        }

        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context,
            {
                "job_name": "provisioningJob",
                "status": "HELD",
                "status_details": "(via condor_hold (by user mgower))",
            },
        )

    def testHeldByHTC(self):
        # Job put on hold by HTCondor, removed when DAG ends
        job_ad = {
            "ClusterId": 8693,
            "DAGNodeName": "provisioningJob",
            "HoldReason": "Failed to execute",
            "HoldReasonCode": 6,
            "HoldReasonSubCode": 2,
            "JobStatus": htcondor.JobStatus.REMOVED,
            "ProcId": 0,
            "Reason": "Removed by DAGMan (by user mgower)",
            "job_held_time": "2025-02-07T12:50:07",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context,
            {
                "job_name": "provisioningJob",
                "status": "DELETED",
                "status_details": "(Job was held for the following reason: Failed to execute)",
            },
        )

    def testHeldReleasedRunning(self):
        # DAG hasn't completed (Running or held),
        # Since held info will be in job_ad, make sure knows released.
        # (job_released_time is later than job_held_time.)
        job_ad = {
            "ClusterId": 8625,
            "DAGNodeName": "provisioningJob",
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
            "JobStatus": htcondor.JobStatus.RUNNING,
            "LogNotes": "DAG Node: provisioningJob",
            "ProcId": 0,
            "job_held_time": "2025-02-07T12:33:34",
            "job_released_time": "2025-02-07T12:33:47",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "RUNNING", "status_details": ""}
        )

    def testHeldReleasedDied(self):
        # Since held info will be in job_ad,
        # make sure knows status after released.
        job_ad = {
            "ClusterId": 9120,
            "DAGNodeName": "provisioningJob",
            "ExitBySignal": False,
            "ExitCode": 4,
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
            "JobStatus": htcondor.JobStatus.COMPLETED,
            "ProcId": 0,
            "Reason": "via condor_release (by user mgower)",
            "ReturnValue": 4,
            "TerminatedNormally": True,
            "job_held_time": "2025-02-11T11:46:40",
            "job_released_time": "2025-02-11T11:46:47",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "FAILED", "status_details": ""}
        )

    def testHeldReleasedSuccessEarly(self):
        # Since held info will be in job_ad,
        # make sure knows status after released.
        job_ad = {
            "ClusterId": 9154,
            "DAGNodeName": "provisioningJob",
            "ExitBySignal": False,
            "ExitCode": 0,
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
            "JobStatus": htcondor.JobStatus.COMPLETED,
            "ProcId": 0,
            "Reason": "via condor_release (by user mgower)",
            "TerminatedNormally": True,
            "job_held_time": "2025-02-11T11:55:20",
            "job_released_time": "2025-02-11T11:55:25",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context,
            {
                "job_name": "provisioningJob",
                "status": "SUCCEEDED",
                "status_details": "(Note: Finished before workflow.)",
            },
        )

    def testHeldReleasedSuccess(self):
        # DAG has completed.
        # Since held info will be in job_ad,
        # make sure knows status after released.
        job_ad = {
            "ClusterId": 8625,
            "DAGNodeName": "provisioningJob",
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
            "JobStatus": htcondor.JobStatus.REMOVED,
            "ProcId": 0,
            "Reason": "removed because <OtherJobRemoveRequirements = DAGManJobId =?= "
            "8624> fired when job (8624.0) was removed",
            "job_held_time": "2025-02-07T12:33:34",
            "job_released_time": "2025-02-07T12:33:47",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "SUCCEEDED", "status_details": ""}
        )

    def testHeldReleasedDeleted(self):
        # Since held info will be in job_ad,
        # make sure knows status after released.
        job_ad = {
            "ClusterId": 9086,
            "DAGNodeName": "provisioningJob",
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
            "JobStatus": htcondor.JobStatus.REMOVED,
            "ProcId": 0,
            "Reason": "via condor_rm (by user mgower)",
            "job_evicted_time": "2025-02-11T11:35:04",
            "job_held_time": "2025-02-11T11:35:04",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context, {"job_name": "provisioningJob", "status": "DELETED", "status_details": ""}
        )

    def testHeldReleasedHeld(self):
        # Since release info will be in job_ad,
        # make sure knows held after release.
        # (job_held_time is later than job_released_time.)
        job_ad = {
            "ClusterId": 8659,
            "DAGNodeName": "provisioningJob",
            "HoldReason": "via condor_hold (by user mgower)",
            "HoldReasonCode": 1,
            "HoldReasonSubCode": 0,
            "JobStatus": htcondor.JobStatus.REMOVED,
            "ProcId": 0,
            "Reason": "Removed by DAGMan (by user mgower)",
            "TerminatedNormally": False,
            "job_held_time": "2025-02-07T12:36:15",
            "job_released_time": "2025-02-07T12:36:07",
        }
        results = WmsSpecificInfo()
        report_utils._add_service_job_specific_info(job_ad, results)
        self.assertEqual(
            results.context,
            {
                "job_name": "provisioningJob",
                "status": "DELETED",
                "status_details": "(Job was held for the following reason: via condor_hold (by user mgower))",
            },
        )

563 

564 

class GetRunSummaryTestCase(unittest.TestCase):
    """Test _get_run_summary function."""

    def testJobSummaryInJobAd(self):
        # The 'bps_job_summary' attribute, when present, is used verbatim.
        expected = "pipetaskInit:1;label1:2;label2:2;finalJob:1"
        job_ad = {"ClusterId": 8659, "DAGNodeName": "testJob", "bps_job_summary": expected}
        self.assertEqual(report_utils._get_run_summary(job_ad), expected)

    def testRunSummaryInJobAd(self):
        # The 'bps_run_summary' attribute, when present, is used verbatim.
        expected = "pipetaskInit:1;label1:2;label2:2;finalJob:1"
        job_ad = {"ClusterId": 8659, "DAGNodeName": "testJob", "bps_run_summary": expected}
        self.assertEqual(report_utils._get_run_summary(job_ad), expected)

    def testSummaryFromDag(self):
        # Without a summary attribute, the summary is derived from the *.dag
        # file found in the job's working directory (Iwd).
        with temporaryDirectory() as tmp_dir:
            copy2(f"{TESTDIR}/data/good.dag", tmp_dir)
            job_ad = {"ClusterId": 8659, "DAGNodeName": "testJob", "Iwd": tmp_dir}
            summary = report_utils._get_run_summary(job_ad)
            self.assertEqual(summary, "pipetaskInit:1;label1:1;label2:1;label3:1;finalJob:1")

    def testSummaryNoDag(self):
        # Neither a summary attribute nor a DAG file: expect an empty summary
        # and a warning message.
        with self.assertLogs(logger=logger, level="WARNING") as cm:
            with temporaryDirectory() as tmp_dir:
                job_ad = {"ClusterId": 8659, "DAGNodeName": "testJob", "Iwd": tmp_dir}
                summary = report_utils._get_run_summary(job_ad)
                self.assertEqual(summary, "")
        self.assertIn("lsst.ctrl.bps.htcondor", cm.records[0].name)
        self.assertIn("Could not get run summary for htcondor job", cm.output[0])

595 

596 

class IsServiceJobTestCase(unittest.TestCase):
    """Test is_service_job function."""

    def testNotServiceJob(self):
        # A payload job must not be classified as a service job.
        ad = dict(ClusterId=8659, DAGNodeName="testJob", wms_node_type=lssthtc.WmsNodeType.PAYLOAD)
        self.assertFalse(report_utils.is_service_job(ad))

    def testIsServiceJob(self):
        # A job explicitly marked SERVICE is a service job.
        ad = dict(ClusterId=8659, DAGNodeName="testJob", wms_node_type=lssthtc.WmsNodeType.SERVICE)
        self.assertTrue(report_utils.is_service_job(ad))

    def testMissingBpsType(self):
        # Without a wms_node_type entry the job defaults to non-service.
        ad = dict(ClusterId=8659, DAGNodeName="testJob")
        self.assertFalse(report_utils.is_service_job(ad))

614 

615 

class CreateDetailedReportFromJobsTestCase(unittest.TestCase):
    """Test _create_detailed_report_from_jobs function."""

    def _make_report(self, tmp_dir, dataset):
        """Copy a canned submit directory and build its run report.

        Also checks the invariants shared by every test: exactly one run
        report is produced, its id matches the workflow id, and its path
        points at the copied submit directory.

        Parameters
        ----------
        tmp_dir : `str`
            Existing scratch directory to copy the dataset into.
        dataset : `str`
            Name of a directory under ``<TESTDIR>/data`` containing the
            HTCondor files of a canned run.

        Returns
        -------
        report
            The single run report created from the jobs found in the
            submit directory.
        """
        test_submit_dir = os.path.join(tmp_dir, dataset)
        copytree(f"{TESTDIR}/data/{dataset}", test_submit_dir)
        wms_workflow_id, jobs, _ = report_utils._get_info_from_path(test_submit_dir)
        run_reports = report_utils._create_detailed_report_from_jobs(wms_workflow_id, jobs)
        self.assertEqual(len(run_reports), 1)
        report = run_reports[wms_workflow_id]
        self.assertEqual(report.wms_id, wms_workflow_id)
        self.assertTrue(os.path.samefile(report.path, test_submit_dir))
        return report

    def testTinySuccess(self):
        with temporaryDirectory() as tmp_dir:
            report = self._make_report(tmp_dir, "tiny_success")
            self.assertEqual(report.state, WmsStates.SUCCEEDED)
            self.assertEqual(report.run_summary, "pipetaskInit:1;label1:1;label2:1;finalJob:1")
            self.assertEqual(
                report.job_state_counts,
                {
                    WmsStates.UNKNOWN: 0,
                    WmsStates.MISFIT: 0,
                    WmsStates.UNREADY: 0,
                    WmsStates.READY: 0,
                    WmsStates.PENDING: 0,
                    WmsStates.RUNNING: 0,
                    WmsStates.DELETED: 0,
                    WmsStates.HELD: 0,
                    WmsStates.SUCCEEDED: 4,
                    WmsStates.FAILED: 0,
                    WmsStates.PRUNED: 0,
                },
            )
            self.assertEqual(
                report.specific_info.context,
                {"job_name": "provisioningJob", "status": "SUCCEEDED", "status_details": ""},
            )

    def testTinyProblems(self):
        with temporaryDirectory() as tmp_dir:
            report = self._make_report(tmp_dir, "tiny_problems")
            self.assertEqual(report.state, WmsStates.FAILED)
            self.assertEqual(report.run_summary, "pipetaskInit:1;label1:2;label2:2;finalJob:1")
            self.assertEqual(
                report.job_state_counts,
                {
                    WmsStates.UNKNOWN: 0,
                    WmsStates.MISFIT: 0,
                    WmsStates.UNREADY: 0,
                    WmsStates.READY: 0,
                    WmsStates.PENDING: 0,
                    WmsStates.RUNNING: 0,
                    WmsStates.DELETED: 0,
                    WmsStates.HELD: 0,
                    WmsStates.SUCCEEDED: 4,
                    WmsStates.FAILED: 1,
                    WmsStates.PRUNED: 1,
                },
            )
            self.assertEqual(
                report.specific_info.context,
                {"job_name": "provisioningJob", "status": "SUCCEEDED", "status_details": ""},
            )

    def testTinyRunning(self):
        with temporaryDirectory() as tmp_dir:
            report = self._make_report(tmp_dir, "tiny_running")
            self.assertEqual(report.state, WmsStates.RUNNING)
            self.assertEqual(report.run_summary, "pipetaskInit:1;label1:1;label2:1;finalJob:1")
            self.assertEqual(
                report.job_state_counts,
                {
                    WmsStates.UNKNOWN: 0,
                    WmsStates.MISFIT: 0,
                    WmsStates.UNREADY: 2,
                    WmsStates.READY: 0,
                    WmsStates.PENDING: 0,
                    WmsStates.RUNNING: 1,
                    WmsStates.DELETED: 0,
                    WmsStates.HELD: 0,
                    WmsStates.SUCCEEDED: 1,
                    WmsStates.FAILED: 0,
                    WmsStates.PRUNED: 0,
                },
            )
            self.assertEqual(
                report.specific_info.context,
                {"job_name": "provisioningJob", "status": "RUNNING", "status_details": ""},
            )

    def testNoopRunning(self):
        with temporaryDirectory() as tmp_dir:
            report = self._make_report(tmp_dir, "noop_running_1")
            self.assertEqual(report.state, WmsStates.RUNNING)
            # Label order in the summary is not guaranteed; compare as a set.
            self.assertEqual(
                set(report.run_summary.split(";")),
                {"pipetaskInit:1", "label1:6", "label2:6", "label3:6", "label4:6", "label5:6", "finalJob:1"},
            )
            self.assertEqual(
                report.job_state_counts,
                {
                    WmsStates.UNKNOWN: 0,
                    WmsStates.MISFIT: 0,
                    WmsStates.UNREADY: 12,
                    WmsStates.READY: 0,
                    WmsStates.PENDING: 1,
                    WmsStates.RUNNING: 10,
                    WmsStates.DELETED: 0,
                    WmsStates.HELD: 0,
                    WmsStates.SUCCEEDED: 9,
                    WmsStates.FAILED: 0,
                    WmsStates.PRUNED: 0,
                },
            )
            self.assertEqual(report.total_number_jobs, 32)
            self.assertIsNone(report.specific_info)

    def testNoopFailed(self):
        with temporaryDirectory() as tmp_dir:
            report = self._make_report(tmp_dir, "noop_failed_1")
            self.assertEqual(report.state, WmsStates.FAILED)
            # Label order in the summary is not guaranteed; compare as a set.
            self.assertEqual(
                set(report.run_summary.split(";")),
                {"pipetaskInit:1", "label1:6", "label2:6", "label3:6", "label4:6", "label5:6", "finalJob:1"},
            )
            self.assertEqual(
                report.job_state_counts,
                {
                    WmsStates.UNKNOWN: 0,
                    WmsStates.MISFIT: 0,
                    WmsStates.UNREADY: 0,
                    WmsStates.READY: 0,
                    WmsStates.PENDING: 0,
                    WmsStates.RUNNING: 0,
                    WmsStates.DELETED: 0,
                    WmsStates.HELD: 0,
                    WmsStates.SUCCEEDED: 27,
                    WmsStates.FAILED: 1,
                    WmsStates.PRUNED: 4,
                },
            )
            self.assertEqual(report.total_number_jobs, 32)
            self.assertIsNone(report.specific_info)
            self.assertEqual(
                report.exit_code_summary,
                {
                    "pipetaskInit": [],
                    "label1": [],
                    "label2": [1],
                    "label3": [],
                    "label4": [],
                    "label5": [],
                    "finalJob": [],
                },
            )

797 

798 

class GetStatusFromIdTestCase(unittest.TestCase):
    """Test _get_status_from_id function."""

    @unittest.mock.patch("lsst.ctrl.bps.htcondor.report_utils._get_info_from_schedd")
    def testNotFound(self, mock_schedd_info):
        # An empty answer from the schedds means the DAGMan job is unknown.
        mock_schedd_info.return_value = {}

        state, message = report_utils._get_status_from_id("100", 0, {})

        mock_schedd_info.assert_called_once_with("100", 0, {})
        self.assertEqual(state, WmsStates.UNKNOWN)
        self.assertEqual(message, "DAGMan job 100 not found in queue or history. Check id or try path.")

    @unittest.mock.patch("lsst.ctrl.bps.htcondor.report_utils._htc_status_to_wms_state")
    @unittest.mock.patch("lsst.ctrl.bps.htcondor.report_utils._get_info_from_schedd")
    def testFound(self, mock_schedd_info, mock_to_wms_state):
        # A found job ad is converted to a WMS state with no error message.
        job_id = "100.0"
        ads = {job_id: {"JobStatus": lssthtc.JobStatus.RUNNING}}
        mock_schedd_info.return_value = {"schedd1": ads}
        mock_to_wms_state.return_value = WmsStates.RUNNING

        state, message = report_utils._get_status_from_id(job_id, 0, {})

        mock_schedd_info.assert_called_once_with(job_id, 0, {})
        mock_to_wms_state.assert_called_once_with(ads[job_id])
        self.assertEqual(state, WmsStates.RUNNING)
        self.assertEqual(message, "")

828 

829 

class GetStatusFromPathTestCase(unittest.TestCase):
    """Test _get_status_from_path function."""

    @unittest.mock.patch("lsst.ctrl.bps.htcondor.report_utils.read_dag_log")
    def testNoDagLog(self, mock_read_dag_log):
        # A missing DAGMan log yields UNKNOWN plus a hint to check the path.
        mock_read_dag_log.side_effect = FileNotFoundError

        bad_path = "/fake/path"
        state, message = report_utils._get_status_from_path(bad_path)

        mock_read_dag_log.assert_called_once_with(Path(bad_path))
        self.assertEqual(state, WmsStates.UNKNOWN)
        self.assertEqual(message, f"DAGMan log not found in {bad_path}. Check path.")

    def testSuccess(self):
        # A canned successfully finished run reports SUCCEEDED.
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "tiny_success")
            copytree(f"{TESTDIR}/data/tiny_success", submit_dir)
            state, message = report_utils._get_status_from_path(submit_dir)

        self.assertEqual(state, WmsStates.SUCCEEDED)
        self.assertEqual(message, "")

    def testFailure(self):
        # A canned run with failed jobs reports FAILED.
        with temporaryDirectory() as tmp_dir:
            submit_dir = os.path.join(tmp_dir, "tiny_problems")
            copytree(f"{TESTDIR}/data/tiny_problems", submit_dir)
            state, message = report_utils._get_status_from_path(submit_dir)

        self.assertEqual(state, WmsStates.FAILED)
        self.assertEqual(message, "")

862 

863 

# Allow running this test file directly: python test_report_utils.py
if __name__ == "__main__":
    unittest.main()