Coverage for tests / test_aggregator.py: 12%

508 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 08:57 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <https://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30import dataclasses 

31import itertools 

32import os 

33import tempfile 

34import time 

35import unittest.mock 

36import uuid 

37from collections.abc import Iterator 

38from contextlib import contextmanager 

39from typing import Any, cast 

40 

41import astropy.table 

42import click.testing 

43import numpy as np 

44import pydantic 

45from click.testing import CliRunner, Result 

46 

47import lsst.utils.tests 

48from lsst.daf.butler import Butler, ButlerLogRecords, QuantumBackedButler 

49from lsst.pex.config import Config 

50from lsst.pipe.base import ( 

51 AlgorithmError, 

52 QuantumAttemptStatus, 

53 QuantumSuccessCaveats, 

54 TaskMetadata, 

55) 

56from lsst.pipe.base import automatic_connection_constants as acc 

57from lsst.pipe.base.cli.cmd.commands import ( 

58 aggregate_graph as aggregate_graph_cli, 

59) 

60from lsst.pipe.base.cli.cmd.commands import ( 

61 provenance_report as provenance_report_cli, 

62) 

63from lsst.pipe.base.graph_walker import GraphWalker 

64from lsst.pipe.base.pipeline_graph import Edge 

65from lsst.pipe.base.quantum_graph import ( 

66 FORMAT_VERSION, 

67 PredictedDatasetInfo, 

68 PredictedQuantumGraph, 

69 PredictedQuantumInfo, 

70 ProvenanceDatasetInfo, 

71 ProvenanceQuantumGraph, 

72 ProvenanceQuantumInfo, 

73 ProvenanceQuantumReport, 

74 ProvenanceReport, 

75 ProvenanceTaskMetadataModel, 

76) 

77from lsst.pipe.base.quantum_graph.aggregator import AggregatorConfig, FatalWorkerError, aggregate_graph 

78from lsst.pipe.base.quantum_graph.ingest_graph import ingest_graph 

79from lsst.pipe.base.resource_usage import QuantumResourceUsage 

80from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor 

81from lsst.pipe.base.tests.mocks import ( 

82 DirectButlerRepo, 

83 DynamicConnectionConfig, 

84 DynamicTestPipelineTaskConfig, 

85) 

86from lsst.pipe.base.tests.util import patch_deterministic_uuid4 

87from lsst.utils.packages import Packages 

88 

89 

@dataclasses.dataclass
class PrepInfo:
    """Struct of objects used in an aggregator test."""

    # Full butler client, cloned with the output chain ("out_chain") as its
    # default collection search path.
    butler: Butler
    # Filesystem root of the temporary butler repository.
    butler_path: str
    # Predicted quantum graph for the test pipeline.
    predicted: PredictedQuantumGraph
    # Path the predicted quantum graph was written to (inside the repo root).
    predicted_path: str
    # Configuration for the aggregator under test.
    config: AggregatorConfig

99 

100 

101class AggregatorTestCase(unittest.TestCase): 

102 """Unit tests for `lsst.pipe.base.quantum_graph.aggregator`.""" 

103 

    @staticmethod
    @contextmanager
    def make_test_repo() -> Iterator[PrepInfo]:
        """Make a test data repository and predicted quantum graph.

        Yields
        ------
        prep_info : `PrepInfo`
            Objects used in aggregator tests.

        Notes
        -----
        The pipeline graph used by this task looks like this:

            ■ calibrate: {detector, visit}
            ╭─┤
            ■ │ consolidate: {visit}

            ■ resample: {patch, visit}

            ■ coadd: {band, patch}

        The data can be visualized via::

            python -m lsst.daf.butler.tests.registry_data.spatial

        One of the 'calibrate' quanta (visit=2, detector=2) is configured to
        fail with `lsst.pipe.base.AnnotatedPartialOutputsError`. This lets us
        test both success-with-caveats and failures, depending on how we
        configure the executor. This ``{visit: 2, detector: 2}`` data ID is
        the only one that overlaps ``{tract: 1, patch: 1}`` and
        ``{tract: 0, patch: 5}``, so it should chain to the 'resample' and
        'coadd' tasks, too.
        """
        # Deterministic UUIDs make quantum/dataset IDs reproducible run-to-run.
        with patch_deterministic_uuid4(100):
            with DirectButlerRepo.make_temporary("base.yaml", "spatial.yaml") as (helper, root):
                # Configure the one quantum that can raise during execution.
                calibrate_config = DynamicTestPipelineTaskConfig()
                calibrate_config.fail_exception = "lsst.pipe.base.AnnotatedPartialOutputsError"
                calibrate_config.fail_condition = "visit=2 AND detector=2"
                helper.add_task(
                    "calibrate",
                    config=calibrate_config,
                    dimensions=["visit", "detector"],
                    inputs={
                        "input_image": DynamicConnectionConfig(
                            dataset_type_name="raw",
                            dimensions=["visit", "detector"],
                        )
                    },
                    prerequisite_inputs={
                        "refcat": DynamicConnectionConfig(
                            dataset_type_name="references",
                            dimensions=["htm7"],
                            multiple=True,
                        )
                    },
                    init_outputs={
                        "output_schema": DynamicConnectionConfig(
                            dataset_type_name="source_schema",
                        )
                    },
                    outputs={
                        "output_image": DynamicConnectionConfig(
                            dataset_type_name="image",
                            dimensions=["visit", "detector"],
                        ),
                        "output_table": DynamicConnectionConfig(
                            dataset_type_name="source_detector",
                            dimensions=["visit", "detector"],
                        ),
                    },
                )
                helper.add_task(
                    "consolidate",
                    dimensions=["visit"],
                    init_inputs={
                        "input_schema": DynamicConnectionConfig(
                            dataset_type_name="source_schema",
                        )
                    },
                    inputs={
                        "input_table": DynamicConnectionConfig(
                            dataset_type_name="source_detector",
                            dimensions=["visit", "detector"],
                            multiple=True,
                        )
                    },
                    outputs={
                        "output_table": DynamicConnectionConfig(
                            dataset_type_name="source",
                            dimensions=["visit"],
                        )
                    },
                )
                helper.add_task(
                    "resample",
                    dimensions=["patch", "visit"],
                    inputs={
                        "input_image": DynamicConnectionConfig(
                            dataset_type_name="image",
                            dimensions=["visit", "detector"],
                            multiple=True,
                        )
                    },
                    outputs={
                        "output_image": DynamicConnectionConfig(
                            dataset_type_name="warp",
                            dimensions=["patch", "visit"],
                        )
                    },
                )
                helper.add_task(
                    "coadd",
                    dimensions=["patch", "band"],
                    inputs={
                        "input_image": DynamicConnectionConfig(
                            dataset_type_name="warp",
                            dimensions=["patch", "visit"],
                            multiple=True,
                        )
                    },
                    outputs={
                        "output_image": DynamicConnectionConfig(
                            dataset_type_name="coadd",
                            dimensions=["patch", "band"],
                        ),
                    },
                )
                pqgc = helper.make_quantum_graph_builder().finish(output="out_chain")
                # We use the butler root for various QG files just because it's
                # a convenient temporary directory.
                predicted_path = os.path.join(root, "predicted.qg")
                pqgc.write(predicted_path)
                config = AggregatorConfig(
                    output_path=os.path.join(root, "provenance.qg"),
                    # Set these small to see logic paths that otherwise only
                    # affect large graphs.
                    ingest_batch_size=10,
                    zstd_dict_size=256,
                    zstd_dict_n_inputs=16,
                )
                yield PrepInfo(
                    butler=helper.butler.clone(collections="out_chain"),
                    butler_path=root,
                    predicted=pqgc.assemble(),
                    predicted_path=predicted_path,
                    config=config,
                )

