Coverage for tests/test_simple_pipeline_executor.py: 9% of 297 statements (coverage.py v7.13.5, created at 2026-04-14 23:47 +0000)

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

import os
import shutil
import tempfile
import unittest

import lsst.pipe.base.quantum_provenance_graph as qpg
import lsst.utils.tests
from lsst.daf.butler import Butler, DatasetType
from lsst.pipe.base import PipelineGraph, QuantumSuccessCaveats, RepeatableQuantumError
from lsst.pipe.base.simple_pipeline_executor import SimplePipelineExecutor
from lsst.pipe.base.tests.mocks import (
    DirectButlerRepo,
    DynamicConnectionConfig,
    DynamicTestPipelineTask,
    DynamicTestPipelineTaskConfig,
    MockStorageClass,
    get_mock_name,
)

TESTDIR = os.path.abspath(os.path.dirname(__file__))


class SimplePipelineExecutorTests(lsst.utils.tests.TestCase):
    """Test the SimplePipelineExecutor API.

    Because SimplePipelineExecutor is the easiest way to run simple pipelines
    in tests, this has also become a home for tests of execution edge cases
    that don't have a clear home in other test files.
    """

    def setUp(self):
        self.path = tempfile.mkdtemp()
        # standalone parameter forces the returned config to also include
        # the information from the search paths.
        config = Butler.makeRepo(self.path, standalone=True, searchPaths=[os.path.join(TESTDIR, "config")])
        self.butler = SimplePipelineExecutor.prep_butler(config, [], "fake")
        self.enterContext(self.butler)
        self.butler.registry.registerDatasetType(
            DatasetType(
                "input",
                dimensions=self.butler.dimensions.empty,
                storageClass="StructuredDataDict",
            )
        )
        self.butler.put({"zero": 0}, "input")
        MockStorageClass.get_or_register_mock("StructuredDataDict")
        MockStorageClass.get_or_register_mock("TaskMetadataLike")

    def tearDown(self):
        shutil.rmtree(self.path, ignore_errors=True)

    def test_from_task_class(self):
        """Test executing a single quantum with an executor created by the
        `from_task_class` factory method, and the
        `SimplePipelineExecutor.as_generator` method.
        """
        config = DynamicTestPipelineTaskConfig()
        config.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        executor = SimplePipelineExecutor.from_task_class(
            DynamicTestPipelineTask,
            config=config,
            butler=self.butler,
            label="a",
        )

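        # Consuming the generator is what actually executes the quantum:
        # as_generator yields each quantum as it finishes rather than running
        # the whole graph before returning.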
        (quantum,) = executor.as_generator(register_dataset_types=True)
        self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))

    def test_metadata_input(self):
        """Test two tasks where the output uses metadata from input."""
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        config_b.inputs["in_metadata"] = DynamicConnectionConfig(
            dataset_type_name="a_metadata", storage_class="TaskMetadata"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)

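        # The mock output dataset records provenance for its inputs, so we can
        # check that task b saw task a's metadata with its original type.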
        output = self.butler.get("output")
        self.assertEqual(output.quantum.inputs["in_metadata"][0].original_type, "lsst.pipe.base.TaskMetadata")

    def test_optional_intermediate(self):
        """Test a pipeline task with an optional regular input that is produced
        by another task.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.NoWorkFound"
        config_a.fail_condition = "1=1"  # butler query expression that is always true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        # Both quanta ran successfully (NoWorkFound is a success).
        self.assertTrue(self.butler.exists("a_metadata"))
        self.assertTrue(self.butler.exists("b_metadata"))
        # The intermediate dataset was not written, but the final output was.
        self.assertFalse(self.butler.exists("intermediate"))
        self.assertTrue(self.butler.exists("output"))

    def test_optional_input(self):
        """Test a pipeline task with an optional regular input that is an
        overall input to the pipeline.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i1"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.inputs["i2"] = DynamicConnectionConfig(
            dataset_type_name="input_2",
            storage_class="StructuredDataDict",
            minimum=0,  # optional: this dataset will never exist
        )
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 1)
        # The single quantum ran successfully.
        self.assertTrue(self.butler.exists("a_metadata"))
        # The final output was written.
        self.assertTrue(self.butler.exists("output"))

    def test_from_pipeline_file(self) -> None:
        """Test executing two quanta from different configurations of the
        same task, with an executor created by the `from_pipeline_filename`
        factory method, and the `SimplePipelineExecutor.run` method.
        """
        filename = os.path.join(TESTDIR, "pipelines", "pipeline_simple.yaml")
        executor = SimplePipelineExecutor.from_pipeline_filename(filename, butler=self.butler)
        self._test_pipeline_file(executor)

    def test_use_local_butler(self) -> None:
        """Test generating a local butler repository from a pipeline, then
        running that pipeline using the local butler.
        """
        # Test a trivial pipeline that has only dataset types with empty
        # dimensions.
        filename = os.path.join(TESTDIR, "pipelines", "pipeline_simple.yaml")
        executor = SimplePipelineExecutor.from_pipeline_filename(
            filename, butler=self.butler, output="u/someone/pipeline"
        )
        with tempfile.TemporaryDirectory() as tempdir:
            root = os.path.join(tempdir, "butler_root")

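            # use_local_butler is expected to set up a standalone repository
            # at `root` containing the graph's inputs and to repoint the
            # executor at it, so the execution below goes through the local
            # repo.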
            local_butler = executor.use_local_butler(root)
            self.enterContext(local_butler)
            self._test_pipeline_file(executor)

        # Test a more complicated pipeline involving dataset types with
        # non-empty dimensions. This will require dimension records to be
        # copied into the destination repository.
        with (
            tempfile.TemporaryDirectory() as tempdir,
            DirectButlerRepo.make_temporary("base.yaml", "spatial.yaml") as (helper, root),
        ):
            helper.add_task(dimensions=["visit", "detector"])
            qg = helper.make_quantum_graph(output="out_chain")
            executor = SimplePipelineExecutor(qg, helper.butler)
            output_butler_root = os.path.join(tempdir, "root")
            local_butler = executor.use_local_butler(output_butler_root)
            self.enterContext(local_butler)
            executor.run(register_dataset_types=True)
            output_butler = Butler.from_config(output_butler_root)
            self.enterContext(output_butler)
            ref = output_butler.find_dataset(
                "dataset_auto1",
                collections="out_chain",
                dimension_records=True,
                instrument="Cam1",
                detector=1,
                visit=1,
            )
            self.assertIsNotNone(ref)
            assert ref is not None  # For linters.
            self.assertIsNotNone(output_butler.get(ref))
            # Check that dimension records are present in the output Butler.
            self.assertEqual(ref.dataId.records["visit"].science_program, "test_survey")
            self.assertTrue(ref.dataId.records["visit_detector_region"].region.contains(0.004, 0.020))

    def _test_pipeline_file(self, executor: SimplePipelineExecutor) -> None:
        quanta = executor.run(register_dataset_types=True, save_versions=False)
        self.assertEqual(len(quanta), 2)
        self.assertEqual(
            executor.butler.get("intermediate").storage_class, get_mock_name("StructuredDataDict")
        )
        self.assertEqual(executor.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))

    def test_partial_outputs_success(self):
        """Test executing two quanta where the first raises
        `lsst.pipe.base.AnnotatedPartialOutputsError` and its output is an
        optional input to the second, while configuring the executor to
        consider this a success.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.AnnotatedPartialOutputsError"
        config_a.fail_condition = "1=1"  # butler query expression that is always true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        # Consider the partial-outputs error a success and proceed.
        executor = SimplePipelineExecutor.from_pipeline_graph(
            pipeline_graph, butler=self.butler, raise_on_partial_outputs=False
        )
        (_, _) = executor.as_generator(register_dataset_types=True)
        self.assertFalse(self.butler.exists("intermediate"))
        self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))

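        # Build a provenance graph from the butler and the executed quantum
        # graph so we can inspect per-quantum status, caveats, and exceptions.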
        prov = qpg.QuantumProvenanceGraph(self.butler, [executor.quantum_graph], read_caveats="exhaustive")
        (quantum_key_a,) = prov.quanta["a"]
        quantum_info_a = prov.get_quantum_info(quantum_key_a)
        _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)

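        # Task a still counts as successful, but with caveats recorded: all of
        # its predicted outputs are missing and the failure went through the
        # partial-outputs error path.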
        self.assertEqual(
            quantum_run_a.caveats,
            QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
            | QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR,
        )
        self.assertEqual(
            quantum_run_a.exception.type_name,
            "lsst.pipe.base.tests.mocks.MockAlgorithmError",
        )
        self.assertEqual(
            quantum_run_a.exception.metadata,
            {"badness": 12},
        )
        (quantum_key_b,) = prov.quanta["b"]
        quantum_info_b = prov.get_quantum_info(quantum_key_b)
        _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b)
        self.assertEqual(quantum_run_b.caveats, QuantumSuccessCaveats.NO_CAVEATS)
        prov_summary = prov.to_summary(self.butler)
        # One partial-outputs case, with an empty data ID:
        self.assertEqual(prov_summary.tasks["a"].caveats, {"*P": [{}]})
        self.assertEqual(
            prov_summary.tasks["a"].exceptions.keys(), {"lsst.pipe.base.tests.mocks.MockAlgorithmError"}
        )
        self.assertEqual(
            prov_summary.tasks["a"]
            .exceptions["lsst.pipe.base.tests.mocks.MockAlgorithmError"][0]
            .exception.metadata,
            {"badness": 12},
        )
        # No caveats for the second task, since it didn't need the first
        # task's output anyway.
        self.assertEqual(prov_summary.tasks["b"].caveats, {})
        self.assertEqual(prov_summary.tasks["b"].exceptions, {})
        # Check table forms for summaries of the same information.
        quantum_table = prov_summary.make_quantum_table()
        self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
        self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
        self.assertEqual(list(quantum_table["Successful"]), [1, 1])
        self.assertEqual(list(quantum_table["Caveats"]), ["*P(1)", ""])
        self.assertEqual(list(quantum_table["Blocked"]), [0, 0])
        self.assertEqual(list(quantum_table["Failed"]), [0, 0])
        self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
        self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
        self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
        dataset_table = prov_summary.make_dataset_table()
        self.assertEqual(
            list(dataset_table["Dataset"]),
            ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
        )
        self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
        exception_table = prov_summary.make_exception_table()
        self.assertEqual(list(exception_table["Task"]), ["a"])
        self.assertEqual(
            list(exception_table["Exception"]), ["lsst.pipe.base.tests.mocks.MockAlgorithmError"]
        )
        self.assertEqual(list(exception_table["Count"]), [1])
        bad_quantum_tables = prov_summary.make_bad_quantum_tables()
        self.assertEqual(bad_quantum_tables.keys(), {"a"})
        self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["SUCCESSFUL(P)"])
        self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), ["MockAlgorithmError"])
        self.assertFalse(prov_summary.make_bad_dataset_tables())

    def test_no_work_found(self):
        """Test executing two quanta where the first raises
        `NoWorkFound` in `runQuantum`, leading the next to raise `NoWorkFound`
        in `adjustQuantum`.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.NoWorkFound"
        config_a.fail_condition = "1=1"  # butler query expression that is always true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        # As in test_partial_outputs_success, don't raise on partial outputs.
        executor = SimplePipelineExecutor.from_pipeline_graph(
            pipeline_graph, butler=self.butler, raise_on_partial_outputs=False
        )
        (_, _) = executor.as_generator(register_dataset_types=True)

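        # Same provenance checks as in test_partial_outputs_success, but using
        # the two-step construction: make an empty graph, then assemble it.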
        prov = qpg.QuantumProvenanceGraph()
        prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
        (quantum_key_a,) = prov.quanta["a"]
        quantum_info_a = prov.get_quantum_info(quantum_key_a)
        _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
        self.assertEqual(
            quantum_run_a.caveats,
            QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
            | QuantumSuccessCaveats.NO_WORK,
        )
        (quantum_key_b,) = prov.quanta["b"]
        quantum_info_b = prov.get_quantum_info(quantum_key_b)
        _, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b)

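        # Task b is also a caveated success: with its only input missing,
        # adjustQuantum raised NoWorkFound before runQuantum was ever called.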
        self.assertEqual(
            quantum_run_b.caveats,
            QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
            | QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
            | QuantumSuccessCaveats.NO_WORK,
        )
        prov_summary = prov.to_summary(self.butler)
        # One NoWorkFound, raised by runQuantum, with an empty data ID:
        self.assertEqual(prov_summary.tasks["a"].caveats, {"*N": [{}]})
        self.assertEqual(prov_summary.tasks["a"].exceptions, {})
        # One NoWorkFound, raised by adjustQuantum, with an empty data ID.
        self.assertEqual(prov_summary.tasks["b"].caveats, {"*A": [{}]})
        self.assertEqual(prov_summary.tasks["b"].exceptions, {})
        # Check table forms for summaries of the same information.
        quantum_table = prov_summary.make_quantum_table()
        self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
        self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
        self.assertEqual(list(quantum_table["Successful"]), [1, 1])
        self.assertEqual(list(quantum_table["Caveats"]), ["*N(1)", "*A(1)"])
        self.assertEqual(list(quantum_table["Blocked"]), [0, 0])
        self.assertEqual(list(quantum_table["Failed"]), [0, 0])
        self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
        self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
        self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
        dataset_table = prov_summary.make_dataset_table()
        self.assertEqual(
            list(dataset_table["Dataset"]),
            ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
        )
        self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 0, 1, 1])
        self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 1, 0, 0])
        self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
        self.assertFalse(prov_summary.make_exception_table())
        self.assertFalse(prov_summary.make_bad_quantum_tables())
        self.assertFalse(prov_summary.make_bad_dataset_tables())

    def test_partial_outputs_failure(self):
        """Test executing two quanta where the first raises
        `lsst.pipe.base.AnnotatedPartialOutputsError` and its output is an
        optional input to the second, while configuring the executor to
        consider this a failure.
        """
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.fail_exception = "lsst.pipe.base.AnnotatedPartialOutputsError"
        config_a.fail_condition = "1=1"  # butler query expression that is always true
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )
        pipeline_graph = PipelineGraph()
        pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
        pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
        executor = SimplePipelineExecutor.from_pipeline_graph(
            pipeline_graph,
            butler=self.butler,
            raise_on_partial_outputs=True,
        )
        # The executor should raise the chained exception
        # (RepeatableQuantumError, since that's what the mocking system in
        # pipe_base uses here), not AnnotatedPartialOutputsError.
        with self.assertRaises(RepeatableQuantumError):
            executor.run(register_dataset_types=True)
        self.assertFalse(self.butler.exists("intermediate"))
        self.assertFalse(self.butler.exists("output"))
        prov = qpg.QuantumProvenanceGraph()
        prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
        (quantum_key_a,) = prov.quanta["a"]
        quantum_info_a = prov.get_quantum_info(quantum_key_a)
        _, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
        self.assertEqual(quantum_run_a.status, qpg.QuantumRunStatus.FAILED)
        self.assertIsNone(quantum_run_a.caveats)
        self.assertIsNone(quantum_run_a.exception)
        (quantum_key_b,) = prov.quanta["b"]
        quantum_info_b = prov.get_quantum_info(quantum_key_b)
        self.assertEqual(quantum_info_b["status"], qpg.QuantumInfoStatus.BLOCKED)
        prov_summary = prov.to_summary(self.butler)
        # One partial-outputs failure case for the first task.
        self.assertEqual(prov_summary.tasks["a"].n_failed, 1)
        # No direct failures for the second task, but one blocked quantum.
        self.assertEqual(prov_summary.tasks["b"].n_failed, 0)
        self.assertEqual(prov_summary.tasks["b"].n_blocked, 1)
        # Check table forms for summaries of the same information.
        quantum_table = prov_summary.make_quantum_table()
        self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
        self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
        self.assertEqual(list(quantum_table["Successful"]), [0, 0])
        self.assertEqual(list(quantum_table["Caveats"]), ["", ""])
        self.assertEqual(list(quantum_table["Blocked"]), [0, 1])
        self.assertEqual(list(quantum_table["Failed"]), [1, 0])
        self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
        self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
        self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
        dataset_table = prov_summary.make_dataset_table()
        self.assertEqual(
            list(dataset_table["Dataset"]),
            ["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
        )
        # Note that a_log is UNSUCCESSFUL, not VISIBLE, despite being present
        # in the butler, because those categories are mutually exclusive and
        # we don't want to consider any outputs of failed quanta to be
        # successful.
        self.assertEqual(list(dataset_table["Visible"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Predicted Only"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["Unsuccessful"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
        self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
        self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
        self.assertFalse(prov_summary.make_exception_table())
        bad_quantum_tables = prov_summary.make_bad_quantum_tables()
        self.assertEqual(bad_quantum_tables.keys(), {"a"})
        self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["FAILED"])
        self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), [""])
        self.assertFalse(prov_summary.make_bad_dataset_tables())

    def test_existence_check_skips(self):
        """Test that pre-execution existence checks are not performed for
        overall-input datasets, as those checks could otherwise mask
        repository configuration problems or downtime as NoWorkFound cases.
        """
        # First we configure and execute task A, which is just a way to get a
        # MockDataset in the repo for us to play with; the important test can't
        # use the non-mock 'input' dataset because the mock runQuantum only
        # actually reads MockDatasets.
        config_a = DynamicTestPipelineTaskConfig()
        config_a.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
        )
        config_a.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict"
        )
        executor_a = SimplePipelineExecutor.from_task_class(
            DynamicTestPipelineTask,
            config=config_a,
            butler=self.butler,
            label="a",
        )
        executor_a.run(register_dataset_types=True)
        # Now we can do the real test.
        config_b = DynamicTestPipelineTaskConfig()
        config_b.inputs["i"] = DynamicConnectionConfig(
            dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
        )
        config_b.outputs["o"] = DynamicConnectionConfig(
            dataset_type_name="output", storage_class="StructuredDataDict"
        )

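        # Clone the butler with a new output run so that task b's quantum
        # graph treats the existing 'intermediate' dataset as an overall
        # input rather than something produced within the graph.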
        butler = self.butler.clone(run="new_run")
        executor_b = SimplePipelineExecutor.from_task_class(
            DynamicTestPipelineTask,
            config=config_b,
            butler=butler,
            label="b",
            attach_datastore_records=True,
        )
        # Delete the input dataset after the QG has already been built.
        intermediate_refs = butler.query_datasets("intermediate")
        self.assertEqual(len(intermediate_refs), 1)
        butler.pruneDatasets(intermediate_refs, purge=True, unstore=True)
        with self.assertRaises(FileNotFoundError):
            # We should get an exception rather than NoWorkFound, because for
            # this single-task pipeline, the missing dataset is an
            # overall-input (name notwithstanding).
            executor_b.run(register_dataset_types=True)


class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Generic tests for file leaks."""


def setup_module(module):
    """Set up the module for pytest.

    Parameters
    ----------
    module : `~types.ModuleType`
        Module to set up.
    """
    lsst.utils.tests.init()


if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()