Coverage for tests / test_bps_reports.py: 24%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:04 +0000

1# This file is part of ctrl_bps. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28"""Tests for reporting mechanism.""" 

29 

30import dataclasses 

31import io 

32import unittest 

33 

34from astropy.table import Table 

35from wms_test_utils import TEST_REPORT 

36 

37from lsst.ctrl.bps import ( 

38 BaseRunReport, 

39 DetailedRunReport, 

40 ExitCodesReport, 

41 SummaryRunReport, 

42 WmsJobReport, 

43 WmsRunReport, 

44 WmsStates, 

45 compile_code_summary, 

46 compile_job_summary, 

47) 

48 

49 

50class FakeRunReport(BaseRunReport): 

51 """A fake run report.""" 

52 

53 def add(self, run_report, use_global_id=False): 

54 id_ = run_report.global_wms_id if use_global_id else run_report.wms_id 

55 self._table.add_row([id_, run_report.state.name]) 

56 

57 

58class FakeRunReportTestCase(unittest.TestCase): 

59 """Test shared methods.""" 

60 

61 def setUp(self): 

62 self.fields = [("ID", "S"), ("STATE", "S")] 

63 

64 self.report = FakeRunReport(self.fields) 

65 self.report.add(WmsRunReport(wms_id="2.0", state=WmsStates.RUNNING)) 

66 self.report.add(WmsRunReport(wms_id="1.0", state=WmsStates.SUCCEEDED)) 

67 

68 def testEquality(self): 

69 """Test if two reports are identical.""" 

70 other = FakeRunReport(self.fields) 

71 other.add(WmsRunReport(wms_id="2.0", state=WmsStates.RUNNING)) 

72 other.add(WmsRunReport(wms_id="1.0", state=WmsStates.SUCCEEDED)) 

73 self.assertEqual(self.report, other) 

74 

75 def testInequality(self): 

76 """Test if two reports are not identical.""" 

77 other = FakeRunReport(self.fields) 

78 other.add(WmsRunReport(wms_id="1.0", state=WmsStates.FAILED)) 

79 self.assertNotEqual(self.report, other) 

80 

81 def testLength(self): 

82 self.assertEqual(len(self.report), 2) 

83 

84 def testClear(self): 

85 """Test clearing the report.""" 

86 self.report.clear() 

87 self.assertEqual(len(self.report), 0) 

88 

89 def testSortWithKnownKey(self): 

90 """Test sorting the report using known column.""" 

91 expected_output = io.StringIO() 

92 expected = Table(dtype=self.fields) 

93 expected.add_row(["1.0", WmsStates.SUCCEEDED.name]) 

94 expected.add_row(["2.0", WmsStates.RUNNING.name]) 

95 print(expected, file=expected_output) 

96 

97 actual_output = io.StringIO() 

98 self.report.sort("ID") 

99 print(self.report, file=actual_output) 

100 

101 self.assertEqual(actual_output.getvalue(), expected_output.getvalue()) 

102 

103 expected_output.close() 

104 actual_output.close() 

105 

106 def testSortWithUnknownKey(self): 

107 """Test sorting the report using unknown column.""" 

108 with self.assertRaises(AttributeError): 

109 self.report.sort("foo") 

110 

111 

112class SummaryRunReportTestCase(unittest.TestCase): 

113 """Test a summary run report.""" 

114 

115 def setUp(self): 

116 self.fields = [ 

117 ("X", "S"), 

118 ("STATE", "S"), 

119 ("%S", "S"), 

120 ("ID", "S"), 

121 ("OPERATOR", "S"), 

122 ("PROJECT", "S"), 

123 ("CAMPAIGN", "S"), 

124 ("PAYLOAD", "S"), 

125 ("RUN", "S"), 

126 ] 

127 self.run = WmsRunReport( 

128 wms_id="1.0", 

129 global_wms_id="foo#1.0", 

130 path="/path/to/run", 

131 label="label", 

132 run="run", 

133 project="dev", 

134 campaign="testing", 

135 payload="test", 

136 operator="tester", 

137 run_summary="foo:1;bar:1", 

138 state=WmsStates.RUNNING, 

139 jobs=None, 

140 total_number_jobs=2, 

141 job_state_counts={ 

142 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.RUNNING} else 0 for state in WmsStates 

143 }, 

144 job_summary=None, 

145 ) 

146 self.report = SummaryRunReport(self.fields) 

147 

148 self.expected = Table(dtype=self.fields) 

149 self.expected.add_row(["", "RUNNING", "50", "1.0", "tester", "dev", "testing", "test", "run"]) 