252 

253 def iter_graph_execution( 

254 self, 

255 repo: str, 

256 qg: PredictedQuantumGraph, 

257 raise_on_partial_outputs: bool, 

258 is_retry: bool = False, 

259 ) -> Iterator[uuid.UUID]: 

260 """Return an iterator that executes and yields quanta one by one. 

261 

262 Parameters 

263 ---------- 

264 repo : `str` 

265 Butler repository path or alias. 

266 qg : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

267 Predicted quantum graph. Must have datastore records attached, 

268 since execution uses a quantum-backed butler. 

269 raise_on_partial_outputs : `bool` 

270 Whether to raise on `lsst.pipe.base.AnnotatedPartialOutputsError` 

271 or treat it as a success with caveats. 

272 is_retry : `bool`, optional 

273 If `True`, this is a retry attempt and hence some outputs may 

274 already be present; skip successes and reprocess failures. 

275 

276 Returns 

277 ------- 

278 quanta : `~collections.abc.Iterator` [`uuid.UUID`] 

279 An iterator over all executed quantum IDs (not blocked ones). 

280 """ 

281 qbb = qg.make_init_qbb(repo) 

282 self.enterContext(qbb) 

283 qg.init_output_run(qbb) 

284 sqe = SingleQuantumExecutor( 

285 limited_butler_factory=lambda quantum: QuantumBackedButler.initialize( 

286 repo, 

287 quantum, 

288 qg.pipeline_graph.universe, 

289 ), 

290 assume_no_existing_outputs=not is_retry, 

291 skip_existing=is_retry, 

292 clobber_outputs=is_retry, 

293 raise_on_partial_outputs=raise_on_partial_outputs, 

294 ) 

295 qg.build_execution_quanta() 

296 xgraph = qg.quantum_only_xgraph 

297 walker = GraphWalker[uuid.UUID](xgraph.copy()) 

298 for ready in walker: 

299 for quantum_id in ready: 

300 info = xgraph.nodes[quantum_id] 

301 try: 

302 sqe.execute(info["pipeline_node"], info["quantum"], quantum_id) 

303 except AlgorithmError: 

304 walker.fail(quantum_id) 

305 else: 

306 walker.finish(quantum_id) 

307 yield quantum_id 

