Coverage for tests / test_bps_reports.py: 24%
195 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:47 +0000
1# This file is part of ctrl_bps.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
28"""Tests for reporting mechanism."""
30import dataclasses
31import io
32import unittest
34from astropy.table import Table
35from wms_test_utils import TEST_REPORT
37from lsst.ctrl.bps import (
38 BaseRunReport,
39 DetailedRunReport,
40 ExitCodesReport,
41 SummaryRunReport,
42 WmsJobReport,
43 WmsRunReport,
44 WmsStates,
45 compile_code_summary,
46 compile_job_summary,
47)
50class FakeRunReport(BaseRunReport):
51 """A fake run report."""
53 def add(self, run_report, use_global_id=False):
54 id_ = run_report.global_wms_id if use_global_id else run_report.wms_id
55 self._table.add_row([id_, run_report.state.name])
58class FakeRunReportTestCase(unittest.TestCase):
59 """Test shared methods."""
61 def setUp(self):
62 self.fields = [("ID", "S"), ("STATE", "S")]
64 self.report = FakeRunReport(self.fields)
65 self.report.add(WmsRunReport(wms_id="2.0", state=WmsStates.RUNNING))
66 self.report.add(WmsRunReport(wms_id="1.0", state=WmsStates.SUCCEEDED))
68 def testEquality(self):
69 """Test if two reports are identical."""
70 other = FakeRunReport(self.fields)
71 other.add(WmsRunReport(wms_id="2.0", state=WmsStates.RUNNING))
72 other.add(WmsRunReport(wms_id="1.0", state=WmsStates.SUCCEEDED))
73 self.assertEqual(self.report, other)
75 def testInequality(self):
76 """Test if two reports are not identical."""
77 other = FakeRunReport(self.fields)
78 other.add(WmsRunReport(wms_id="1.0", state=WmsStates.FAILED))
79 self.assertNotEqual(self.report, other)
81 def testLength(self):
82 self.assertEqual(len(self.report), 2)
84 def testClear(self):
85 """Test clearing the report."""
86 self.report.clear()
87 self.assertEqual(len(self.report), 0)
89 def testSortWithKnownKey(self):
90 """Test sorting the report using known column."""
91 expected_output = io.StringIO()
92 expected = Table(dtype=self.fields)
93 expected.add_row(["1.0", WmsStates.SUCCEEDED.name])
94 expected.add_row(["2.0", WmsStates.RUNNING.name])
95 print(expected, file=expected_output)
97 actual_output = io.StringIO()
98 self.report.sort("ID")
99 print(self.report, file=actual_output)
101 self.assertEqual(actual_output.getvalue(), expected_output.getvalue())
103 expected_output.close()
104 actual_output.close()
106 def testSortWithUnknownKey(self):
107 """Test sorting the report using unknown column."""
108 with self.assertRaises(AttributeError):
109 self.report.sort("foo")
112class SummaryRunReportTestCase(unittest.TestCase):
113 """Test a summary run report."""
115 def setUp(self):
116 self.fields = [
117 ("X", "S"),
118 ("STATE", "S"),
119 ("%S", "S"),
120 ("ID", "S"),
121 ("OPERATOR", "S"),
122 ("PROJECT", "S"),
123 ("CAMPAIGN", "S"),
124 ("PAYLOAD", "S"),
125 ("RUN", "S"),
126 ]
127 self.run = WmsRunReport(
128 wms_id="1.0",
129 global_wms_id="foo#1.0",
130 path="/path/to/run",
131 label="label",
132 run="run",
133 project="dev",
134 campaign="testing",
135 payload="test",
136 operator="tester",
137 run_summary="foo:1;bar:1",
138 state=WmsStates.RUNNING,
139 jobs=None,
140 total_number_jobs=2,
141 job_state_counts={
142 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.RUNNING} else 0 for state in WmsStates
143 },
144 job_summary=None,
145 )
146 self.report = SummaryRunReport(self.fields)
148 self.expected = Table(dtype=self.fields)
149 self.expected.add_row(["", "RUNNING", "50", "1.0", "tester", "dev", "testing", "test", "run"])
151 self.expected_output = io.StringIO()
152 self.actual_output = io.StringIO()
154 def tearDown(self):
155 self.expected_output.close()
156 self.actual_output.close()
158 def testAddWithNoFlag(self):
159 """Test adding a report for a run with no issues."""
160 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output)
162 self.report.add(self.run)
163 print(self.report, file=self.actual_output)
165 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue())
167 def testAddWithFailedFlag(self):
168 """Test adding a run with a failed job."""
169 self.expected["X"][0] = "F"
170 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output)
172 # Alter the run report to include a failed job.
173 self.run.job_state_counts = {
174 state: 1 if state in {WmsStates.FAILED, WmsStates.SUCCEEDED} else 0 for state in WmsStates
175 }
176 self.report.add(self.run)
177 print(self.report, file=self.actual_output)
179 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue())
181 def testAddWithHeldFlag(self):
182 """Test adding a run with a held job."""
183 self.expected["X"][0] = "H"
184 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output)
186 # Alter the run report to include a held job.
187 self.run.job_state_counts = {
188 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.HELD} else 0 for state in WmsStates
189 }
190 self.report.add(self.run)
191 print(self.report, file=self.actual_output)
193 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue())
195 def testAddWithDeletedFlag(self):
196 """Test adding a run with a deleted job."""
197 self.expected["X"][0] = "D"
198 print("\n".join(self.expected.pformat(max_lines=-1, max_width=-1)), file=self.expected_output)
200 # Alter the run report to include a deleted job.
201 self.run.job_state_counts = {
202 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.DELETED} else 0 for state in WmsStates
203 }
204 self.report.add(self.run)
205 print(self.report, file=self.actual_output)
207 self.assertEqual(self.actual_output.getvalue(), self.expected_output.getvalue())
210class DetailedRunReportTestCase(unittest.TestCase):
211 """Test a detailed run report."""
213 def setUp(self):
214 self.fields = [("", "S")] + [(state.name, "I") for state in WmsStates] + [("EXPECTED", "i")]
216 table = Table(dtype=self.fields)
217 table.add_row(
218 ["TOTAL"]
219 + [1 if state in {WmsStates.RUNNING, WmsStates.SUCCEEDED} else 0 for state in WmsStates]
220 + [2]
221 )
222 table.add_row(["foo"] + [1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates] + [1])
223 table.add_row(["bar"] + [1 if state == WmsStates.RUNNING else 0 for state in WmsStates] + [1])
224 self.expected = DetailedRunReport.from_table(table)
226 self.run = WmsRunReport(
227 wms_id="1.0",
228 global_wms_id="foo#1.0",
229 path="/path/to/run",
230 label="label",
231 run="run",
232 project="dev",
233 campaign="testing",
234 payload="test",
235 operator="tester",
236 run_summary="foo:1;bar:1",
237 state=WmsStates.RUNNING,
238 jobs=[
239 WmsJobReport(wms_id="1.0", name="", label="foo", state=WmsStates.SUCCEEDED),
240 WmsJobReport(wms_id="2.0", name="", label="bar", state=WmsStates.RUNNING),
241 ],
242 total_number_jobs=2,
243 job_state_counts={
244 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.RUNNING} else 0 for state in WmsStates
245 },
246 job_summary={
247 "foo": {state: 1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates},
248 "bar": {state: 1 if state == WmsStates.RUNNING else 0 for state in WmsStates},
249 },
250 )
252 self.actual = DetailedRunReport(self.fields)
254 def testAddWithJobSummary(self):
255 """Test adding a run with a job summary."""
256 self.run.jobs = None
257 self.actual.add(self.run)
259 self.assertEqual(self.actual, self.expected)
261 def testAddWithoutJobSummary(self):
262 """Test adding a run without either a job summary or job info."""
263 self.run.jobs = None
264 self.run.job_summary = None
265 self.actual.add(self.run)
267 self.assertEqual(len(self.actual), 1)
268 self.assertRegex(self.actual.message, r"^WARNING.*incomplete")
270 def testAddWithoutRunSummary(self):
271 """Test adding a run without a run summary."""
272 table = Table(dtype=self.fields)
273 table.add_row(
274 ["TOTAL"]
275 + [1 if state in {WmsStates.RUNNING, WmsStates.SUCCEEDED} else 0 for state in WmsStates]
276 + [2]
277 )
278 table.add_row(["bar"] + [1 if state == WmsStates.RUNNING else 0 for state in WmsStates] + [-1])
279 table.add_row(["foo"] + [1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates] + [-1])
280 expected = DetailedRunReport.from_table(table)
282 self.run.run_summary = None
283 self.actual.add(self.run)
285 self.assertRegex(self.actual.message, r"^WARNING.*sorted alphabetically")
286 self.assertEqual(self.actual, expected)
289class ExitCodesReportTestCase(unittest.TestCase):
290 """Test an exit code report."""
292 def setUp(self):
293 self.fields = [
294 (" ", "S"),
295 ("PAYLOAD ERROR COUNT", "i"),
296 ("PAYLOAD ERROR CODES", "S"),
297 ("INFRASTRUCTURE ERROR COUNT", "i"),
298 ("INFRASTRUCTURE ERROR CODES", "S"),
299 ]
301 table = Table(dtype=self.fields)
302 table.add_row(["foo", 0, "None", 0, "None"])
303 table.add_row(["bar", 2, "1, 2", 2, "3, 4"])
304 self.expected = ExitCodesReport.from_table(table)
306 self.run = WmsRunReport(
307 wms_id="1.0",
308 global_wms_id="foo#1.0",
309 path="/path/to/run",
310 label="label",
311 run="run",
312 project="dev",
313 campaign="testing",
314 payload="test",
315 operator="tester",
316 run_summary="foo:1;bar:1",
317 state=WmsStates.RUNNING,
318 jobs=[
319 WmsJobReport(wms_id="1.0", name="", label="foo", state=WmsStates.SUCCEEDED),
320 WmsJobReport(wms_id="2.0", name="", label="bar", state=WmsStates.RUNNING),
321 ],
322 total_number_jobs=2,
323 job_state_counts={
324 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.RUNNING} else 0 for state in WmsStates
325 },
326 job_summary={
327 "foo": {state: 1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates},
328 "bar": {state: 1 if state == WmsStates.RUNNING else 0 for state in WmsStates},
329 },
330 exit_code_summary={
331 "foo": [],
332 "bar": [1, 2, 3, 4],
333 },
334 )
336 self.actual = ExitCodesReport(self.fields)
338 def testAddSuccess(self):
339 """Test adding a run successfully."""
340 self.actual.add(self.run)
342 self.assertEqual(len(self.actual), 2)
343 self.assertEqual(self.actual, self.expected)
345 def testAddFailure(self):
346 """Test adding a run unsuccessfully."""
347 self.run.job_summary = {}
348 self.run.exit_code_summary = {}
350 self.actual.add(self.run)
352 self.assertEqual(len(self.actual), 0)
353 self.assertRegex(self.actual.message, r"^WARNING.*report.*incomplete")
355 def testAddWithoutRunSummary(self):
356 """Test adding a run without a run summary."""
357 self.run.run_summary = None
359 self.actual.add(self.run)
361 self.assertRegex(self.actual.message, r"^WARNING.*sorted alphabetically")
364class CompileJobSummaryTestCase(unittest.TestCase):
365 """Test compiling a job summary."""
367 def setUp(self):
368 self.report = dataclasses.replace(TEST_REPORT)
370 def tearDown(self):
371 pass
373 def testSummaryExists(self):
374 """Test if the existing report is not altered."""
375 # Create a report with a "fake" job summary, i.e., a summary which
376 # differs from the one which would be compiled from the information
377 # about individual jobs.
378 expected = dataclasses.replace(
379 self.report,
380 job_summary={"foo": {state: 1 if state == WmsStates.FAILED else 0 for state in WmsStates}},
381 )
382 result = dataclasses.replace(expected)
384 messages = compile_job_summary(result)
386 self.assertEqual(result, expected)
387 self.assertFalse(messages)
389 def testSummaryMissing(self):
390 """Test if the summary is compiled if necessary."""
391 result = dataclasses.replace(self.report, job_summary=None)
393 messages = compile_job_summary(result)
395 self.assertEqual(result, self.report)
396 self.assertFalse(messages)
398 def testCompilationError(self):
399 """Test if a warning is issued if the summary cannot be compiled."""
400 result = dataclasses.replace(self.report, jobs=None, job_summary=None)
402 messages = compile_job_summary(result)
404 self.assertEqual(len(messages), 1)
405 self.assertRegex(messages[0], r"information.*not available")
408class CompileCodeSummaryTestCase(unittest.TestCase):
409 """Test compiling a code summary."""
411 def setUp(self):
412 self.report = WmsRunReport(
413 wms_id="1.0",
414 global_wms_id="foo#1.0",
415 path="/path/to/run",
416 label="label",
417 run="run",
418 project="dev",
419 campaign="testing",
420 payload="test",
421 operator="tester",
422 run_summary="foo:1;bar:1;baz:1",
423 state=WmsStates.RUNNING,
424 jobs=[
425 WmsJobReport(wms_id="1.0", name="", label="foo", state=WmsStates.SUCCEEDED),
426 WmsJobReport(wms_id="2.0", name="", label="bar", state=WmsStates.FAILED),
427 WmsJobReport(wms_id="3.0", name="", label="baz", state=WmsStates.RUNNING),
428 ],
429 total_number_jobs=3,
430 job_state_counts={
431 state: 1 if state in {WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.RUNNING} else 0
432 for state in WmsStates
433 },
434 job_summary={
435 "foo": {state: 1 if state == WmsStates.SUCCEEDED else 0 for state in WmsStates},
436 "bar": {state: 1 if state == WmsStates.FAILED else 0 for state in WmsStates},
437 "baz": {state: 1 if state == WmsStates.RUNNING else 0 for state in WmsStates},
438 },
439 exit_code_summary={"foo": [], "bar": [1], "baz": []},
440 )
442 def tearDown(self):
443 pass
445 def testAddingMissingEntries(self):
446 """Test if the missing entries are added to the summary."""
447 result = dataclasses.replace(self.report, exit_code_summary={"bar": [1]})
449 messages = compile_code_summary(result)
451 self.assertEqual(result, self.report)
452 self.assertFalse(messages)
454 def testDetectingMismatches(self):
455 """Test if a mismatch between exit codes and failures is reported."""
456 expected = dataclasses.replace(self.report, exit_code_summary={"foo": [1], "bar": [1], "baz": []})
457 result = dataclasses.replace(expected)
459 messages = compile_code_summary(result)
461 self.assertEqual(result, expected)
462 self.assertEqual(len(messages), 1)
463 self.assertRegex(messages[0], r"exit codes.*differs.*failures.*labels: foo")
465 def testDetectingOmissions(self):
466 """Test if a failure not reflected in exit codes is reported."""
467 expected = dataclasses.replace(self.report, exit_code_summary={"foo": [], "baz": []})
468 result = dataclasses.replace(expected)
470 messages = compile_code_summary(result)
472 self.assertEqual(result, expected)
473 self.assertEqual(len(messages), 1)
474 self.assertRegex(messages[0], r"exit codes.*not available.*labels: bar")
476 def testDetectingDiscrepancies(self):
477 """Test if multiple discrepancies are reported."""
478 expected = dataclasses.replace(self.report, exit_code_summary={"foo": [], "baz": [1]})
479 result = dataclasses.replace(expected)
481 messages = compile_code_summary(result)
483 self.assertEqual(result, expected)
484 self.assertEqual(len(messages), 2)
485 self.assertRegex(messages[0], r"exit codes.*differs.*failures.*labels: baz")
486 self.assertRegex(messages[1], r"exit codes.*not available.*labels: bar")
488 def testHandlingNoJobSummary(self):
489 """Test if the existing report is not altered if no job summary."""
490 expected = dataclasses.replace(self.report, job_summary=None)
491 result = dataclasses.replace(expected)
493 messages = compile_code_summary(result)
495 self.assertEqual(result, expected)
496 self.assertFalse(messages)
499if __name__ == "__main__":
500 unittest.main()