150 

151 self.expected_output = io.StringIO() 

152 self.actual_output = io.StringIO() 

153 

154 def tearDown(self): 

155 self.expected_output.close() 

156 self.actual_output.close() 

157 

158 def testAddWithNoFlag(self): 

159 """Test adding a report for a run with no issues.""" 

160 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output) 

161 

162 self.report.add(self.run) 

163 print(self.report, file=self.actual_output) 

164 

165 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue()) 

166 

167 def testAddWithFailedFlag(self): 

168 """Test adding a run with a failed job.""" 

169 self.expected["X"][0] = "F" 

170 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output) 

171 

172 # Alter the run report to include a failed job. 

173 self.run.job_state_counts = { 

174 state: 1 if state in {WmsStates.FAILED, WmsStates.SUCCEEDED} else 0 for state in WmsStates 

175 } 

176 self.report.add(self.run) 

177 print(self.report, file=self.actual_output) 

178 

179 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue()) 

180 

181 def testAddWithHeldFlag(self): 

182 """Test adding a run with a held job.""" 

183 self.expected["X"][0] = "H" 

184 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output) 

185 

186 # Alter the run report to include a held job. 

187 self.run.job_state_counts = { 

188 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.HELD} else 0 for state in WmsStates 

189 } 

190 self.report.add(self.run) 

191 print(self.report, file=self.actual_output) 

192 

193 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue()) 

194 

195 def testAddWithDeletedFlag(self): 

196 """Test adding a run with a deleted job.""" 

197 self.expected["X"][0] = "D" 

198 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output) 

199 

200 # Alter the run report to include a deleted job. 

201 self.run.job_state_counts = { 

202 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.DELETED} else 0 for state in WmsStates 

203 } 

204 self.report.add(self.run) 

205 print(self.report, file=self.actual_output) 

206 

207 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue()) 

208 

209 

210class DetailedRunReportTestCase(unittest.TestCase): 

211 """Test a detailed run report.""" 

212 

213 def setUp(self): 

214 self.fields = [("", "S")] + [(state.name, "I") for state in WmsStates] + [("EXPECTED", "i")] 

215 

216 table = Table(dtype=self.fields) 

217 table.add_row( 

218 ["TOTAL"] 

219 + [1 if state in {WmsStates.RUNNING, WmsStates.SUCCEEDED} else 0 for state in WmsStates] 

220 + [2] 

221 ) 

222 table.add_row(["foo"] + [1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates] + [1]) 

223 table.add_row(["bar"] + [1 if state == WmsStates.RUNNING else 0 for state in WmsStates] + [1]) 

224 self.expected = DetailedRunReport.from_table(table) 

225 

226 self.run = WmsRunReport( 

227 wms_id="1.0", 

228 global_wms_id="foo#1.0", 

229 path="/path/to/run", 

230 label="label", 

231 run="run", 

232 project="dev", 

233 campaign="testing", 

234 payload="test", 

235 operator="tester", 

236 run_summary="foo:1;bar:1", 

237 state=WmsStates.RUNNING, 

238 jobs=[ 

239 WmsJobReport(wms_id="1.0", name="", label="foo", state=WmsStates.SUCCEEDED), 

240 WmsJobReport(wms_id="2.0", name="", label="bar", state=WmsStates.RUNNING), 

241 ], 

242 total_number_jobs=2, 

243 job_state_counts={ 

244 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.RUNNING} else 0 for state in WmsStates 

245 }, 

246 job_summary={ 

247 "foo": {state: 1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates}, 

248 "bar": {state: 1 if state == WmsStates.RUNNING else 0 for state in WmsStates}, 

249 }, 

250 ) 

251 

252 self.actual = DetailedRunReport(self.fields) 

253 

254 def testAddWithJobSummary(self): 

255 """Test adding a run with a job summary.""" 

256 self.run.jobs = None 

257 self.actual.add(self.run) 

258 

259 self.assertEqual(self.actual, self.expected) 

260 

261 def testAddWithoutJobSummary(self): 

262 """Test adding a run without either a job summary or job info.""" 

263 self.run.jobs = None 

264 self.run.job_summary = None 

265 self.actual.add(self.run) 

266 

267 self.assertEqual(len(self.actual), 1) 

268 self.assertRegex(self.actual.message, r"^WARNING.*incomplete") 

269 

270 def testAddWithoutRunSummary(self): 

271 """Test adding a run without a run summary.""" 

272 table = Table(dtype=self.fields) 