308 

    def check_provenance_graph(
        self,
        pred: PredictedQuantumGraph,
        butler: Butler,
        expect_failure: bool,
        start_time: float,
        expect_failures_retried: bool = False,
    ) -> ProvenanceQuantumGraph:
        """Run a battery of tests on a provenance quantum graph produced by
        scanning the graph created by `make_test_repo`.

        Parameters
        ----------
        pred : `lsst.pipe.base.quantum_graph.PredictedQuantumGraph`
            Predicted quantum graph.
        butler : `lsst.daf.butler.Butler`
            Client for the data repository.
        expect_failure : `bool`
            Whether to expect one quantum of 'calibrate' to fail (`True`) or
            succeed without writing anything (`False`).
        start_time : `float`
            A POSIX timestamp that strictly precedes the start time of any
            quantum's execution.
        expect_failures_retried : `bool`, optional
            If `True`, expect an initial attempt with failures prior to the
            most recent attempt.

        Returns
        -------
        prov : `ProvenanceQuantumGraph`
            The full provenance quantum graph.
        """
        prov: ProvenanceQuantumGraph = butler.get("run_provenance")
        self.maxDiff = None
        self.assertEqual(prov.header.version, FORMAT_VERSION)
        # The output CHAINED collection should be the output run followed by
        # the (flattened) input collections.
        self.assertEqual(
            list(butler.collections.get_info(prov.header.output).children),
            [prov.header.output_run]
            + list(butler.collections.query(prov.header.inputs, flatten_chains=True)),
        )
        # Per-task quantum ID sets must agree between the two graphs.
        self.assertEqual(pred.quanta_by_task.keys(), prov.quanta_by_task.keys())
        for task_label in pred.quanta_by_task:
            self.assertEqual(pred.quanta_by_task[task_label], prov.quanta_by_task[task_label])
        # 'packages' only exists in the predicted graph.
        self.assertEqual(pred.datasets_by_type.keys() - {"packages"}, prov.datasets_by_type.keys())
        for dataset_type_name in prov.datasets_by_type:
            self.assertEqual(
                pred.datasets_by_type[dataset_type_name], prov.datasets_by_type[dataset_type_name]
            )
        self.assertEqual(prov.init_quanta.keys(), pred.quanta_by_task.keys())
        for quantum_id in pred:
            # Check consistency between the predicted and provenance quantum
            # node attributes.
            pred_qinfo: PredictedQuantumInfo = pred.bipartite_xgraph.nodes[quantum_id]
            prov_qinfo: ProvenanceQuantumInfo = prov.bipartite_xgraph.nodes[quantum_id]
            self.assertEqual(pred_qinfo["task_label"], prov_qinfo["task_label"])
            self.assertEqual(pred_qinfo["data_id"], prov_qinfo["data_id"])
            msg = f"{pred_qinfo['task_label']}@{pred_qinfo['data_id']}"
            # Check consistency between the predicted and provenance dataset
            # node attributes and edges. Also gather existence information for
            # use later.
            existence: dict[str, list[bool]] = {}
            pipeline_edges: list[Edge]
            for dataset_id, _, pipeline_edges in pred.bipartite_xgraph.in_edges(
                quantum_id, data="pipeline_edges"
            ):
                self.assertTrue(prov.bipartite_xgraph.has_predecessor(quantum_id, dataset_id))
                for edge in pipeline_edges:
                    existence.setdefault(edge.connection_name, []).append(
                        self.check_dataset(dataset_id, pred, prov, butler)
                    )
            for _, dataset_id, pipeline_edges in pred.bipartite_xgraph.out_edges(
                quantum_id, data="pipeline_edges"
            ):
                self.assertTrue(prov.bipartite_xgraph.has_successor(quantum_id, dataset_id))
                for edge in pipeline_edges:
                    existence.setdefault(edge.connection_name, []).append(
                        self.check_dataset(dataset_id, pred, prov, butler)
                    )
            # Check quantum status and dataset existence against the known
            # structure of the graph and where failures/caveats occur.
            match (pred_qinfo["task_label"], dict(pred_qinfo["data_id"].required)):
                case "calibrate", {"visit": 2, "detector": 2}:
                    # This is the quantum that can directly raise.
                    self._expect_all_exist(existence["input_image"], msg=msg)
                    self._expect_all_exist(existence["refcat"], msg=msg)
                    self._expect_none_exist(existence["output_image"], msg=msg)
                    self._expect_none_exist(existence["output_table"], msg=msg)
                    if expect_failure:
                        self._expect_failure(prov_qinfo, existence, msg=msg)
                    else:
                        self._expect_successful(
                            prov_qinfo,
                            existence,
                            caveats=(
                                QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR
                                | QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
                                | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
                            ),
                            exception_type="lsst.pipe.base.tests.mocks.MockAlgorithmError",
                            msg=msg,
                        )
                    if expect_failures_retried:
                        # The first (failed) attempt must be preserved along
                        # with its exception record.
                        self.assertEqual(len(prov_qinfo["attempts"]), 2)
                        self.assertEqual(
                            prov_qinfo["attempts"][0].exception.type_name,
                            "lsst.pipe.base.tests.mocks.MockAlgorithmError",
                        )
                    else:
                        self.assertEqual(len(prov_qinfo["attempts"]), 1)
                case "consolidate", {"visit": 2}:
                    # This quantum will succeed (with one predicted input
                    # missing) or be blocked.
                    self._expect_one_missing(existence["input_table"], msg=msg)
                    if expect_failure:
                        self._expect_blocked(prov_qinfo, existence, msg=msg)
                    else:
                        self._expect_successful(prov_qinfo, existence, msg=msg)
                    # One attempt if it ran (or its upstream failure was
                    # retried), zero if it stayed blocked; bool == int here.
                    self.assertEqual(
                        len(prov_qinfo["attempts"]), expect_failures_retried or not expect_failure
                    )
                case (
                    "resample" | "coadd",
                    {"tract": 1, "patch": 1} | {"tract": 0, "patch": 5},
                ):
                    # These quanta will be blocked by an upstream failure or do
                    # chained caveats, since they won't have enough inputs to
                    # run.
                    if expect_failure:
                        self._expect_blocked(prov_qinfo, existence, msg=msg)
                    else:
                        self._expect_successful(
                            prov_qinfo,
                            existence,
                            caveats=(
                                QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
                                | QuantumSuccessCaveats.NO_WORK
                                | QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
                                | QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
                            ),
                            msg=msg,
                        )
                    self.assertEqual(
                        len(prov_qinfo["attempts"]), expect_failures_retried or not expect_failure
                    )
                case (
                    "resample",
                    {"tract": 0, "patch": 4, "visit": 2} | {"tract": 1, "patch": 0, "visit": 2},
                ):
                    # This will succeed or be blocked, with one input missing
                    # regardless.
                    self._expect_one_missing(existence["input_image"], msg=msg)
                    if expect_failure:
                        self._expect_blocked(prov_qinfo, existence, msg=msg)
                    else:
                        self._expect_successful(prov_qinfo, existence, msg=msg)
                    self.assertEqual(
                        len(prov_qinfo["attempts"]), expect_failures_retried or not expect_failure
                    )
                case (
                    "coadd",
                    {"tract": 0, "patch": 4, "band": "r"} | {"tract": 1, "patch": 0, "band": "r"},
                ):
                    # This will succeed with no inputs missing or be blocked
                    # with one input missing.
                    if expect_failure:
                        self._expect_one_missing(existence["input_image"], msg=msg)
                        self._expect_blocked(prov_qinfo, existence, msg=msg)
                    else:
                        self._expect_all_exist(existence["input_image"], msg=msg)
                        self._expect_successful(prov_qinfo, existence, msg=msg)
                    self.assertEqual(
                        len(prov_qinfo["attempts"]), expect_failures_retried or not expect_failure
                    )
                case _:
                    # All other quanta should succeed and have all inputs
                    # present.
                    for connection_name in prov_qinfo["pipeline_node"].inputs.keys():
                        self._expect_all_exist(existence[connection_name], msg=msg)
                    self._expect_successful(prov_qinfo, existence, msg=msg)
                    self.assertEqual(len(prov_qinfo["attempts"]), 1)
            # Metadata is only ingested for successes; logs also for failures.
            self.check_metadata(
                quantum_id,
                prov,
                butler,
                expect_ingested=(prov_qinfo["status"] is QuantumAttemptStatus.SUCCESSFUL),
            )
            self.check_log(
                quantum_id,
                prov,
                butler,
                expect_ingested=(
                    prov_qinfo["status"]
                    in (
                        QuantumAttemptStatus.SUCCESSFUL,
                        QuantumAttemptStatus.FAILED,
                    )
                ),
            )
        self.check_resource_usage_table(prov, expect_failure=expect_failure, start_time=start_time)
        self.check_packages(butler)
        self.check_configs(butler, prov)
        self.check_quantum_table(prov, expect_failure=expect_failure)
        self.check_exception_table(prov, expect_failure=expect_failure)
        self.check_report(prov, expect_failure=expect_failure)
        return prov

517 

518 def _expect_all_exist(self, existence: list[bool], msg: str) -> None: 

519 self.assertTrue(all(existence), msg=msg) 

520 

521 def _expect_none_exist(self, existence: list[bool], msg: str) -> None: 

522 self.assertFalse(any(existence), msg=msg) 

523 

524 def _expect_one_missing(self, existence: list[bool], msg: str) -> None: 

525 self.assertEqual(existence.count(False), 1, msg=msg) 

526 

527 def _expect_successful( 

528 self, 

529 info: ProvenanceQuantumInfo, 

530 existence: dict[str, list[bool]], 

531 caveats: QuantumSuccessCaveats = QuantumSuccessCaveats.NO_CAVEATS, 

532 exception_type: str | None = None, 

533 *, 

534 msg: str, 

535 ) -> None: 

536 self.assertEqual(info["status"], QuantumAttemptStatus.SUCCESSFUL, msg=msg) 

537 self.assertEqual(info["caveats"], caveats, msg=msg) 

538 if exception_type is None: 

539 self.assertIsNone(info["exception"], msg=msg) 

540 else: 

541 assert info["exception"] is not None 

542 self.assertEqual(info["exception"].type_name, exception_type, msg=msg) 

543 self._expect_all_exist(existence[acc.LOG_OUTPUT_CONNECTION_NAME], msg=msg) 

544 self._expect_all_exist(existence[acc.METADATA_OUTPUT_CONNECTION_NAME], msg=msg) 

545 if not (caveats & QuantumSuccessCaveats.ANY_OUTPUTS_MISSING): 

546 for connection_name in info["pipeline_node"].outputs.keys(): 

547 self._expect_all_exist(existence[connection_name], msg=msg) 

548 if caveats & QuantumSuccessCaveats.ALL_OUTPUTS_MISSING: 

549 for connection_name in info["pipeline_node"].outputs.keys(): 

550 self._expect_none_exist(existence[connection_name], msg=msg) 

551 self.assertIsNotNone(info["resource_usage"], msg=msg) 

552 self.assertGreater(info["resource_usage"].total_time, 0, msg=msg) 

553 self.assertGreater(info["resource_usage"].memory, 0, msg=msg) 

554 

555 def _expect_failure( 

556 self, info: ProvenanceQuantumInfo, existence: dict[str, list[bool]], msg: str 

557 ) -> None: 

558 self.assertEqual(info["status"], QuantumAttemptStatus.FAILED, msg=msg) 

