Coverage for tests/test_run.py: 11% of 315 statements (coverage.py v7.13.5, created at 2026-04-24 08:19 +0000)

# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
import os
import time
import unittest
import unittest.mock

import click.testing

import lsst.utils.tests
from lsst.ctrl.mpexec import PipelineGraphFactory
from lsst.ctrl.mpexec.cli import opt, script
from lsst.ctrl.mpexec.cli.cmd.commands import PipetaskCommand, coverage_context
from lsst.ctrl.mpexec.cli.utils import collect_pipeline_actions
from lsst.ctrl.mpexec.showInfo import ShowInfo
from lsst.daf.butler import CollectionType, MissingCollectionError
from lsst.daf.butler.cli.utils import LogCliRunner
from lsst.pipe.base.mp_graph_executor import MPGraphExecutorError
from lsst.pipe.base.script import transfer_from_graph
from lsst.pipe.base.tests.mocks import DirectButlerRepo, DynamicTestPipelineTaskConfig


class RunTestCase(unittest.TestCase):
    """Test pipetask run command-line."""

    @staticmethod
    def _make_run_args(*args: str, **kwargs: object) -> dict[str, object]:
        """Invoke a fake ``run`` command with the given command-line
        arguments and return the parsed keyword arguments, with any
        explicit keyword overrides applied.
        """
        mock = unittest.mock.Mock()

        @click.command(cls=PipetaskCommand)
        @opt.run_options()
        @opt.config_search_path_option()
        @opt.no_existing_outputs_option()
        def fake_run(ctx: click.Context, **kwargs: object):
            kwargs = collect_pipeline_actions(ctx, **kwargs)
            mock(**kwargs)

        # At least one test requires that we enable INFO logging, so use
        # the specialist runner.
        runner = LogCliRunner()
        result = runner.invoke(fake_run, args, catch_exceptions=False)
        if result.exit_code != 0:
            raise RuntimeError(f"Failure getting default args for 'run': {result}")
        mock.assert_called_once()
        result: dict[str, object] = mock.call_args[1]
        result["show"] = ShowInfo([])
        result.update(kwargs)
        return result

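    # A minimal sketch (values vary per test) of how the helper above is
    # used by the tests in this class:
    #
    #     kwargs = self._make_run_args(
    #         "-b", root, "-i", helper.input_chain, "-o", "output",
    #         pipeline_graph_factory=PipelineGraphFactory(
    #             pipeline_graph=helper.pipeline_graph
    #         ),
    #     )
    #     qg = script.qgraph(**kwargs)
    #     kwargs["output_run"] = qg.header.output_run
    #     script.run(qg, **kwargs)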

    def test_missing_options(self):
        """Test that the run script fails when required options are
        missing.
        """

        @click.command()
        @opt.pipeline_build_options()
        def cli(**kwargs):
            script.run(**kwargs)

        runner = click.testing.CliRunner()
        result = runner.invoke(cli)
        # The cli call should fail, because script.run takes more options
        # than are defined by pipeline_build_options.
        self.assertNotEqual(result.exit_code, 0)

    def test_simple_qg(self):
        """Test execution of a trivial quantum graph."""
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg = script.qgraph(**kwargs)
            self.assertEqual(len(qg.quanta_by_task), 2)
            self.assertEqual(len(qg), 2)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = qg.header.output_run
            # Execute the graph and check for output existence.
            script.run(qg, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)
            # Test that we've disabled implicit threading.
            self.assertEqual(os.environ["OMP_NUM_THREADS"], "1")

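    # The option strings above mirror a real command line of roughly this
    # shape (a sketch; these tests bypass pipeline-file parsing by passing a
    # prebuilt PipelineGraphFactory directly instead of a -p/--pipeline
    # argument):
    #
    #     pipetask run -b <repo> -i <input-chain> -o output \
    #         --register-dataset-types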

    def test_simple_qg_rebase(self):
        """Test execution of a trivial quantum graph, with --rebase used to
        force redefinition of the output collection.
        """
        with DirectButlerRepo.make_temporary(input_chain="test1") as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            # Pass one input collection here for the usual test setup; we'll
            # override it later.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            # We'll actually pass two input collections in. One is empty, but
            # the stuff we're testing here doesn't care.
            kwargs["input"] = ["test2", "test1"]
            helper.butler.collections.register("test2", CollectionType.RUN)
            # Set up the output collection with a sequence that doesn't end the
            # same way as the input collection. This is normally an error.
            helper.butler.collections.register("output", CollectionType.CHAINED)
            helper.butler.collections.register("unexpected_input", CollectionType.RUN)
            helper.butler.collections.register("output/run0", CollectionType.RUN)
            helper.butler.collections.redefine_chain(
                "output", ["test2", "unexpected_input", "test1", "output/run0"]
            )
            # Without --rebase, the inconsistent input and output collections
            # are an error.
            with self.assertRaises(ValueError):
                script.qgraph(**kwargs)
            # With --rebase, the output collection gets redefined.
            kwargs["rebase"] = True
            qg = script.qgraph(**kwargs)
            self.assertEqual(len(qg.quanta_by_task), 2)
            self.assertEqual(len(qg), 2)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution.
            kwargs["output_run"] = qg.header.output_run
            # Execute the graph and check for output existence.
            script.run(qg, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

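    # "QBB" in the next test stands for quantum-backed butler: quanta execute
    # against datastore records saved in the graph file itself (hence
    # --qgraph-datastore-records), and their outputs only appear in the
    # central data repository once transfer_from_graph is run.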

    def test_simple_qgraph_qbb(self):
        """Test execution of a trivial quantum graph in QBB mode."""
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            # It's unusual to put a QG in a butler root, but since we've
            # already got a temp dir, we might as well use it.
            qg_file_1 = os.path.join(root, "test1.qg")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--qgraph-datastore-records",
                "--save-qgraph",
                qg_file_1,
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg = script.qgraph(**kwargs)
            output_run = qg.header.output_run
            output = qg.header.output
            self.assertEqual(len(qg.quanta_by_task), 2)
            self.assertEqual(len(qg), 2)
            # Execute with QBB.
            kwargs.update(output_run=output_run, qgraph=qg_file_1)
            script.pre_exec_init_qbb(**kwargs)
            script.run_qbb(**kwargs)
            # Transfer the datasets to the butler.
            n1 = transfer_from_graph(
                qg_file_1,
                root,
                register_dataset_types=True,
                transfer_dimensions=False,
                update_output_chain=True,
                dry_run=False,
                dataset_type=(),
            )
            self.assertEqual(n1, 9)
            # Check that the expected outputs exist.
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=output).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=output).count(), 1)
            # Check that some metadata keys were written.
            some_task_label = next(iter(qg.pipeline_graph.tasks))
            (some_metadata_ref,) = helper.butler.query_datasets(
                f"{some_task_label}_metadata",
                limit=1,
                collections=output,
            )
            some_metadata = helper.butler.get(some_metadata_ref)
            self.assertIn("qg_read_time", some_metadata["job"])
            self.assertIn("qg_size", some_metadata["job"])

            # Update the output run and try again.
            new_output_run = output_run + "_new"
            qg_file_2 = os.path.join(root, "test2.qg")
            script.update_graph_run(qg_file_1, new_output_run, qg_file_2)
            kwargs.update(qgraph=qg_file_2)
            # Execute with QBB again.
            script.pre_exec_init_qbb(**kwargs)
            script.run_qbb(**kwargs)
            # Transfer the datasets to the butler.
            n2 = transfer_from_graph(
                qg_file_2,
                root,
                register_dataset_types=True,
                transfer_dimensions=False,
                update_output_chain=False,
                dry_run=False,
                dataset_type=(),
            )
            self.assertEqual(n2, 9)
            # Check that the expected outputs exist in the new run.
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=new_output_run).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=new_output_run).count(), 1)

    def test_empty_qg(self):
        """Test that making an empty QG produces the right error messages."""
        with DirectButlerRepo.make_temporary("base.yaml") as (helper, root):
            helper.add_task(dimensions=["instrument"])
            helper.add_task(dimensions=["instrument"])
            helper.pipeline_graph.resolve(registry=helper.butler.registry)
            helper.butler.registry.registerDatasetType(
                helper.pipeline_graph.dataset_types["dataset_auto0"].dataset_type
            )
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            # Note that we haven't inserted any datasets into this repo; that's
            # how we'll force an empty graph.
            with self.assertLogs(level=logging.ERROR) as cm:
                qg = script.qgraph(**kwargs)
            self.assertRegex(
                cm.output[0], ".*Initial data ID query returned no rows, so QuantumGraph will be empty.*"
            )
            self.assertRegex(cm.output[0], ".*dataset_auto0.*input_run.*doomed to fail.")
            self.assertIsNone(qg)

    def test_simple_qg_no_skip_existing_inputs(self):
        """Test for the case when output data for one task already appears in
        the *input* collection, but no ``--extend-run`` or ``--skip-existing``
        option is present.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg1), 2)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)
            # Make a new QG with the same output collection but a new RUN
            # collection; it should run again, shadowing the previous outputs.
            kwargs["output_run"] = None
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertNotEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg2.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg2), 2)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)

    def test_simple_qg_skip_existing_inputs(self):
        """Test for the case when output data for one task already appears in
        the output collection, and ``--skip-existing-in`` is used to skip it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Make a new QG with the same output collection, but a new RUN
            # collection, with --skip-existing-in, and one more task. The
            # first task should be skipped and the second should be run.
            helper.add_task()
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--skip-existing-in",
                "output",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertNotEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 0)
            self.assertEqual(len(qg2.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg2), 1)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 0)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

    def test_simple_qg_extend_run(self):
        """Test for the case when output data for one task already appears in
        the output RUN collection, and ``--extend-run`` is used to skip it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Make a new QG with the same output collection and the same RUN
            # collection, with --extend-run, and one more task. The first
            # task should be skipped and the second should be run.
            helper.add_task()
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--extend-run",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 0)
            self.assertEqual(len(qg2.quanta_by_task["task_auto2"]), 1)
            self.assertEqual(len(qg2), 1)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

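    # Contrast the two scenarios above: --skip-existing-in writes to a *new*
    # RUN collection and skips quanta whose outputs are found in the named
    # collections (run1 != run2), while --extend-run reuses the *same* RUN
    # collection (run1 == run2).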

    def test_simple_qg_clobber(self):
        """Test for the case when output data for one task already appears in
        the output RUN collection, and ``--extend-run --clobber-outputs`` is
        used to overwrite it.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Delete the metadata output so we don't take the skip-existing
            # logic path instead of the clobbering one.
            helper.butler.pruneDatasets(
                helper.butler.query_datasets("task_auto1_metadata", collections=run1),
                purge=True,
                unstore=True,
                disassociate=True,
            )
            # Make a new QG with the same output collection and the same RUN
            # collection, with --clobber-outputs, and one more task. Both
            # tasks should be run.
            helper.add_task()
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                "--extend-run",
                "--clobber-outputs",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task), 2)
            self.assertEqual(len(qg2), 2)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto2", collections=["output"]).count(), 1)

    def test_simple_qg_replace_run(self):
        """Test repeated execution of a trivial quantum graph with
        --replace-run.
        """
        with DirectButlerRepo.make_temporary() as (helper, root):
            helper.add_task()
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg1 = script.qgraph(**kwargs)
            run1 = qg1.header.output_run
            self.assertEqual(len(qg1.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg1), 1)
            # Ensure that the output run used in the graph is also used in the
            # pipeline execution. It is possible for 'qgraph' and 'run' to
            # calculate time-stamped runs across a second boundary.
            kwargs["output_run"] = run1
            # Execute the graph and check for output existence.
            script.run(qg1, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Delete the metadata output so we don't take the skip-existing
            # logic path instead of the clobbering one.
            helper.butler.pruneDatasets(
                helper.butler.query_datasets("task_auto1_metadata", collections=run1),
                purge=True,
                unstore=True,
                disassociate=True,
            )
            # Make a new QG with the same output collection but a new RUN
            # collection, using --replace-run. The old RUN should be detached
            # from the output chain, but its datasets should survive.
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--replace-run",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg2 = script.qgraph(**kwargs)
            run2 = qg2.header.output_run
            self.assertNotEqual(run1, run2)
            self.assertEqual(len(qg2.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg2), 1)
            kwargs["output_run"] = run2
            script.run(qg2, **kwargs)
            self.assertNotIn(run1, helper.butler.collections.get_info("output").children)
            self.assertIn(run2, helper.butler.collections.get_info("output").children)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run2]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Repeat once again with --prune-replaced as well.
            time.sleep(1)  # Make sure we don't get the same RUN timestamp.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--replace-run",
                "--prune-replaced",
                "purge",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg3 = script.qgraph(**kwargs)
            run3 = qg3.header.output_run
            self.assertNotEqual(run2, run3)
            self.assertEqual(len(qg3.quanta_by_task["task_auto1"]), 1)
            self.assertEqual(len(qg3), 1)
            kwargs["output_run"] = run3
            script.run(qg3, **kwargs)
            self.assertNotIn(run2, helper.butler.collections.get_info("output").children)
            with self.assertRaises(MissingCollectionError):
                helper.butler.collections.get_info(run2)
            self.assertIn(run3, helper.butler.collections.get_info("output").children)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=[run3]).count(), 1)
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 1)
            # Trying to run again with inputs that aren't exactly what we
            # started with is an error, and the kind that should not modify the
            # data repo.
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                run1,
                "-o",
                "output",
                "--replace-run",
                "--prune-replaced",
                "purge",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            with self.assertRaises(ValueError):
                script.qgraph(**kwargs)

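    # The assertions above pin down the --replace-run semantics: on its own
    # it only detaches the superseded RUN from the output CHAINED collection,
    # leaving the old RUN collection itself in place, while adding
    # --prune-replaced=purge removes the replaced RUN and its contents
    # entirely (hence the MissingCollectionError).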

    def test_qg_partial_failure(self):
        """Test execution of a quantum graph where one quantum fails but the
        others should continue.
        """
        with DirectButlerRepo.make_temporary("base.yaml") as (helper, root):
            helper.add_task(
                dimensions=["detector"], config=DynamicTestPipelineTaskConfig(fail_condition="detector=3")
            )
            helper.insert_datasets("dataset_auto0")
            kwargs = self._make_run_args(
                "-b",
                root,
                "-i",
                helper.input_chain,
                "-o",
                "output",
                "--register-dataset-types",
                pipeline_graph_factory=PipelineGraphFactory(pipeline_graph=helper.pipeline_graph),
            )
            qg = script.qgraph(**kwargs)
            self.assertEqual(len(qg.quanta_by_task), 1)
            self.assertEqual(len(qg), 4)
            kwargs["output_run"] = qg.header.output_run
            # Execute the graph; the detector=3 quantum fails, so the executor
            # raises, but the other three quanta should still produce outputs.
            with self.assertRaises(MPGraphExecutorError):
                script.run(qg, **kwargs)
            with helper.butler.query() as query:
                self.assertEqual(query.datasets("dataset_auto1", collections=["output"]).count(), 3)


class CoverageTestCase(unittest.TestCase):
    """Test the coverage context manager."""

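    # The two tests below patch at different levels: the first injects a fake
    # ``coverage`` module so that coverage_context can import it, while the
    # second forces the import machinery itself to raise ModuleNotFoundError,
    # simulating an environment without coverage.py installed.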

    @unittest.mock.patch.dict("sys.modules", coverage=unittest.mock.MagicMock())
    def testWithCoverage(self):
        """Test that the coverage context manager runs when invoked."""
        with coverage_context({"coverage": True}):
            self.assertTrue(True)

    @unittest.mock.patch("lsst.ctrl.mpexec.cli.cmd.commands.import_module", side_effect=ModuleNotFoundError())
    def testWithMissingCoverage(self, mock_import):  # numpydoc ignore=PR01
        """Test that the coverage context manager complains when coverage is
        not available.
        """
        with self.assertRaises(click.exceptions.ClickException):
            with coverage_context({"coverage": True}):
                pass


if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()