# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the ctrl_mpexec CLI report and aggregate-reports subcommands."""
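
# The CLI invocations exercised below look roughly like:
#   pipetask report <butler-repo> <graph-uri> --no-logs [--brief] [--force-v2]
#       [--full-output-filename FILE]
#   pipetask aggregate-reports FILE1 FILE2 --full-output-filename FILE
# (shapes taken from the runner.invoke argument lists in this file).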

import os
import unittest

import yaml
from yaml.loader import SafeLoader

from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli
from lsst.ctrl.mpexec.cli.script.report import print_summary
from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir
from lsst.pipe.base.quantum_provenance_graph import Summary
from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph
from lsst.pipe.base.tests.util import check_output_run

TESTDIR = os.path.abspath(os.path.dirname(__file__))

expected_mock_datasets = [
    "add_dataset1",
    "add2_dataset1",
    "task0_metadata",
    "task0_log",
    "add_dataset2",
    "add2_dataset2",
    "task1_metadata",
    "task1_log",
    "add_dataset3",
    "add2_dataset3",
    "task2_metadata",
    "task2_log",
    "add_dataset4",
    "add2_dataset4",
    "task3_metadata",
    "task3_log",
    "add_dataset5",
    "add2_dataset5",
    "task4_metadata",
    "task4_log",
]
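# The names above follow the makeSimpleQGraph mock-pipeline convention: five
# chained tasks (task0..task4), where each task is expected to produce two
# "add" datasets plus its own *_metadata and *_log datasets.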


class ReportTest(unittest.TestCase):
    """Test executing "pipetask report" command."""

    def setUp(self) -> None:
        self.runner = LogCliRunner()
        self.root = makeTestTempDir(TESTDIR)

    def tearDown(self) -> None:
        removeTestTempDir(self.root)

    def test_report(self):
        """Test for making a report on the produced, missing and expected
        datasets in a quantum graph.
        """
        metadata = {"output_run": "run"}
        butler, qgraph = makeSimpleQGraph(
            run="run",
            root=self.root,
            metadata=metadata,
        )
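        # makeSimpleQGraph (from lsst.pipe.base.tests.simpleQGraph) builds a
        # small test butler repo and a quantum graph for a chain of mock
        # tasks; nothing in this test actually executes that graph.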

        butler.close()
        # Check that we can get the proper run collection from the qgraph
        self.assertEqual(check_output_run(qgraph, "run"), [])
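        # check_output_run should return the output dataset refs whose run
        # does not match, so an empty list means every predicted output is
        # in "run" (an assumption based on how it is asserted here).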

        graph_uri = os.path.join(self.root, "graph.qgraph")
        qgraph.saveUri(graph_uri)

        test_filename = os.path.join(self.root, "report_test.yaml")

        result = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--full-output-filename", test_filename, "--no-logs"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))

        # Check that we can open and read the file produced by make_reports
        with open(test_filename) as f:
            report_output_dict = yaml.load(f, Loader=SafeLoader)

        self.assertIsNotNone(report_output_dict["task0"])
        self.assertIsNotNone(report_output_dict["task0"]["failed_quanta"])
        self.assertIsInstance(report_output_dict["task0"]["n_expected"], int)
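        # Judging by the assertions above, the v1 YAML report is keyed by
        # task label, and each per-task entry carries at least a
        # ``failed_quanta`` mapping and an integer ``n_expected`` count.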

        result_hr = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result_hr.exit_code, 0, clickResultMsg(result_hr))

        # Check that we get string output
        self.assertIsInstance(result_hr.stdout, str)

        # Check that task0 and the failed quanta for task0 exist in the string
        self.assertIn("task0", result_hr.stdout)
        self.assertIn("Failed", result_hr.stdout)
        self.assertIn("Expected", result_hr.stdout)
        self.assertIn("Succeeded", result_hr.stdout)

        # Check brief option for pipetask report
        result_brief = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs", "--brief"],
            input="no",
        )
        self.assertIsInstance(result_brief.stdout, str)

        # Check that task0 and the failed quanta for task0 exist in the string
        self.assertIn("task0", result_brief.stdout)
        self.assertIn("Failed", result_brief.stdout)
        self.assertIn("Expected", result_brief.stdout)
        self.assertIn("Succeeded", result_brief.stdout)

        # Test cli for the QPG
        result_v2_terminal_out = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs", "--force-v2"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result_v2_terminal_out.exit_code, 0, clickResultMsg(result_v2_terminal_out))

        # Check that we get string output
        self.assertIsInstance(result_v2_terminal_out.stdout, str)

        # Check that task0 and the quanta for task0 exist in the string
        self.assertIn("task0", result_v2_terminal_out.stdout)
        self.assertIn("Unknown", result_v2_terminal_out.stdout)
        self.assertIn("Successful", result_v2_terminal_out.stdout)
        self.assertIn("Blocked", result_v2_terminal_out.stdout)
        self.assertIn("Failed", result_v2_terminal_out.stdout)
        self.assertIn("Wonky", result_v2_terminal_out.stdout)
        self.assertIn("TOTAL", result_v2_terminal_out.stdout)
        self.assertIn("EXPECTED", result_v2_terminal_out.stdout)
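        # Unknown/Successful/Blocked/Failed/Wonky are the quantum status
        # categories in the QuantumProvenanceGraph summary table; since the
        # graph was never executed, all quanta should be counted as Unknown.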

        # Test cli for the QPG brief option
        result_v2_brief = self.runner.invoke(
            pipetask_cli,
            ["report", self.root, graph_uri, "--no-logs", "--force-v2", "--brief"],
            input="no",
        )

        # Check that we can read from the command line
        self.assertEqual(result_v2_brief.exit_code, 0, clickResultMsg(result_v2_brief))

        # Check that we get string output
        self.assertIsInstance(result_v2_brief.stdout, str)

        # Check that task0 and the quanta for task0 exist in the string
        self.assertIn("task0", result_v2_brief.stdout)
        self.assertIn("Unknown", result_v2_brief.stdout)
        self.assertIn("Successful", result_v2_brief.stdout)
        self.assertIn("Blocked", result_v2_brief.stdout)
        self.assertIn("Failed", result_v2_brief.stdout)
        self.assertIn("Wonky", result_v2_brief.stdout)
        self.assertIn("TOTAL", result_v2_brief.stdout)
        self.assertIn("EXPECTED", result_v2_brief.stdout)

        # Check that the full output option works
        test_filename_v2 = os.path.join(self.root, "report_test.json")
        result_v2_full = self.runner.invoke(
            pipetask_cli,
            [
                "report",
                self.root,
                graph_uri,
                "--no-logs",
                "--full-output-filename",
                test_filename_v2,
                "--force-v2",
            ],
            input="no",
        )

        self.assertEqual(result_v2_full.exit_code, 0, clickResultMsg(result_v2_full))
        # Check the "brief" output that prints to the terminal first:
        # Check that we get string output
        self.assertIsInstance(result_v2_full.stdout, str)

        # Check that task0 and the quanta for task0 exist in the string
        self.assertIn("task0", result_v2_full.stdout)
        self.assertIn("Unknown", result_v2_full.stdout)
        self.assertIn("Successful", result_v2_full.stdout)
        self.assertIn("Blocked", result_v2_full.stdout)
        self.assertIn("Failed", result_v2_full.stdout)
        self.assertIn("Wonky", result_v2_full.stdout)
        self.assertIn("TOTAL", result_v2_full.stdout)
        self.assertIn("EXPECTED", result_v2_full.stdout)

        # Then validate the full output json file:
        with open(test_filename_v2) as f:
            output = f.read()
            model = Summary.model_validate_json(output)
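            # Summary is a pydantic model, so model_validate_json parses and
            # validates the JSON in one step. Because the graph was built but
            # never executed, each task should report a single "unknown"
            # quantum and every predicted output should be unsuccessful.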

            # Below is the same set of tests as in `pipe_base`:
            for task_summary in model.tasks.values():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 1)
                self.assertEqual(task_summary.n_expected, 1)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in model.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets,
                    [{"instrument": "INSTR", "detector": 0}],
                )
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 1)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 1)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
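                # The match below pins each dataset type to its producing
                # task, following the simple-graph naming convention: taskN
                # writes add_dataset(N+1), add2_dataset(N+1), and its own
                # metadata/log datasets.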

                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")

    def test_aggregate_reports(self):
        """Test the `pipetask aggregate-reports` command. We make one
        simple quantum graph and then fake a copy of its report in a
        couple of different ways, making sure we can aggregate the
        similar reports.
        """
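        # Strategy: generate one real report, duplicate it by round-tripping
        # the Summary model through print_summary, then aggregate the two
        # files and check that every count doubles.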

        metadata = {"output_run": "run1"}
        butler, qgraph1 = makeSimpleQGraph(
            run="run",
            root=self.root,
            metadata=metadata,
        )
        butler.close()

        # Check that we can get the proper run collection from the qgraph
        self.assertEqual(check_output_run(qgraph1, "run"), [])

        # Save the graph
        graph_uri_1 = os.path.join(self.root, "graph1.qgraph")
        qgraph1.saveUri(graph_uri_1)

        file1 = os.path.join(self.root, "report_test_1.json")
        file2 = os.path.join(self.root, "report_test_2.json")
        aggregate_file = os.path.join(self.root, "aggregate_report.json")

        report1 = self.runner.invoke(
            pipetask_cli,
            [
                "report",
                self.root,
                graph_uri_1,
                "--no-logs",
                "--full-output-filename",
                file1,
                "--force-v2",
            ],
            input="no",
        )

        self.assertEqual(report1.exit_code, 0, clickResultMsg(report1))
        # Now, copy the json output into a duplicate file and aggregate
        with open(file1) as f:
            sum1 = Summary.model_validate_json(f.read())
            sum2 = sum1.model_copy(deep=True)
            print_summary(sum2, file2, brief=False)
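            # print_summary is expected to write the full Summary out as
            # JSON, making file2 a second, identical input for the
            # aggregation step.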

        # Then use these file outputs as the inputs to aggregate reports:
        aggregate_report = self.runner.invoke(
            pipetask_cli,
            [
                "aggregate-reports",
                file1,
                file2,
                "--full-output-filename",
                aggregate_file,
            ],
        )
        # Check that aggregate command had a zero exit code:
        self.assertEqual(aggregate_report.exit_code, 0, clickResultMsg(aggregate_report))

        # Check that it aggregates as expected:
        with open(aggregate_file) as f:
            agg_sum = Summary.model_validate_json(f.read())
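            # Aggregating two identical single-quantum summaries should just
            # double every count: n_unknown, n_expected, and n_unsuccessful
            # go from 1 to 2, and the one unsuccessful data ID appears twice.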

            for task_label, task_summary in agg_sum.tasks.items():
                self.assertEqual(task_summary.n_successful, 0)
                self.assertEqual(task_summary.n_blocked, 0)
                self.assertEqual(task_summary.n_unknown, 2)
                self.assertEqual(task_summary.n_expected, 2)
                self.assertListEqual(task_summary.failed_quanta, [])
                self.assertListEqual(task_summary.recovered_quanta, [])
                self.assertListEqual(task_summary.wonky_quanta, [])
                self.assertEqual(task_summary.n_wonky, 0)
                self.assertEqual(task_summary.n_failed, 0)
            for dataset_type_name, dataset_type_summary in agg_sum.datasets.items():
                self.assertListEqual(
                    dataset_type_summary.unsuccessful_datasets,
                    [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
                )
                self.assertEqual(dataset_type_summary.n_visible, 0)
                self.assertEqual(dataset_type_summary.n_shadowed, 0)
                self.assertEqual(dataset_type_summary.n_predicted_only, 0)
                self.assertEqual(dataset_type_summary.n_expected, 2)
                self.assertEqual(dataset_type_summary.n_cursed, 0)
                self.assertEqual(dataset_type_summary.n_unsuccessful, 2)
                self.assertListEqual(dataset_type_summary.cursed_datasets, [])
                self.assertIn(dataset_type_name, expected_mock_datasets)
                match dataset_type_name:
                    case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task0")
                    case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task1")
                    case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task2")
                    case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task3")
                    case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]:
                        self.assertEqual(dataset_type_summary.producer, "task4")


if __name__ == "__main__":
    unittest.main()