559 self.assertEqual(info["exception"].type_name, "lsst.pipe.base.tests.mocks.MockAlgorithmError") 

560 self._expect_all_exist(existence[acc.LOG_OUTPUT_CONNECTION_NAME], msg=msg) 

561 self._expect_none_exist(existence[acc.METADATA_OUTPUT_CONNECTION_NAME], msg=msg) 

562 for connection_name in info["pipeline_node"].outputs.keys(): 

563 self._expect_none_exist(existence[connection_name], msg=msg) 

564 

565 def _expect_blocked( 

566 self, 

567 info: ProvenanceQuantumInfo, 

568 existence: dict[str, list[bool]], 

569 msg: str, 

570 ) -> None: 

571 self.assertEqual(info["status"], QuantumAttemptStatus.BLOCKED, msg=msg) 

572 self.assertEqual(info["attempts"], []) 

573 self._expect_none_exist(existence[acc.LOG_OUTPUT_CONNECTION_NAME], msg=msg) 

574 self._expect_none_exist(existence[acc.METADATA_OUTPUT_CONNECTION_NAME], msg=msg) 

575 for connection_name in info["pipeline_node"].outputs.keys(): 

576 self._expect_none_exist(existence[connection_name], msg=msg) 

577 

578 def check_dataset( 

579 self, 

580 dataset_id: uuid.UUID, 

581 pred: PredictedQuantumGraph, 

582 prov: ProvenanceQuantumGraph, 

583 butler: Butler, 

584 ) -> bool: 

585 """Check a provenance dataset for consistency with its predicted 

586 counterpart. 

587 

588 Parameters 

589 ---------- 

590 dataset_id : `uuid.UUID` 

591 Unique ID for the dataset. 

592 pred: `lsst.pipe.base.quantum_graph.PredictedQuantumGraph` 

593 Predicted quantum graph. 

594 prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph` 

595 Provenance quantum graph. 

596 butler : `lsst.daf.butler.Butler` 

597 Client for the data repository. 

598 

599 Returns 

600 ------- 

601 exists : `bool` 

602 Whether the dataset was marked as existing in the provenance 

603 quantum graph. 

604 """ 

605 pred_info: PredictedDatasetInfo = pred.bipartite_xgraph.nodes[dataset_id] 

606 prov_info: ProvenanceDatasetInfo = prov.bipartite_xgraph.nodes[dataset_id] 

607 self.assertEqual(pred_info["dataset_type_name"], prov_info["dataset_type_name"]) 

608 self.assertEqual(pred_info["data_id"], prov_info["data_id"]) 

609 self.assertEqual(pred_info["run"], prov_info["run"]) 

610 exists = prov_info["produced"] 

611 dataset_type_name = prov_info["dataset_type_name"] 

612 # We can remove this guard when we ingest QG-backed metadata and logs. 

613 if not dataset_type_name.endswith("_metadata") and not dataset_type_name.endswith("_log"): 

614 self.assertEqual( 

615 butler.get_dataset(dataset_id) is not None, 

616 exists, 

617 msg=( 

618 f"Ingest/existence inconsistency for {dataset_type_name}" 

619 f"@{prov_info['data_id']}/{dataset_id}]" 

620 ), 

621 ) 

622 return exists 

623 

    def check_metadata(
        self, quantum_id: uuid.UUID, prov: ProvenanceQuantumGraph, butler: Butler, expect_ingested: bool
    ) -> None:
        """Check reading a metadata dataset from the butler, and check that the
        original metadata file has been deleted.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID for the quantum this metadata belongs to.
        prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph`
            Provenance quantum graph.
        butler : `lsst.daf.butler.Butler`
            Client for the data repository.
        expect_ingested : `bool`
            Whether the metadata dataset should have been ingested.
        """
        dataset_id = prov.bipartite_xgraph.nodes[quantum_id]["metadata_id"]
        ref = butler.get_dataset(dataset_id)
        if not expect_ingested:
            self.assertIsNone(ref)
            return
        assert ref is not None
        metadata = butler.get(ref)
        self.assertIsInstance(metadata, TaskMetadata)
        # The ingested metadata should be read from inside the provenance
        # graph file itself, so both URIs should be the same file.
        graph_path = butler.getURI("run_provenance")
        self.assertEqual(butler.getURI(ref), graph_path)
        # We now delete the metadata dataset, in order to let us get the
        # original location from the butler and check that there's nothing
        # there.  Note that this doesn't actually delete the file because the
        # butler knows it's shared with other datasets.
        butler.pruneDatasets([ref], disassociate=True, unstore=True, purge=True)
        original_path = butler.getURI(ref, predict=True)
        self.assertTrue(graph_path.exists())
        self.assertNotEqual(graph_path, original_path)
        self.assertFalse(original_path.exists())

660 

    def check_log(
        self, quantum_id: uuid.UUID, prov: ProvenanceQuantumGraph, butler: Butler, expect_ingested: bool
    ) -> None:
        """Check reading a log dataset from the butler, and check that the
        original log file has been deleted.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID for the quantum this log belongs to.
        prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph`
            Provenance quantum graph.
        butler : `lsst.daf.butler.Butler`
            Client for the data repository.
        expect_ingested : `bool`
            Whether the log dataset should have been ingested.
        """
        dataset_id = prov.bipartite_xgraph.nodes[quantum_id]["log_id"]
        ref = butler.get_dataset(dataset_id)
        if not expect_ingested:
            self.assertIsNone(ref)
            return
        assert ref is not None
        log = butler.get(ref)
        self.assertIsInstance(log, ButlerLogRecords)
        # The ingested log should be read from inside the provenance graph
        # file itself, so both URIs should be the same file.
        graph_path = butler.getURI("run_provenance")
        self.assertEqual(butler.getURI(ref), graph_path)
        # We now delete the log dataset, in order to let us get the original
        # location from the butler and check that there's nothing there.  Note
        # that this doesn't actually delete the file because the butler knows
        # it's shared with other datasets.
        butler.pruneDatasets([ref], disassociate=True, unstore=True, purge=True)
        original_path = butler.getURI(ref, predict=True)
        self.assertTrue(graph_path.exists())
        self.assertNotEqual(graph_path, original_path)
        self.assertFalse(original_path.exists())

697 

    def check_configs(self, butler: Butler, prov: ProvenanceQuantumGraph) -> None:
        """Check that each task's config output can be read from the butler.

        Parameters
        ----------
        butler : `lsst.daf.butler.Butler`
            Client for the data repository.
        prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph`
            Provenance quantum graph.
        """
        for task_node in prov.pipeline_graph.tasks.values():
            config = butler.get(task_node.init.config_output.dataset_type_name)
            self.assertIsInstance(config, Config)

702 

703 def check_packages(self, butler: Butler) -> None: 

704 """Check fetching package versions from the provenance graph. 

705 

706 Parameters 

707 ---------- 

708 butler : `lsst.daf.butler.Butler` 

709 Client for the data repository. 

710 """ 

711 packages = butler.get("run_provenance.packages") 

712 self.assertIsInstance(packages, Packages) 

713 self.assertIn("pipe_base", packages) 