273 table.add_row( 

274 ["TOTAL"] 

275 + [1 if state in {WmsStates.RUNNING, WmsStates.SUCCEEDED} else 0 for state in WmsStates] 

276 + [2] 

277 ) 

278 table.add_row(["bar"] + [1 if state == WmsStates.RUNNING else 0 for state in WmsStates] + [-1]) 

279 table.add_row(["foo"] + [1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates] + [-1]) 

280 expected = DetailedRunReport.from_table(table) 

281 

282 self.run.run_summary = None 

283 self.actual.add(self.run) 

284 

285 self.assertRegex(self.actual.message, r"^WARNING.*sorted alphabetically") 

286 self.assertEqual(self.actual, expected) 

287 

288 

289class ExitCodesReportTestCase(unittest.TestCase): 

290 """Test an exit code report.""" 

291 

292 def setUp(self): 

293 self.fields = [ 

294 (" ", "S"), 

295 ("PAYLOAD ERROR COUNT", "i"), 

296 ("PAYLOAD ERROR CODES", "S"), 

297 ("INFRASTRUCTURE ERROR COUNT", "i"), 

298 ("INFRASTRUCTURE ERROR CODES", "S"), 

299 ] 

300 

301 table = Table(dtype=self.fields) 

302 table.add_row(["foo", 0, "None", 0, "None"]) 

303 table.add_row(["bar", 2, "1, 2", 2, "3, 4"]) 

304 self.expected = ExitCodesReport.from_table(table) 

305 

306 self.run = WmsRunReport( 

307 wms_id="1.0", 

308 global_wms_id="foo#1.0", 

309 path="/path/to/run", 

310 label="label", 

311 run="run", 

312 project="dev", 

313 campaign="testing", 

314 payload="test", 

315 operator="tester", 

316 run_summary="foo:1;bar:1", 

317 state=WmsStates.RUNNING, 

318 jobs=[ 

319 WmsJobReport(wms_id="1.0", name="", label="foo", state=WmsStates.SUCCEEDED), 

320 WmsJobReport(wms_id="2.0", name="", label="bar", state=WmsStates.RUNNING), 

321 ], 

322 total_number_jobs=2, 

323 job_state_counts={ 

324 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.RUNNING} else 0 for state in WmsStates 

325 }, 

326 job_summary={ 

327 "foo": {state: 1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates}, 

328 "bar": {state: 1 if state == WmsStates.RUNNING else 0 for state in WmsStates}, 

329 }, 

330 exit_code_summary={ 

331 "foo": [], 

332 "bar": [1, 2, 3, 4], 

333 }, 

334 ) 

335 

336 self.actual = ExitCodesReport(self.fields) 

337 

338 def testAddSuccess(self): 

339 """Test adding a run successfully.""" 

340 self.actual.add(self.run) 

341 

342 self.assertEqual(len(self.actual), 2) 

343 self.assertEqual(self.actual, self.expected) 

344 

345 def testAddFailure(self): 

346 """Test adding a run unsuccessfully.""" 

347 self.run.job_summary = {} 

348 self.run.exit_code_summary = {} 

349 

350 self.actual.add(self.run) 

351 

352 self.assertEqual(len(self.actual), 0) 

353 self.assertRegex(self.actual.message, r"^WARNING.*report.*incomplete") 

354 

355 def testAddWithoutRunSummary(self): 

356 """Test adding a run without a run summary.""" 

357 self.run.run_summary = None 

358 

359 self.actual.add(self.run) 

360 

361 self.assertRegex(self.actual.message, r"^WARNING.*sorted alphabetically") 

362 

363 

364class CompileJobSummaryTestCase(unittest.TestCase): 

365 """Test compiling a job summary.""" 

366 

367 def setUp(self): 

368 self.report = dataclasses.replace(TEST_REPORT) 

369 

370 def tearDown(self): 

371 pass 

372 

373 def testSummaryExists(self): 

374 """Test if the existing report is not altered.""" 

375 # Create a report with a "fake" job summary, i.e., a summary which 

376 # differs from the one which would be compiled from the information 

377 # about individual jobs. 

378 expected = dataclasses.replace( 

379 self.report, 

380 job_summary={"foo": {state: 1 if state == WmsStates.FAILED else 0 for state in WmsStates}}, 

381 ) 

382 result = dataclasses.replace(expected) 

383 

384 messages = compile_job_summary(result) 

385 

386 self.assertEqual(result, expected) 

387 self.assertFalse(messages) 

388 

389 def testSummaryMissing(self): 

390 """Test if the summary is compiled if necessary.""" 

