Coverage for tests/test_cliCmdReport.py: 9% (166 statements)
# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the ctrl_mpexec CLI report subcommand."""

import os
import unittest

import yaml
from yaml.loader import SafeLoader

from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli
from lsst.ctrl.mpexec.cli.script.report import print_summary
from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.pipe.base.quantum_provenance_graph import Summary
from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph
from lsst.pipe.base.tests.util import check_output_run

TESTDIR = os.path.abspath(os.path.dirname(__file__))

expected_mock_datasets = [
    "add_dataset1",
    "add2_dataset1",
    "task0_metadata",
    "task0_log",
    "add_dataset2",
    "add2_dataset2",
    "task1_metadata",
    "task1_log",
    "add_dataset3",
    "add2_dataset3",
    "task2_metadata",
    "task2_log",
    "add_dataset4",
    "add2_dataset4",
    "task3_metadata",
    "task3_log",
    "add_dataset5",
    "add2_dataset5",
    "task4_metadata",
    "task4_log",
]


class ReportTest(unittest.TestCase):
    """Test executing "pipetask report" command."""

    def setUp(self) -> None:
        self.runner = LogCliRunner()
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def test_report(self):
        """Test for making a report on the produced, missing and expected
        datasets in a quantum graph.
        """
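        # makeSimpleQGraph builds a five-task test pipeline (task0..task4) and
        # its quantum graph in a temporary butler repo; nothing is executed,
        # so every quantum should appear in the report as expected but not run.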
        metadata = {"output_run": "run"}
        butler, qgraph = makeSimpleQGraph(
            run="run",
            root=self.root,
            metadata=metadata,
        )
        butler.close()
        # Check that we can get the proper run collection from the qgraph
        self.assertEqual(check_output_run(qgraph, "run"), [])

        graph_uri = os.path.join(self.root, "graph.qgraph")
        qgraph.saveUri(graph_uri)

        test_filename = os.path.join(self.root, "report_test.yaml")

        result = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--full-output-filename", test_filename, "--no-logs"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))

        # Check that we can open and read the file written via --full-output-filename
        with open(test_filename) as f:
            report_output_dict = yaml.load(f, Loader=SafeLoader)

        self.assertIsNotNone(report_output_dict["task0"])
        self.assertIsNotNone(report_output_dict["task0"]["failed_quanta"])
        self.assertIsInstance(report_output_dict["task0"]["n_expected"], int)

        result_hr = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result_hr.exit_code, 0, clickResultMsg(result_hr))

        # Check that we get string output
        self.assertIsInstance(result_hr.stdout, str)

        # Check that task0 and the failed quanta for task0 exist in the string
        self.assertIn("task0", result_hr.stdout)
        self.assertIn("Failed", result_hr.stdout)
        self.assertIn("Expected", result_hr.stdout)
        self.assertIn("Succeeded", result_hr.stdout)

        # Check brief option for pipetask report
        result_brief = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs", "--brief"],
            input="no",
        )
        self.assertIsInstance(result_brief.stdout, str)

        # Check that task0 and the failed quanta for task0 exist in the string
        self.assertIn("task0", result_brief.stdout)
        self.assertIn("Failed", result_brief.stdout)
        self.assertIn("Expected", result_brief.stdout)
        self.assertIn("Succeeded", result_brief.stdout)

        # Test the CLI for the QuantumProvenanceGraph (QPG) output, i.e. --force-v2
        result_v2_terminal_out = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs", "--force-v2"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result_v2_terminal_out.exit_code, 0, clickResultMsg(result_v2_terminal_out))

        # Check that we get string output
        self.assertIsInstance(result_v2_terminal_out.stdout, str)

        # Check that task0 and the quanta for task0 exist in the string
        self.assertIn("task0", result_v2_terminal_out.stdout)
        self.assertIn("Unknown", result_v2_terminal_out.stdout)
        self.assertIn("Successful", result_v2_terminal_out.stdout)
        self.assertIn("Blocked", result_v2_terminal_out.stdout)
        self.assertIn("Failed", result_v2_terminal_out.stdout)
        self.assertIn("Wonky", result_v2_terminal_out.stdout)
        self.assertIn("TOTAL", result_v2_terminal_out.stdout)
        self.assertIn("EXPECTED", result_v2_terminal_out.stdout)

        # Test cli for the QPG brief option
        result_v2_brief = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs", "--force-v2", "--brief"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result_v2_brief.exit_code, 0, clickResultMsg(result_v2_brief))

        # Check that we get string output
        self.assertIsInstance(result_v2_brief.stdout, str)

        # Check that task0 and the quanta for task0 exist in the string
        self.assertIn("task0", result_v2_brief.stdout)
        self.assertIn("Unknown", result_v2_brief.stdout)
        self.assertIn("Successful", result_v2_brief.stdout)
        self.assertIn("Blocked", result_v2_brief.stdout)
        self.assertIn("Failed", result_v2_brief.stdout)
        self.assertIn("Wonky", result_v2_brief.stdout)
        self.assertIn("TOTAL", result_v2_brief.stdout)
        self.assertIn("EXPECTED", result_v2_brief.stdout)

        # Check that the full output option works
        test_filename_v2 = os.path.join(self.root, "report_test.json")
        result_v2_full = self.runner.invoke(
            pipetask_cli,
            [
                "report",
                self.root,
                graph_uri,
                "--no-logs",
                "--full-output-filename",
                test_filename_v2,
                "--force-v2",
            ],
            input="no",
        )

        self.assertEqual(result_v2_full.exit_code, 0, clickResultMsg(result_v2_full))
        # Check the "brief" output that prints to the terminal first:
        # Check that we get string output
        self.assertIsInstance(result_v2_full.stdout, str)

        # Check that task0 and the quanta for task0 exist in the string
        self.assertIn("task0", result_v2_full.stdout)
        self.assertIn("Unknown", result_v2_full.stdout)
        self.assertIn("Successful", result_v2_full.stdout)
        self.assertIn("Blocked", result_v2_full.stdout)
        self.assertIn("Failed", result_v2_full.stdout)
        self.assertIn("Wonky", result_v2_full.stdout)
        self.assertIn("TOTAL", result_v2_full.stdout)
        self.assertIn("EXPECTED", result_v2_full.stdout)

        # Then validate the full output json file:
        with open(test_filename_v2) as f:
            output = f.read()
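        # Summary is the pydantic model behind the provenance report, so
        # model_validate_json parses and validates the file contents in one step.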
        model = Summary.model_validate_json(output)
        # Below is the same set of tests as in `pipe_base`:
        for task_summary in model.tasks.values():
            self.assertEqual(task_summary.n_successful, 0)
            self.assertEqual(task_summary.n_blocked, 0)
            self.assertEqual(task_summary.n_unknown, 1)
            self.assertEqual(task_summary.n_expected, 1)
            self.assertListEqual(task_summary.failed_quanta, [])
            self.assertListEqual(task_summary.recovered_quanta, [])
            self.assertListEqual(task_summary.wonky_quanta, [])
            self.assertEqual(task_summary.n_wonky, 0)
            self.assertEqual(task_summary.n_failed, 0)
        for dataset_type_name, dataset_type_summary in model.datasets.items():
            self.assertListEqual(
                dataset_type_summary.unsuccessful_datasets,
                [{"instrument": "INSTR", "detector": 0}],
            )
            self.assertEqual(dataset_type_summary.n_visible, 0)
            self.assertEqual(dataset_type_summary.n_shadowed, 0)
            self.assertEqual(dataset_type_summary.n_predicted_only, 0)
            self.assertEqual(dataset_type_summary.n_expected, 1)
            self.assertEqual(dataset_type_summary.n_cursed, 0)
            self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
            self.assertListEqual(dataset_type_summary.cursed_datasets, [])
            self.assertIn(dataset_type_name, expected_mock_datasets)
            match dataset_type_name:
                case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task0")
                case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task1")
                case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task2")
                case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task3")
                case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task4")

    def test_aggregate_reports(self):
        """Test the `pipetask aggregate-reports` command. We make one
        `SimpleQGraph`, duplicate its report summary, and check that the two
        matching reports are aggregated correctly.
        """
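        # Because the second report below is a verbatim copy of the first,
        # every aggregated count should be double the single-graph numbers.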
        metadata = {"output_run": "run1"}
        butler, qgraph1 = makeSimpleQGraph(
            run="run",
            root=self.root,
            metadata=metadata,
        )
        butler.close()

        # Check that we can get the proper run collection from the qgraph
        self.assertEqual(check_output_run(qgraph1, "run"), [])

        # Save the graph
        graph_uri_1 = os.path.join(self.root, "graph1.qgraph")
        qgraph1.saveUri(graph_uri_1)

        file1 = os.path.join(self.root, "report_test_1.json")
        file2 = os.path.join(self.root, "report_test_2.json")
        aggregate_file = os.path.join(self.root, "aggregate_report.json")

        report1 = self.runner.invoke(
            pipetask_cli,
            [
                "report",
                self.root,
                graph_uri_1,
                "--no-logs",
                "--full-output-filename",
                file1,
                "--force-v2",
            ],
            input="no",
        )

        self.assertEqual(report1.exit_code, 0, clickResultMsg(report1))
        # Now, copy the json output into a duplicate file and aggregate
        with open(file1) as f:
            sum1 = Summary.model_validate_json(f.read())
        sum2 = sum1.model_copy(deep=True)
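        # print_summary with a filename writes the copied Summary to file2,
        # giving aggregate-reports a second, identical input.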
        print_summary(sum2, file2, brief=False)

        # Then use these file outputs as the inputs to aggregate reports:
        aggregate_report = self.runner.invoke(
            pipetask_cli,
            [
                "aggregate-reports",
                file1,
                file2,
                "--full-output-filename",
                aggregate_file,
            ],
        )
        # Check that aggregate command had a zero exit code:
        self.assertEqual(aggregate_report.exit_code, 0, clickResultMsg(aggregate_report))

        # Check that it aggregates as expected:
        with open(aggregate_file) as f:
            agg_sum = Summary.model_validate_json(f.read())
        for task_label, task_summary in agg_sum.tasks.items():
            self.assertEqual(task_summary.n_successful, 0)
            self.assertEqual(task_summary.n_blocked, 0)
            self.assertEqual(task_summary.n_unknown, 2)
            self.assertEqual(task_summary.n_expected, 2)
            self.assertListEqual(task_summary.failed_quanta, [])
            self.assertListEqual(task_summary.recovered_quanta, [])
            self.assertListEqual(task_summary.wonky_quanta, [])
            self.assertEqual(task_summary.n_wonky, 0)
            self.assertEqual(task_summary.n_failed, 0)
        for dataset_type_name, dataset_type_summary in agg_sum.datasets.items():
            self.assertListEqual(
                dataset_type_summary.unsuccessful_datasets,
                [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
            )
            self.assertEqual(dataset_type_summary.n_visible, 0)
            self.assertEqual(dataset_type_summary.n_shadowed, 0)
            self.assertEqual(dataset_type_summary.n_predicted_only, 0)
            self.assertEqual(dataset_type_summary.n_expected, 2)
            self.assertEqual(dataset_type_summary.n_cursed, 0)
            self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
            self.assertListEqual(dataset_type_summary.cursed_datasets, [])
            self.assertIn(dataset_type_name, expected_mock_datasets)
            match dataset_type_name:
                case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task0")
                case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task1")
                case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task2")
                case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task3")
                case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                    self.assertEqual(dataset_type_summary.producer, "task4")


if __name__ == "__main__":
    unittest.main()