714 

    def check_resource_usage_table(
        self, prov: ProvenanceQuantumGraph, expect_failure: bool, start_time: float
    ) -> None:
        """Check building a resource usage table from the provenance graph.

        Parameters
        ----------
        prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph`
            The provenance quantum graph.
        expect_failure : `bool`
            Whether to expect one quantum of 'calibrate' to fail (`True`) or
            succeed without writing anything (`False`).
            NOTE(review): this parameter is currently unused in the body;
            kept for signature symmetry with the other ``check_*`` helpers.
        start_time : `float`
            A POSIX timestamp that strictly precedes the start time of any
            quantum's execution.
        """
        tbl = prov.make_task_resource_usage_table("calibrate", include_data_ids=True)
        self.assertEqual(len(tbl), prov.header.n_task_quanta["calibrate"])
        # Columns: quantum ID, the task's data ID dimensions, and one column
        # per QuantumResourceUsage model field.
        self.assertCountEqual(
            tbl.colnames,
            ["quantum_id"]
            + list(prov.pipeline_graph.tasks["calibrate"].dimensions.names)
            + list(QuantumResourceUsage.model_fields),
        )
        # Check that quantum start times are bounded by the before-execution
        # start_time and now.  This makes sure we didn't get any timezone
        # shenanigans.
        end_time = time.time()
        for quantum_start_time in tbl["start"]:
            self.assertGreater(quantum_start_time, start_time)
            self.assertLess(quantum_start_time, end_time)
        self.assertTrue(np.all(tbl["init_time"] >= 0.0))
        self.assertTrue(np.all(tbl["prep_time"] > 0.0))
        self.assertTrue(np.all(tbl["run_time"] >= 0.0))

749 

    def check_quantum_table(self, prov: ProvenanceQuantumGraph, expect_failure: bool) -> None:
        """Check `ProvenanceQuantumGraph.make_quantum_table`.

        Parameters
        ----------
        prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph`
            The provenance quantum graph.
        expect_failure : `bool`
            Whether to expect one quantum of 'calibrate' to fail (`True`) or
            succeed without writing anything (`False`).
        """
        t = prov.make_quantum_table()
        # Row indices below follow this task ordering:
        # 0=calibrate, 1=consolidate, 2=resample, 3=coadd.
        self.assertEqual(list(t["Task"]), ["calibrate", "consolidate", "resample", "coadd"])
        # Totals don't depend on whether the failure mode is enabled.
        self.assertEqual(t["TOTAL"][0], 8)
        self.assertEqual(t["EXPECTED"][0], 8)
        self.assertEqual(t["Blocked"][0], 0)
        self.assertEqual(t["TOTAL"][1], 2)
        self.assertEqual(t["EXPECTED"][1], 2)
        self.assertEqual(t["TOTAL"][2], 10)
        self.assertEqual(t["EXPECTED"][2], 10)
        # NOTE(review): the coadd row (index 3) is not asserted in either
        # branch below — consider adding checks for it.
        if expect_failure:
            # calibrate
            self.assertEqual(t["Successful"][0], 7)
            self.assertEqual(t["Caveats"][0], "")
            self.assertEqual(t["Failed"][0], 1)
            # consolidate
            self.assertEqual(t["Successful"][1], 1)
            self.assertEqual(t["Caveats"][1], "")
            self.assertEqual(t["Failed"][1], 0)
            self.assertEqual(t["Blocked"][1], 1)
            # resample
            self.assertEqual(t["Successful"][2], 6)
            self.assertEqual(t["Caveats"][2], "")
            self.assertEqual(t["Failed"][2], 0)
            self.assertEqual(t["Blocked"][2], 4)
        else:
            # calibrate
            self.assertEqual(t["Successful"][0], 8)
            self.assertEqual(t["Caveats"][0], "*P(1)")
            self.assertEqual(t["Failed"][0], 0)
            # consolidate
            self.assertEqual(t["Successful"][1], 2)
            self.assertEqual(t["Caveats"][1], "")
            self.assertEqual(t["Failed"][1], 0)
            self.assertEqual(t["Blocked"][1], 0)
            # resample
            self.assertEqual(t["Successful"][2], 10)
            self.assertEqual(t["Caveats"][2], "*A(2)")
            self.assertEqual(t["Failed"][2], 0)
            self.assertEqual(t["Blocked"][2], 0)

800 

801 def check_exception_table(self, prov: ProvenanceQuantumGraph, expect_failure: bool) -> None: 

802 """Check `ProvenanceQuantumGraph.make_exception_table`. 

803 

804 Parameters 

805 ---------- 

806 prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph` 

807 Reader for the provenance quantum graph. 

808 expect_failure : `bool` 

809 Whether to expect one quantum of 'calibrate' to fail (`True`) or 

810 succeed without writing anything (`False`). 

811 """ 

812 t = prov.make_exception_table() 

813 self.assertEqual(list(t["Task"]), ["calibrate"]) 

814 self.assertEqual(list(t["Exception"]), ["lsst.pipe.base.tests.mocks.MockAlgorithmError"]) 

815 self.assertEqual(list(t["Successes"]), [int(not expect_failure)]) 

816 self.assertEqual(list(t["Failures"]), [int(expect_failure)]) 

817 

    def check_report(self, prov: ProvenanceQuantumGraph, expect_failure: bool) -> None:
        """Check `ProvenanceQuantumGraph.make_report`.

        Parameters
        ----------
        prov : `lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph`
            The provenance quantum graph.
        expect_failure : `bool`
            Whether to expect one quantum of 'calibrate' to fail (`True`) or
            succeed without writing anything (`False`).
        """
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as data_id_table_dir:
            # Include successes so the caveated success appears in the report
            # even when the failure mode is disabled.
            report = prov.make_status_report(
                also=QuantumAttemptStatus.SUCCESSFUL, data_id_table_dir=data_id_table_dir
            )
            task_label = "calibrate"
            status_name = "FAILED" if expect_failure else "SUCCESSFUL"
            exc_type = "lsst.pipe.base.tests.mocks.MockAlgorithmError"
            # The report is nested: task -> status -> exception -> quanta.
            self.assertEqual(report.root.keys(), {task_label})
            self.assertEqual(report.root[task_label].keys(), {status_name})
            self.assertEqual(report.root[task_label][status_name].keys(), {exc_type})
            self.assertEqual(len(report.root[task_label][status_name][exc_type]), 1)
            qr = report.root[task_label][status_name][exc_type][0]
            self.assertIsInstance(qr, ProvenanceQuantumReport)
            self.assertEqual(
                qr.data_id,
                {
                    "instrument": "Cam1",
                    "visit": 2,
                    "detector": 2,
                    "band": "r",
                    "day_obs": 20210909,
                    "physical_filter": "Cam1-R1",
                },
            )
            # Data IDs are also written as per-(task, status, exception) ECSV
            # tables under data_id_table_dir; read back the one we expect.
            tbl = astropy.table.Table.read(
                os.path.join(data_id_table_dir, task_label, status_name, f"{exc_type}.ecsv")
            )
            self.assertCountEqual(tbl.colnames, ["instrument", "visit", "detector"])
            self.assertEqual(len(tbl), 1)
            self.assertEqual(list(tbl["instrument"]), ["Cam1"])
            self.assertEqual(list(tbl["detector"]), [2])
            self.assertEqual(list(tbl["visit"]), [2])