391 result = dataclasses.replace(self.report, job_summary=None) 

392 

393 messages = compile_job_summary(result) 

394 

395 self.assertEqual(result, self.report) 

396 self.assertFalse(messages) 

397 

398 def testCompilationError(self): 

399 """Test if a warning is issued if the summary cannot be compiled.""" 

400 result = dataclasses.replace(self.report, jobs=None, job_summary=None) 

401 

402 messages = compile_job_summary(result) 

403 

404 self.assertEqual(len(messages), 1) 

405 self.assertRegex(messages[0], r"information.*not available") 

406 

407 

408class CompileCodeSummaryTestCase(unittest.TestCase): 

409 """Test compiling a code summary.""" 

410 

411 def setUp(self): 

412 self.report = WmsRunReport( 

413 wms_id="1.0", 

414 global_wms_id="foo#1.0", 

415 path="/path/to/run", 

416 label="label", 

417 run="run", 

418 project="dev", 

419 campaign="testing", 

420 payload="test", 

421 operator="tester", 

422 run_summary="foo:1;bar:1;baz:1", 

423 state=WmsStates.RUNNING, 

424 jobs=[ 

425 WmsJobReport(wms_id="1.0", name="", label="foo", state=WmsStates.SUCCEEDED), 

426 WmsJobReport(wms_id="2.0", name="", label="bar", state=WmsStates.FAILED), 

427 WmsJobReport(wms_id="3.0", name="", label="baz", state=WmsStates.RUNNING), 

428 ], 

429 total_number_jobs=3, 

430 job_state_counts={ 

431 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.RUNNING} else 0 

432 for state in WmsStates 

433 }, 

434 job_summary={ 

435 "foo": {state: 1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates}, 

436 "bar": {state: 1 if state == WmsStates.FAILED else 0 for state in WmsStates}, 

437 "baz": {state: 1 if state == WmsStates.RUNNING else 0 for state in WmsStates}, 

438 }, 

439 exit_code_summary={"foo": [], "bar": [1], "baz": []}, 

440 ) 

441 

442 def tearDown(self): 

443 pass 

444 

445 def testAddingMissingEntries(self): 

446 """Test if the missing entries are added to the summary.""" 

447 result = dataclasses.replace(self.report, exit_code_summary={"bar": [1]}) 

448 

449 messages = compile_code_summary(result) 

450 

451 self.assertEqual(result, self.report) 

452 self.assertFalse(messages) 

453 

454 def testDetectingMismatches(self): 

455 """Test if a mismatch between exit codes and failures is reported.""" 

456 expected = dataclasses.replace(self.report, exit_code_summary={"foo": [1], "bar": [1], "baz": []}) 

457 result = dataclasses.replace(expected) 

458 

459 messages = compile_code_summary(result) 

460 

461 self.assertEqual(result, expected) 

462 self.assertEqual(len(messages), 1) 

463 self.assertRegex(messages[0], r"exit codes.*differs.*failures.*labels: foo") 

464 

465 def testDetectingOmissions(self): 

466 """Test if a failure not reflected in exit codes is reported.""" 

467 expected = dataclasses.replace(self.report, exit_code_summary={"foo": [], "baz": []}) 

468 result = dataclasses.replace(expected) 

469 

470 messages = compile_code_summary(result) 

471 

472 self.assertEqual(result, expected) 

473 self.assertEqual(len(messages), 1) 

474 self.assertRegex(messages[0], r"exit codes.*not available.*labels: bar") 

475 

476 def testDetectingDiscrepancies(self): 

477 """Test if multiple discrepancies are reported.""" 

478 expected = dataclasses.replace(self.report, exit_code_summary={"foo": [], "baz": [1]}) 

479 result = dataclasses.replace(expected) 

480 

481 messages = compile_code_summary(result) 

482 

483 self.assertEqual(result, expected) 

484 self.assertEqual(len(messages), 2) 

485 self.assertRegex(messages[0], r"exit codes.*differs.*failures.*labels: baz") 

486 self.assertRegex(messages[1], r"exit codes.*not available.*labels: bar") 

487 

488 def testHandlingNoJobSummary(self): 

489 """Test if the existing report is not altered if no job summary.""" 

490 expected = dataclasses.replace(self.report, job_summary=None) 

491 result = dataclasses.replace(expected) 

492 

493 messages = compile_code_summary(result) 

494 

495 self.assertEqual(result, expected) 

496 self.assertFalse(messages) 

497 

498 

499if __name__ == "__main__": 

500 unittest.main()