862 def test_all_successful(self) -> None: 

863 """Test running a full graph with no failures, and then scanning the 

864 results with incomplete=False. 

865 """ 

866 with self.make_test_repo() as prep: 

867 prep.config.incomplete = False 

868 start_time = time.time() 

869 attempted_quanta = list( 

870 self.iter_graph_execution(prep.butler_path, prep.predicted, raise_on_partial_outputs=False) 

871 ) 

872 self.assertCountEqual(attempted_quanta, prep.predicted.quantum_only_xgraph.nodes.keys()) 

873 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

874 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

875 prov = self.check_provenance_graph( 

876 prep.predicted, 

877 prep.butler, 

878 expect_failure=False, 

879 start_time=start_time, 

880 ) 

881 self.check_no_original_dirs(prep.butler_path, prov.header.output_run) 

882 for i, quantum_id in enumerate(attempted_quanta): 

883 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

884 self.assertEqual(qinfo["attempts"][-1].previous_process_quanta, attempted_quanta[:i]) 

885 

886 def test_all_successful_two_phase(self) -> None: 

887 """Test running some of a graph with no failures, scanning with 

888 incomplete=True, then finishing the graph and scanning again. 

889 """ 

890 with self.make_test_repo() as prep: 

891 start_time = time.time() 

892 execution_iter = self.iter_graph_execution( 

893 prep.butler_path, prep.predicted, raise_on_partial_outputs=False 

894 ) 

895 attempted_quanta = list(itertools.islice(execution_iter, 9)) 

896 self.assertEqual(len(attempted_quanta), 9) 

897 # Run the scanner while telling it execution is incomplete, so it 

898 # just abandons incomplete quanta and doesn't write the provenance 

899 # QG. 

900 prep.config.incomplete = True 

901 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

902 self.assertFalse(os.path.exists(cast(str, prep.config.output_path))) 

903 # Finish executing the quanta. 

904 attempted_quanta.extend(execution_iter) 

905 # Scan again, and write the provenance QG. 

906 prep.config.incomplete = False 

907 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

908 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

909 prov = self.check_provenance_graph( 

910 prep.predicted, 

911 prep.butler, 

912 expect_failure=False, 

913 start_time=start_time, 

914 ) 

915 for i, quantum_id in enumerate(attempted_quanta): 

916 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

917 self.assertEqual(qinfo["attempts"][-1].previous_process_quanta, attempted_quanta[:i]) 

918 

919 def test_some_failed(self) -> None: 

920 """Test running a full graph with some failures, and then scanning the 

921 results with incomplete=False. 

922 """ 

923 with self.make_test_repo() as prep: 

924 prep.config.incomplete = False 

925 start_time = time.time() 

926 attempted_quanta = list( 

927 self.iter_graph_execution(prep.butler_path, prep.predicted, raise_on_partial_outputs=True) 

928 ) 

929 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

930 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

931 prov = self.check_provenance_graph( 

932 prep.predicted, 

933 prep.butler, 

934 expect_failure=True, 

935 start_time=start_time, 

936 ) 

937 self.check_no_original_dirs(prep.butler_path, prov.header.output_run) 

938 for i, quantum_id in enumerate(attempted_quanta): 

939 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

940 self.assertEqual(qinfo["attempts"][-1].previous_process_quanta, attempted_quanta[:i]) 

941 

942 def test_some_failed_two_phase(self) -> None: 

943 """Test running a full graph with some failures, then scanning the 

944 results with incomplete=True, then scanning again with 

945 incomplete=False. 

946 """ 

947 with self.make_test_repo() as prep: 

948 start_time = time.time() 

949 attempted_quanta = list( 

950 self.iter_graph_execution(prep.butler_path, prep.predicted, raise_on_partial_outputs=True) 

951 ) 

952 prep.config.incomplete = True 

953 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

954 prep.config.incomplete = False 

955 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

956 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

957 prov = self.check_provenance_graph( 

958 prep.predicted, 

959 prep.butler, 

960 expect_failure=True, 

961 start_time=start_time, 

962 ) 

963 for i, quantum_id in enumerate(attempted_quanta): 

964 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

965 self.assertEqual(qinfo["attempts"][-1].previous_process_quanta, attempted_quanta[:i]) 

966 

967 def test_retry(self) -> None: 

968 """Test running a full graph with some failures, rerunning the quanta 

969 that failed or were blocked in the first attempt, and then scanning 

970 for provenance. 

971 """ 

972 with self.make_test_repo() as prep: 

973 start_time = time.time() 

974 attempted_quanta_1 = list( 

975 self.iter_graph_execution(prep.butler_path, prep.predicted, raise_on_partial_outputs=True) 

976 ) 

977 attempted_quanta_2 = list( 

978 self.iter_graph_execution( 

979 prep.butler_path, prep.predicted, raise_on_partial_outputs=False, is_retry=True 

980 ) 

981 ) 

982 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

983 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

984 prov = self.check_provenance_graph( 

985 prep.predicted, 

986 prep.butler, 

987 expect_failure=False, 

988 start_time=start_time, 

989 expect_failures_retried=True, 

990 ) 

991 for i, quantum_id in enumerate(attempted_quanta_1): 

992 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

993 self.assertEqual(qinfo["attempts"][0].previous_process_quanta, attempted_quanta_1[:i]) 

994 expected: list[uuid.UUID] = [] 

995 for quantum_id in attempted_quanta_2: 

996 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

997 if ( 

998 quantum_id in attempted_quanta_1 

999 and qinfo["attempts"][0].status is QuantumAttemptStatus.SUCCESSFUL 

1000 ): 

1001 # These weren't actually attempted twice, since they 

1002 # were already successful in the first round. 

1003 self.assertEqual(len(qinfo["attempts"]), 1) 

1004 else: 

1005 self.assertEqual(qinfo["attempts"][-1].previous_process_quanta, expected) 

1006 expected.append(quantum_id) 

1007 

1008 def test_promise_ingest_graph(self) -> None: 

1009 """Test running with promise_ingest_graph=True.""" 

1010 with self.make_test_repo() as prep: 

1011 prep.config.incomplete = False 

1012 prep.config.promise_ingest_graph = True 

1013 start_time = time.time() 

1014 attempted_quanta = list( 

1015 self.iter_graph_execution(prep.butler_path, prep.predicted, raise_on_partial_outputs=True) 

1016 ) 

1017 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

1018 self.assertFalse(prep.butler.query_datasets("calibrate_metadata", explain=False)) 

1019 self.assertFalse(prep.butler.query_datasets("consolidate_log", explain=False)) 

1020 self.assertFalse(prep.butler.query_datasets("resample_config", explain=False)) 

1021 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

1022 prov = self.check_provenance_graph( 

1023 prep.predicted, 

1024 prep.butler, 

1025 expect_failure=True, 

1026 start_time=start_time, 

1027 ) 

1028 self.check_no_original_dirs(prep.butler_path, prov.header.output_run) 

1029 for i, quantum_id in enumerate(attempted_quanta): 

1030 qinfo: ProvenanceQuantumInfo = prov.quantum_only_xgraph.nodes[quantum_id] 

1031 self.assertEqual(qinfo["attempts"][-1].previous_process_quanta, attempted_quanta[:i]) 

1032 

1033 def test_worker_failures(self) -> None: 

1034 """Test that if failures occur on (multiple) workers we shut down 

1035 gracefully instead of hanging. 

1036 """ 

1037 with self.make_test_repo() as prep: 

1038 with self.assertRaises(FatalWorkerError): 

1039 aggregate_graph(prep.predicted_path, "nonexistent", prep.config) 

1040 

1041 def test_aggregate_graph_cli_overrides(self) -> None: 

1042 """Test that command-line options override config attributes as 

1043 expected. 

1044 """ 

1045 

1046 def mock_run(predicted_path: str, butler_path: str, config: AggregatorConfig) -> None: 

1047 print(config.model_dump_json(indent=2)) 

1048 

1049 def check(result: Result, **kwargs: Any) -> None: 

1050 self.assertEqual(result.exit_code, 0, msg=result.output) 

1051 self.assertEqual( 

1052 result.output.strip(), AggregatorConfig(**kwargs).model_dump_json(indent=2).strip() 

1053 ) 

1054 

1055 self.maxDiff = None 

1056 runner = CliRunner() 

1057 with unittest.mock.patch("lsst.pipe.base.quantum_graph.aggregator.aggregate_graph", mock_run): 

1058 check(runner.invoke(aggregate_graph_cli, ("pg", "repo"))) 

1059 check(runner.invoke(aggregate_graph_cli, ("pg", "repo", "--output", "out")), output_path="out") 

1060 check(runner.invoke(aggregate_graph_cli, ("pg", "repo", "-j", "4")), n_processes=4) 

1061 check( 

1062 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--incomplete")), 

1063 incomplete=True, 

1064 ) 

1065 check(runner.invoke(aggregate_graph_cli, ("pg", "repo", "--dry-run")), dry_run=True) 

1066 check( 

1067 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--interactive-status")), 

1068 interactive_status=True, 

1069 ) 

1070 check( 

1071 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--log-status-interval", "120")), 

1072 log_status_interval=120, 

1073 ) 

1074 check( 

1075 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--no-register-dataset-types")), 

1076 register_dataset_types=False, 

1077 ) 

1078 check( 

1079 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--no-update-output-chain")), 

1080 update_output_chain=False, 

1081 ) 

1082 check( 

1083 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--worker-log-dir", "wlogs")), 

1084 worker_log_dir="wlogs", 

1085 ) 

1086 check( 

1087 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--worker-log-level", "DEBUG")), 

1088 worker_log_level="DEBUG", 

1089 ) 

1090 check( 

1091 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--zstd-level", "11")), 

1092 zstd_level=11, 

1093 ) 

1094 check( 

1095 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--zstd-dict-size", "143")), 

1096 zstd_dict_size=143, 

1097 ) 

1098 check( 

1099 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--zstd-dict-n-inputs", "2")), 

1100 zstd_dict_n_inputs=2, 

1101 ) 

1102 check( 

1103 runner.invoke(aggregate_graph_cli, ("pg", "repo", "--mock-storage-classes")), 

1104 mock_storage_classes=True, 

1105 ) 

1106 

1107 def check_provenance_report(self, result: click.testing.Result, root: str) -> None: 

1108 self.maxDiff = None 

1109 self.assertEqual(result.exit_code, 0, msg=result.output) 

1110 self.assertEqual( 

1111 result.output, 

1112 " Task Caveats Failed Blocked Successful TOTAL EXPECTED\n" 

1113 "----------- ------- ------ ------- ---------- ----- --------\n" 

1114 " calibrate 1 0 7 8 8\n" 

1115 "consolidate 0 1 1 2 2\n" 

1116 " resample 0 4 6 10 10\n" 

1117 " coadd 0 4 6 10 10\n" 

1118 "\n" 

1119 " Task Exception Successes Failures\n" 

1120 "--------- --------------------------------------------- --------- --------\n" 

1121 "calibrate lsst.pipe.base.tests.mocks.MockAlgorithmError 0 1\n" 

1122 "\n", 

1123 ) 

1124 with open(os.path.join(root, "report.json")) as report_file: 

1125 report = ProvenanceReport.model_validate_json(report_file.read()) 

1126 self.assertEqual(report.root.keys(), {"calibrate"}) 

1127 self.assertEqual(report.root["calibrate"].keys(), {"FAILED"}) 

1128 self.assertEqual( 

1129 report.root["calibrate"]["FAILED"].keys(), {"lsst.pipe.base.tests.mocks.MockAlgorithmError"} 

1130 ) 

1131 self.assertEqual( 

1132 len(report.root["calibrate"]["FAILED"]["lsst.pipe.base.tests.mocks.MockAlgorithmError"]), 1 

1133 ) 

1134 self.assertEqual( 

1135 list(os.walk(os.path.join(root, "data_ids"))), 

1136 [ 

1137 (os.path.join(root, "data_ids"), ["calibrate"], []), 

1138 (os.path.join(root, "data_ids", "calibrate"), ["FAILED"], []), 

1139 ( 

1140 os.path.join(root, "data_ids", "calibrate", "FAILED"), 

1141 [], 

1142 ["lsst.pipe.base.tests.mocks.MockAlgorithmError.ecsv"], 

1143 ), 

1144 ], 

1145 ) 

1146 

1147 def check_no_original_dirs(self, butler_path: str, output_run: str) -> None: 

1148 """Check that there are no config/log/metadata directories in 

1149 the butler's directory for this output run. 

1150 """ 

1151 root = os.path.join(butler_path, output_run) 

1152 for subdir in os.listdir(root): 

1153 if subdir.endswith("_config") or subdir.endswith("_metadata") or subdir.endswith("_log"): 

1154 raise AssertionError(f"Directory {os.path.join(root, subdir)} still exists.") 

1155 

1156 def test_provenance_report_content(self) -> None: 

1157 """Test the provenance-report CLI command.""" 

1158 with self.make_test_repo() as prep: 

1159 prep.config.incomplete = False 

1160 prep.config.promise_ingest_graph = True 

1161 for _ in self.iter_graph_execution( 

1162 prep.butler_path, prep.predicted, raise_on_partial_outputs=True 

1163 ): 

1164 pass 

1165 aggregate_graph(prep.predicted_path, prep.butler_path, prep.config) 

1166 self.assertFalse(prep.butler.query_datasets("calibrate_metadata", explain=False)) 

1167 self.assertFalse(prep.butler.query_datasets("consolidate_log", explain=False)) 

1168 self.assertFalse(prep.butler.query_datasets("resample_config", explain=False)) 

1169 

1170 # First test on a provenance graph file that has not been ingested. 

1171 runner = CliRunner() 

1172 report_root = os.path.join(prep.butler_path, "uningested") 

1173 result = runner.invoke( 

1174 provenance_report_cli, 

1175 ( 

1176 cast(str, prep.config.output_path), 

1177 "--status-report", 

1178 os.path.join(report_root, "report.json"), 

1179 "--data-id-table-dir", 

1180 os.path.join(report_root, "data_ids"), 

1181 ), 

1182 ) 

1183 self.check_provenance_report(result, report_root) 

1184 

1185 ingest_graph(prep.butler_path, prep.config.output_path, transfer="move", batch_size=10) 

1186 

1187 runner = CliRunner() 

1188 report_root = os.path.join(prep.butler_path, "ingested") 

1189 result = runner.invoke( 

1190 provenance_report_cli, 

1191 ( 

1192 prep.butler_path, 

1193 *prep.butler.collections.defaults, 

1194 "--status-report", 

1195 os.path.join(report_root, "report.json"), 

1196 "--data-id-table-dir", 

1197 os.path.join(report_root, "data_ids"), 

1198 ), 

1199 ) 

1200 self.check_provenance_report(result, os.path.join(report_root)) 

1201 

1202 def test_provenance_report_cli_overrides(self) -> None: 

1203 """Test the provenance-report CLI command with a mocked 

1204 implementation. 

1205 """ 

1206 

1207 class MakeManyReportsArgs(pydantic.BaseModel): 

1208 status_report_file: str | None = None 

1209 print_quantum_table: bool = True 

1210 print_exception_table: bool = True 

1211 states: list[QuantumAttemptStatus] = pydantic.Field( 

1212 default_factory=lambda: [ 

1213 QuantumAttemptStatus.FAILED, 

1214 QuantumAttemptStatus.ABORTED, 

1215 QuantumAttemptStatus.ABORTED_SUCCESS, 

1216 ] 

1217 ) 

1218 with_caveats: QuantumSuccessCaveats | None = None 

1219 data_id_table_dir: str | None = None 

1220 

1221 @pydantic.model_validator(mode="after") 

1222 def _sort_states(self) -> MakeManyReportsArgs: 

1223 self.states.sort(key=lambda e: e.value) 

1224 return self 

1225 

1226 class MockProvenanceQuantumGraph(pydantic.BaseModel): 

1227 repo_or_filename: str 

1228 collection: str | None 

1229 quanta: list[uuid.UUID] | None = None 

1230 datasets: list[uuid.UUID] | None = None 

1231 writeable: bool = False 

1232 make_many_reports_args: MakeManyReportsArgs | None = None 

1233 

1234 @classmethod 

1235 @contextmanager 

1236 def from_args( 

1237 cls, repo_or_filename: str, /, **kwargs: Any 

1238 ) -> Iterator[tuple[MockProvenanceQuantumGraph, None]]: 

1239 yield cls(repo_or_filename=repo_or_filename, **kwargs), None 

1240 

1241 def make_many_reports(self, **kwargs: Any) -> None: 

1242 self.make_many_reports_args = MakeManyReportsArgs(**kwargs) 

1243 print(self.model_dump_json(indent=2)) 

1244 

1245 def check( 

1246 result: Result, repo_or_filename: str, collection: str | None = None, **kwargs: Any 

1247 ) -> None: 

1248 self.assertEqual(result.exit_code, 0, msg=result.output) 

1249 self.assertEqual( 

1250 result.output.strip(), 

1251 MockProvenanceQuantumGraph( 

1252 repo_or_filename=repo_or_filename, 

1253 collection=collection, 

1254 datasets=[], 

1255 writeable=False, 

1256 make_many_reports_args=MakeManyReportsArgs(**kwargs), 

1257 ).model_dump_json(indent=2), 

1258 ) 

1259 

1260 self.maxDiff = None 

1261 runner = CliRunner() 

1262 with unittest.mock.patch( 

1263 "lsst.pipe.base.quantum_graph.ProvenanceQuantumGraph", MockProvenanceQuantumGraph 

1264 ): 

1265 check( 

1266 runner.invoke(provenance_report_cli, ("repo", "collection1")), 

1267 repo_or_filename="repo", 

1268 collection="collection1", 

1269 ) 

1270 check( 

1271 runner.invoke(provenance_report_cli, ("filename1",)), 

1272 repo_or_filename="filename1", 

1273 collection=None, 

1274 ) 

1275 check( 

1276 runner.invoke(provenance_report_cli, ("repo", "collection1", "--no-quantum-table")), 

1277 repo_or_filename="repo", 

1278 collection="collection1", 

1279 print_quantum_table=False, 

1280 ) 

1281 check( 

1282 runner.invoke(provenance_report_cli, ("repo", "collection1", "--no-exception-table")), 

1283 repo_or_filename="repo", 

1284 collection="collection1", 

1285 print_exception_table=False, 

1286 ) 

1287 check( 

1288 runner.invoke(provenance_report_cli, ("repo", "collection1", "--status-report", "filename2")), 

1289 repo_or_filename="repo", 

1290 collection="collection1", 

1291 status_report_file="filename2", 

1292 ) 

1293 check( 

1294 runner.invoke(provenance_report_cli, ("repo", "collection1", "--state", "SUCCESSFUL")), 

1295 repo_or_filename="repo", 

1296 collection="collection1", 

1297 states=[ 

1298 QuantumAttemptStatus.SUCCESSFUL, 

1299 QuantumAttemptStatus.FAILED, 

1300 QuantumAttemptStatus.ABORTED, 

1301 QuantumAttemptStatus.ABORTED_SUCCESS, 

1302 ], 

1303 ) 

1304 check( 

1305 runner.invoke(provenance_report_cli, ("repo", "collection1", "--no-state", "FAILED")), 

1306 repo_or_filename="repo", 

1307 collection="collection1", 

1308 states=[ 

1309 QuantumAttemptStatus.ABORTED, 

1310 QuantumAttemptStatus.ABORTED_SUCCESS, 

1311 ], 

1312 ) 

1313 check( 

1314 runner.invoke( 

1315 provenance_report_cli, ("repo", "collection1", "--caveat", "PARTIAL_OUTPUTS_ERROR") 

1316 ), 

1317 repo_or_filename="repo", 

1318 collection="collection1", 

1319 with_caveats=QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR, 

1320 states=[ 

1321 QuantumAttemptStatus.SUCCESSFUL, 

1322 QuantumAttemptStatus.FAILED, 

1323 QuantumAttemptStatus.ABORTED, 

1324 QuantumAttemptStatus.ABORTED_SUCCESS, 

1325 ], 

1326 ) 

1327 check( 

1328 runner.invoke(provenance_report_cli, ("repo", "collection1", "--data-id-table-dir", "dir1")), 

1329 repo_or_filename="repo", 

1330 collection="collection1", 

1331 data_id_table_dir="dir1", 

1332 ) 

1333 

1334 def test_bad_metadata_readable(self) -> None: 

1335 """Test that consolidated metadata accidentally written with floats 

1336 transformed to JSON null are now readable. 

1337 """ 

1338 with open(os.path.join(os.path.dirname(__file__), "data", "DM-54057.json")) as stream: 

1339 data = stream.read() 

1340 prov_md = ProvenanceTaskMetadataModel.model_validate_json(data) 

1341 self.assertTrue(np.isnan(prov_md.attempts[0]["calibrateImage:psf_measure_psf"]["spatialFitChi2"])) 

1342 

1343 

if __name__ == "__main__":
    # Initialize LSST test utilities before handing control to unittest.
    lsst.utils.tests.init()
    unittest.main()