Coverage for python / lsst / pipe / base / quantum_provenance_graph.py: 26%

728 statements  


1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""A set of already-run, merged quantum graphs with provenance information 

29which can be used to compose a report on the status of multi-attempt 

30processing. 

31""" 

32 

33from __future__ import annotations 

34 

35__all__ = ( 

36 "DatasetKey", 

37 "PrerequisiteDatasetKey", 

38 "QuantumKey", 

39 "QuantumProvenanceGraph", 

40) 

41 

42import concurrent.futures 

43import dataclasses 

44import datetime 

45import itertools 

46import logging 

47import textwrap 

48import threading 

49import uuid 

50from collections.abc import Callable, Iterator, Mapping, Sequence, Set 

51from enum import Enum 

52from typing import Any, ClassVar, Literal, TypedDict, cast 

53 

54import astropy.table 

55import networkx 

56import pydantic 

57 

58from lsst.daf.butler import ( 

59 Butler, 

60 ButlerConfig, 

61 ButlerLogRecords, 

62 DataCoordinate, 

63 DataIdValue, 

64 DatasetId, 

65 DatasetRef, 

66 DatasetType, 

67 DimensionUniverse, 

68 LimitedButler, 

69 MissingDatasetTypeError, 

70 QuantumBackedButler, 

71) 

72from lsst.resources import ResourcePathExpression 

73from lsst.utils.logging import PeriodicLogger, getLogger 

74 

75from ._status import ExceptionInfo, QuantumSuccessCaveats 

76from .automatic_connection_constants import ( 

77 LOG_OUTPUT_CONNECTION_NAME, 

78 LOG_OUTPUT_TEMPLATE, 

79 METADATA_OUTPUT_CONNECTION_NAME, 

80 METADATA_OUTPUT_STORAGE_CLASS, 

81 METADATA_OUTPUT_TEMPLATE, 

82 PROVENANCE_DATASET_TYPE_NAME, 

83) 

84from .graph import QuantumGraph, QuantumNode 

85 

86_LOG = getLogger(__name__) 

87 

88 

89@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

90class QuantumKey: 

91 """Identifier type for quantum keys in a `QuantumProvenanceGraph`. These 

92 keys correspond to a task label and data ID, and can refer to the same 

93 quantum across multiple runs and quantum graphs. 

94 """ 

95 

96 task_label: str 

97 """Label of the task in the pipeline.""" 

98 

99 data_id_values: tuple[DataIdValue, ...] 

100 """Data ID values of the quantum. 

101 

102 Note that keys are fixed given `task_label`, so using only the values here 

103 speeds up comparisons. 

104 """ 

105 

106 is_task: ClassVar[Literal[True]] = True 

107 """Whether this node represents a quantum rather 

108 than a dataset (always `True`). 

109 """ 

110 

111 

112@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

113class DatasetKey: 

114 """Identifier type for dataset keys in a `QuantumProvenanceGraph`.""" 

115 

116 dataset_type_name: str 

117 """Name of the dataset type (never a component).""" 

118 

119 data_id_values: tuple[DataIdValue, ...] 

120 """Data ID values of the dataset. 

121 

122 Note that keys are fixed given `dataset_type_name`, so using only 

123 the values here speeds up comparisons. 

124 """ 

125 

126 is_task: ClassVar[Literal[False]] = False 

127 """Whether this node represents a quantum rather than a dataset (always 

128 `False`). 

129 """ 

130 

131 is_prerequisite: ClassVar[Literal[False]] = False 

132 """Whether this node is a prerequisite to another node (also always 

133 `False`). 

134 """ 

135 

136 

137@dataclasses.dataclass(slots=True, eq=True, frozen=True) 

138class PrerequisiteDatasetKey: 

139 """Identifier type for prerequisite dataset keys in a 

140 `QuantumProvenanceGraph`. 

141 

142 Unlike regular datasets, prerequisites are not actually required to come 

143 from a find-first search of `input_collections`, so we don't want to 

144 assume that the same data ID implies the same dataset. Happily we also 

145 don't need to search for them by data ID in the graph, so we can use the 

146 dataset ID (UUID) instead. 

147 """ 

148 

149 dataset_type_name: str 

150 """Name of the dataset type (never a component).""" 

151 

152 dataset_id_bytes: bytes 

153 """Dataset ID (UUID) as raw bytes.""" 

154 

155 is_task: ClassVar[Literal[False]] = False 

156 """Whether this node represents a quantum rather 

157 than a dataset (always `False`). 

158 """ 

159 

160 is_prerequisite: ClassVar[Literal[True]] = True 

161 """Whether this node is a prerequisite to another node (always `True`). 

162 """ 

163 

164 
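# Editor's note: a minimal sketch (not part of this module) of how the three
# frozen, slotted key dataclasses above behave as networkx node identifiers.
# Equality and hashing are by value, so the same label and data ID values
# always address the same node; the example data ID values are hypothetical.
_quantum = QuantumKey("isr", ("HSC", 903342))
assert _quantum == QuantumKey("isr", ("HSC", 903342))
assert _quantum.is_task
_dataset = DatasetKey("postISRCCD", ("HSC", 903342))
assert not _dataset.is_task and not _dataset.is_prerequisite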

165class QuantumRunStatus(Enum): 

166 """Enum describing the status of a quantum-run collection combination. 

167 

168 Possible Statuses 

169 ----------------- 

170 METADATA_MISSING = -3: Metadata is missing for this quantum in this run. 

171 It is impossible to tell whether execution of this quantum was 

172 attempted due to missing metadata. 

173 LOGS_MISSING = -2: Logs are missing for this quantum in this run. It was 

174 attempted, but it is impossible to tell if it succeeded or failed due 

175 to missing logs. 

176 FAILED = -1: Attempts to execute the quantum failed in this run. 

177 BLOCKED = 0: This run does not include an executed version of this 

178 quantum because an upstream task failed. 

179 SUCCESSFUL = 1: This quantum was executed successfully in this run. 

180 """ 

181 

182 METADATA_MISSING = -3 

183 LOGS_MISSING = -2 

184 FAILED = -1 

185 BLOCKED = 0 

186 SUCCESSFUL = 1 

187 
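# Editor's sketch (an assumption, not the module's logic): one plausible
# mapping from metadata/log dataset presence to a QuantumRunStatus, mirroring
# the legend above.  The real assembly code in this module is more involved,
# and BLOCKED is derived from upstream failures rather than dataset presence.
def _infer_run_status(wrote_metadata: bool, wrote_logs: bool) -> QuantumRunStatus:
    if wrote_metadata and wrote_logs:
        return QuantumRunStatus.SUCCESSFUL
    if wrote_metadata:
        return QuantumRunStatus.LOGS_MISSING
    if wrote_logs:
        return QuantumRunStatus.FAILED
    return QuantumRunStatus.METADATA_MISSING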

188 

189class QuantumRun(pydantic.BaseModel): 

190 """Information about a quantum in a given run collection.""" 

191 

192 model_config = pydantic.ConfigDict(arbitrary_types_allowed=True) # for DatasetRef attrs. 

193 

194 id: uuid.UUID 

195 """The quantum graph node ID associated with the dataId in a specific run. 

196 """ 

197 

198 status: QuantumRunStatus = QuantumRunStatus.METADATA_MISSING 

199 """The status of the quantum in that run. 

200 """ 

201 

202 caveats: QuantumSuccessCaveats | None = None 

203 """Flags that describe possibly-qualified successes. 

204 

205 This is `None` when `status` is not `SUCCESSFUL` or `LOGS_MISSING`. It 

206 may also be `None` if metadata was not loaded or had no success flags. 

207 """ 

208 

209 exception: ExceptionInfo | None = None 

210 """Information about an exception that that was raised during the quantum's 

211 execution. 

212 

213 Exception information for failed quanta is not currently stored, so this 

214 field is actually only populated for quanta that raise 

215 `AnnotatedPartialOutputsError`, and only when the execution system is 

216 configured not to consider these partial successes a failure (i.e. when 

217 `status` is `~QuantumRunStatus.SUCCESSFUL` and `caveats` has 

218 `~QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR` set). The error whose 

219 information is reported here is the exception chained from the 

220 `AnnotatedPartialOutputsError`. 

221 

222 In the future, exception information from failures may be available as 

223 well. 

224 """ 

225 

226 metadata_ref: DatasetRef 

227 """Predicted DatasetRef for the metadata dataset.""" 

228 

229 log_ref: DatasetRef 

230 """Predicted DatasetRef for the log dataset.""" 

231 

232 @staticmethod 

233 def find_final(info: QuantumInfo) -> tuple[str, QuantumRun]: 

234 """Return the final RUN collection name and `QuantumRun` structure from 

235 a `QuantumInfo` dictionary. 

236 

237 The "final run" is the last RUN collection in the sequence of quantum 

238 graphs that: 

239 

240 - actually had a quantum for that task label and data ID; 

241 - seems to have at least attempted execution (at least one of the 

242 metadata or log datasets was produced). 

243 

244 Parameters 

245 ---------- 

246 info : `QuantumInfo` 

247 Quantum information that includes all runs. 

248 

249 Returns 

250 ------- 

251 run : `str` 

252 RUN collection name. 

253 quantum_run : `QuantumRun` 

254 Information about a quantum in a RUN collection. 

255 

256 Raises 

257 ------ 

258 ValueError 

259 Raised if this quantum never had a status that suggested execution 

260 in any run. 

261 """ 

262 for run, quantum_run in reversed(info["runs"].items()): 

263 if ( 

264 quantum_run.status is not QuantumRunStatus.METADATA_MISSING 

265 and quantum_run.status is not QuantumRunStatus.BLOCKED 

266 ): 

267 return run, quantum_run 

268 raise ValueError("Quantum was never executed.") 

269 
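# Editor's usage sketch: `find_final` walks the run mapping newest-to-oldest
# and returns the last attempt; `qpg` and `key` are hypothetical names for a
# populated QuantumProvenanceGraph and one of its QuantumKeys.
#
#     info = qpg.get_quantum_info(key)
#     run_name, quantum_run = QuantumRun.find_final(info)
#     print(run_name, quantum_run.status.name)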

270 

271class QuantumInfoStatus(Enum): 

272 """The status of a quantum (a particular task run on a particular dataID) 

273 across all runs. 

274 

275 Possible Statuses 

276 ----------------- 

277 WONKY = -3: The overall state of this quantum reflects inconsistencies or 

278 is difficult to discern. There are a few specific ways to enter a wonky 

279 state; it is impossible to exit and requires human intervention to 

280 proceed with processing. 

281 Currently, a quantum enters a wonky state for one of three reasons: 

282 - Its overall `QuantumInfoStatus` moves from a successful state (as a 

283 result of a successful run) to any other state. In other words, 

284 something that initially succeeded fails on subsequent attempts. 

285 - A `QuantumRun` is missing logs. 

286 - There are multiple runs associated with one of its output datasets, 

287 and that dataset comes up in a find-first search. This means that a 

288 dataset which will be used as an input to further processing has 

289 heterogeneous origins: its candidate producers may have had 

290 different inputs or a different data query. 

291 FAILED = -2: These quanta were attempted and failed. Failed quanta have 

292 logs and no metadata. 

293 UNKNOWN = -1: These are quanta which do not have any metadata associated 

294 with processing, but for which it is impossible to tell the status due 

295 to an additional absence of logs. Quanta which had not been processed 

296 at all would reflect this state, as would quanta which were 

297 conceptualized in the construction of the quantum graph but later 

298 identified to be unnecessary or erroneous (deemed NoWorkFound by the 

299 Science Pipelines). 

300 BLOCKED = 0: The quantum is not able to execute because its inputs are 

301 missing due to an upstream failure. Blocked quanta are distinguished 

302 from failed quanta by being successors of failed quanta in the graph. 

303 All the successors of blocked quanta are also marked as blocked. 

304 SUCCESSFUL = 1: Attempts at executing this quantum were successful. 

305 """ 

306 

307 WONKY = -3 

308 FAILED = -2 

309 UNKNOWN = -1 

310 BLOCKED = 0 

311 SUCCESSFUL = 1 

312 
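# Editor's sketch (an assumption distilled from the docstring above, not the
# module's actual transition code): once WONKY, always WONKY, and leaving a
# SUCCESSFUL overall state for any other state makes a quantum WONKY.
def _next_overall_status(
    old: QuantumInfoStatus, new: QuantumInfoStatus
) -> QuantumInfoStatus:
    if old is QuantumInfoStatus.WONKY:
        return old  # impossible to exit a wonky state
    if old is QuantumInfoStatus.SUCCESSFUL and new is not QuantumInfoStatus.SUCCESSFUL:
        return QuantumInfoStatus.WONKY
    return new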

313 

314class QuantumInfo(TypedDict): 

315 """Information about a quantum (i.e., the combination of a task label and 

316 data ID) across all attempted runs. 

317 

318 Used to annotate the networkx node dictionary. 

319 """ 

320 

321 data_id: DataCoordinate 

322 """The data_id of the quantum. 

323 """ 

324 

325 runs: dict[str, QuantumRun] 

326 """All run collections associated with the quantum. 

327 """ 

328 

329 status: QuantumInfoStatus 

330 """The overall status of the quantum. Note that it is impossible to exit a 

331 wonky state. 

332 """ 

333 

334 recovered: bool 

335 """The quantum was originally not successful but was ultimately successful. 

336 """ 

337 

338 messages: list[str] 

339 """Diagnostic messages to help disambiguate wonky states. 

340 """ 

341 

342 log: DatasetKey 

343 """The `DatasetKey` which can be used to access the log associated with the 

344 quantum across runs. 

345 """ 

346 

347 metadata: DatasetKey 

348 """The `DatasetKey` which can be used to access the metadata for the 

349 quantum across runs. 

350 """ 

351 

352 

353class DatasetRun(pydantic.BaseModel): 

354 """Information about a dataset in a given run collection.""" 

355 

356 id: uuid.UUID 

357 """The dataset ID associated with the dataset in a specific run. 

358 """ 

359 

360 produced: bool = False 

361 """Whether the specific run wrote the dataset. 

362 """ 

363 

364 visible: bool = False 

365 """Whether this dataset is visible in the final output collection; in other 

366 words, whether this dataset is queryable in a find-first search. This 

367 determines whether it will be used as an input to further processing. 

368 """ 

369 

370 @pydantic.model_validator(mode="after") 

371 def _validate(self) -> DatasetRun: 

372 """Validate the model for `DatasetRun` by asserting that no visible 

373 `DatasetRun` is also not produced (this should be impossible). 

374 

375 Returns 

376 ------- 

377 self : `DatasetRun` 

378 The `DatasetRun` object, validated. 

379 """ 

380 assert not (self.visible and not self.produced) 

381 return self 

382 
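# Editor's illustration of the validator above (hypothetical values):
#
#     DatasetRun(id=uuid.uuid4(), produced=True, visible=True)   # valid
#     DatasetRun(id=uuid.uuid4(), produced=False, visible=True)  # invalid:
#     # pydantic raises a ValidationError wrapping the failed assertion.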

383 

384class DatasetInfoStatus(Enum): 

385 """Status of the the DatasetType-dataID pair over all runs. This depends 

386 not only on the presence of the dataset itself, but also on metadata, logs 

387 and the state of its producer quantum. 

388 

389 Possible Statuses 

390 ----------------- 

391 CURSED: The dataset was the result of an unsuccessful quantum and was 

392 visible in the output collection anyway. These are flagged as 

393 cursed so that they may be caught before they become inputs to 

394 further processing. 

395 UNSUCCESSFUL: The dataset was not produced. These are the results of 

396 failed or blocked quanta. 

397 PREDICTED_ONLY: The dataset was predicted, and was not visible in any 

398 run, but was the successor of a successful quantum. These datasets are 

399 the result of the pipeline's NoWorkFound cases, in which a dataset is 

400 predicted in the graph but found to not be necessary in processing. 

401 SHADOWED: The dataset exists but is not queryable in a find_first 

402 search. This could mean that the version of this dataset which is 

403 passed as an input to further processing is not in the collections 

404 given. A shadowed dataset will not be used as an input to further 

405 processing. 

406 VISIBLE: The dataset is queryable in a find_first search. This means 

407 that it can be used as an input by subsequent tasks and processing. 

408 """ 

409 

410 CURSED = -2 

411 UNSUCCESSFUL = -1 

412 PREDICTED_ONLY = 0 

413 SHADOWED = 1 

414 VISIBLE = 2 

415 

416 

417class DatasetInfo(TypedDict): 

418 """Information about a given dataset across all runs. 

419 

420 Used to annotate the networkx node dictionary. 

421 """ 

422 

423 data_id: DataCoordinate 

424 """The data_id of the quantum. 

425 """ 

426 

427 runs: dict[str, DatasetRun] 

428 """All runs associated with the dataset. 

429 """ 

430 

431 status: DatasetInfoStatus 

432 """Overall status of the dataset. 

433 """ 

434 

435 messages: list[str] 

436 """Diagnostic messages to help disambiguate cursed states. 

437 """ 

438 

439 

440class UnsuccessfulQuantumSummary(pydantic.BaseModel): 

441 """A summary of all relevant information on an unsuccessful quantum. 

442 

443 This summarizes all information on a task's output for a particular data ID 

444 over all runs. 

445 """ 

446 

447 data_id: dict[str, DataIdValue] 

448 """The data_id of the unsuccessful quantum. 

449 """ 

450 runs: dict[str, str] 

451 """A dictionary (key: output run collection name) with the value of the 

452 enum name of the `QuantumRunStatus` of each run associated with an attempt 

453 to process the unsuccessful quantum. 

454 """ 

455 messages: list[str] 

456 """Any messages associated with the unsuccessful quantum (any clues as to 

457 why the quantum may be in a FAILED or WONKY state). 

458 """ 

459 

460 @classmethod 

461 def _from_info(cls, info: QuantumInfo) -> UnsuccessfulQuantumSummary: 

462 """Summarize all relevant information from the `QuantumInfo` in an 

463 `UnsuccessfulQuantumSummary`; return an `UnsuccessfulQuantumSummary`. 

464 

465 Parameters 

466 ---------- 

467 info : `QuantumInfo` 

468 The `QuantumInfo` object for the unsuccessful quantum. 

469 

470 Returns 

471 ------- 

472 summary : `UnsuccessfulQuantumSummary` 

473 A Pydantic model containing the dataID, run collection names (and 

474 each of their `QuantumRunStatus` enum names) as well as messages 

475 which may point to any clues about the nature of the problem. For 

476 failed quanta, these are usually error messages from the butler 

477 logs. For wonky quanta, these can be messages generated during the 

478 assembly of the `QuantumProvenanceGraph` that describe why it was 

479 marked as wonky. 

480 """ 

481 return cls( 

482 data_id=dict(info["data_id"].required), 

483 runs={k: v.status.name for k, v in info["runs"].items()}, 

484 messages=info["messages"], 

485 ) 

486 

487 

488class ExceptionInfoSummary(pydantic.BaseModel): 

489 """A summary of an exception raised by a quantum.""" 

490 

491 quantum_id: uuid.UUID 

492 """Unique identifier for this quantum in this run.""" 

493 

494 data_id: dict[str, DataIdValue] 

495 """The data ID of the quantum.""" 

496 

497 run: str 

498 """Name of the RUN collection in which this exception was (last) raised.""" 

499 

500 exception: ExceptionInfo 

501 """Information about an exception chained to 

502 `AnnotatedPartialOutputsError`, if that was raised by this quantum in this 

503 run. 

504 """ 

505 

506 

507class TaskSummary(pydantic.BaseModel): 

508 """A summary of the status of all quanta associated with a single task, 

509 across all runs. 

510 """ 

511 

512 n_successful: int = 0 

513 """A count of successful quanta. 

514 """ 

515 n_blocked: int = 0 

516 """A count of blocked quanta. 

517 """ 

518 n_unknown: int = 0 

519 """A count of quanta for which there are no metadata or logs. 

520 """ 

521 

522 n_expected: int = 0 

523 """The number of quanta expected by the graph. 

524 """ 

525 

526 @pydantic.computed_field # type: ignore[prop-decorator] 

527 @property 

528 def n_wonky(self) -> int: 

529 """Return a count of `wonky` quanta.""" 

530 return len(self.wonky_quanta) 

531 

532 @pydantic.computed_field # type: ignore[prop-decorator] 

533 @property 

534 def n_failed(self) -> int: 

535 """Return a count of `failed` quanta.""" 

536 return len(self.failed_quanta) 

537 

538 caveats: dict[str, list[dict[str, DataIdValue]]] = pydantic.Field(default_factory=dict) 

539 """Quanta that were successful with caveats. 

540 

541 Keys are 2-character codes returned by `QuantumSuccessCaveats.concise`; 

542 values are lists of data IDs of quanta with those caveats. Quanta that were 

543 unqualified successes are not included. 

544 

545 Quanta for which success flags were not read from metadata will not be 

546 included. 

547 """ 

548 

549 exceptions: dict[str, list[ExceptionInfoSummary]] = pydantic.Field(default_factory=dict) 

550 """Exceptions raised by partially-successful quanta. 

551 

552 Keys are fully-qualified exception type names and values are lists of 

553 extra exception information, with each entry corresponding to a different 

554 data ID. Only the final RUN for each data ID is represented here. 

555 

556 Every entry in this data structure corresponds to one in `caveats` with 

557 the "P" code (for `QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR`). 

558 

559 In the future, this may be expanded to include exceptions for failed quanta 

560 as well (at present that information is not retained during execution). 

561 """ 

562 

563 failed_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list) 

564 """A list of all `UnsuccessfulQuantumSummary` objects associated with the 

565 FAILED quanta. This is a report containing their data IDs, the status 

566 of each run associated with each `failed` quantum, and the error messages 

567 associated with the failures when applicable. 

568 """ 

569 recovered_quanta: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list) 

570 """A list of dataIDs (key->value) which moved from an unsuccessful to 

571 successful state. 

572 """ 

573 wonky_quanta: list[UnsuccessfulQuantumSummary] = pydantic.Field(default_factory=list) 

574 """A list of all `UnsuccessfulQuantumSummary` objects associated with the 

575 WONKY quanta. This is a report containing their data_ids, the status of 

576 each run associated with each `wonky` quantum, and messages (dictated in 

577 this module) associated with the particular issue identified. 

578 """ 

579 

580 def _add_quantum_info( 

581 self, 

582 info: QuantumInfo, 

583 log_getter: Callable[[DatasetRef], ButlerLogRecords] | None, 

584 executor: concurrent.futures.Executor, 

585 ) -> concurrent.futures.Future[None] | None: 

586 """Add a `QuantumInfo` to a `TaskSummary`. 

587 

588 Unpack the `QuantumInfo` object, sorting quanta of each status into 

589 the correct place in the `TaskSummary`. If looking for error messages 

590 in the `lsst.daf.butler.Butler` logs is desired, take special care to 

591 catch issues with missing logs. 

592 

593 Parameters 

594 ---------- 

595 info : `QuantumInfo` 

596 The `QuantumInfo` object to add to the `TaskSummary`. 

597 log_getter : `~collections.abc.Callable` or `None` 

598 A callable that can be passed a `~lsst.daf.butler.DatasetRef` for 

599 a log dataset to retrieve those logs, or `None` to not load any 

600 logs. 

601 executor : `concurrent.futures.Executor` 

602 A possibly-parallel executor that should be used to schedule 

603 log dataset reads. 

604 

605 Returns 

606 ------- 

607 future : `concurrent.futures.Future` or `None` 

608 A future that represents a parallelized log read and summary 

609 update. 

610 """ 

611 try: 

612 final_run, final_quantum_run = QuantumRun.find_final(info) 

613 except ValueError: 

614 final_run = None 

615 final_quantum_run = None 

616 match info["status"]: 

617 case QuantumInfoStatus.SUCCESSFUL: 

618 self.n_successful += 1 

619 if info["recovered"]: 

620 self.recovered_quanta.append(dict(info["data_id"].required)) 

621 if final_quantum_run is not None and final_quantum_run.caveats: 

622 code = final_quantum_run.caveats.concise() 

623 self.caveats.setdefault(code, []).append(dict(info["data_id"].required)) 

624 if final_quantum_run.caveats & QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR: 

625 if final_quantum_run.exception is not None: 

626 self.exceptions.setdefault(final_quantum_run.exception.type_name, []).append( 

627 ExceptionInfoSummary( 

628 quantum_id=final_quantum_run.id, 

629 data_id=dict(info["data_id"].required), 

630 run=final_run, 

631 exception=final_quantum_run.exception, 

632 ) 

633 ) 

634 return None 

635 case QuantumInfoStatus.WONKY: 

636 self.wonky_quanta.append(UnsuccessfulQuantumSummary._from_info(info)) 

637 return None 

638 case QuantumInfoStatus.BLOCKED: 

639 self.n_blocked += 1 

640 return None 

641 case QuantumInfoStatus.FAILED: 

642 failed_quantum_summary = UnsuccessfulQuantumSummary._from_info(info) 

643 future: concurrent.futures.Future[None] | None = None 

644 if log_getter: 

645 

646 def callback() -> None: 

647 for quantum_run in info["runs"].values(): 

648 try: 

649 log = log_getter(quantum_run.log_ref) 

650 except LookupError: 

651 failed_quantum_summary.messages.append( 

652 f"Logs not ingested for {quantum_run.log_ref!r}" 

653 ) 

654 except FileNotFoundError: 

655 failed_quantum_summary.messages.append( 

656 f"Logs missing or corrupt for {quantum_run.log_ref!r}" 

657 ) 

658 else: 

659 failed_quantum_summary.messages.extend( 

660 [record.message for record in log if record.levelno >= logging.ERROR] 

661 ) 

662 

663 future = executor.submit(callback) 

664 self.failed_quanta.append(failed_quantum_summary) 

665 return future 

666 case QuantumInfoStatus.UNKNOWN: 

667 self.n_unknown += 1 

668 return None 

669 case unrecognized_state: 

670 raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}") 

671 

672 def _add_data_id_group(self, other_summary: TaskSummary) -> None: 

673 """Add information from a `TaskSummary` over one dataquery-identified 

674 group to another, as part of aggregating `Summary` reports. 

675 

676 Parameters 

677 ---------- 

678 other_summary : `TaskSummary` 

679 `TaskSummary` to aggregate. 

680 """ 

681 self.n_successful += other_summary.n_successful 

682 self.n_blocked += other_summary.n_blocked 

683 self.n_unknown += other_summary.n_unknown 

684 self.n_expected += other_summary.n_expected 

685 for code in self.caveats.keys() | other_summary.caveats.keys(): 

686 self.caveats.setdefault(code, []).extend(other_summary.caveats.get(code, [])) 

687 for type_name in self.exceptions.keys() | other_summary.exceptions.keys(): 

688 self.exceptions.setdefault(type_name, []).extend(other_summary.exceptions.get(type_name, [])) 

689 self.wonky_quanta.extend(other_summary.wonky_quanta) 

690 self.recovered_quanta.extend(other_summary.recovered_quanta) 

691 self.failed_quanta.extend(other_summary.failed_quanta) 

692 

693 

694class CursedDatasetSummary(pydantic.BaseModel): 

695 """A summary of all the relevant information on a cursed dataset.""" 

696 

697 producer_data_id: dict[str, DataIdValue] 

698 """The data_id of the task which produced this dataset. This is mostly 

699 useful for people wishing to quickly track down the task which produced 

700 this cursed dataset. 

701 """ 

702 data_id: dict[str, DataIdValue] 

703 """The data_id of the cursed dataset. 

704 """ 

705 runs_produced: dict[str, bool] 

706 """A dictionary of all the runs associated with the cursed dataset; 

707 the `bool` is true if the dataset was produced in the associated run. 

708 """ 

709 run_visible: str | None 

710 """The run collection that holds the dataset that is visible in the final 

711 output collection. 

712 """ 

713 messages: list[str] 

714 """Any diagnostic messages (dictated in this module) which might help in 

715 understanding why or how the dataset became cursed. 

716 """ 

717 

718 @classmethod 

719 def _from_info(cls, info: DatasetInfo, producer_info: QuantumInfo) -> CursedDatasetSummary: 

720 """Summarize all relevant information from the `DatasetInfo` in an 

721 `CursedDatasetSummary`; return a `CursedDatasetSummary`. 

722 

723 Parameters 

724 ---------- 

725 info : `DatasetInfo` 

726 All relevant information on the dataset. 

727 producer_info : `QuantumInfo` 

728 All relevant information on the producer task. This is used to 

729 report the data_id of the producer task. 

730 

731 Returns 

732 ------- 

733 summary : `CursedDatasetSummary` 

734 A Pydantic model containing the dataID of the task which produced 

735 this cursed dataset, the dataID associated with the cursed dataset, 

736 run collection names (and their `DatasetRun` information) as well 

737 as any messages which may point to any clues about the nature of 

738 the problem. These are messages generated during the assembly of 

739 the `QuantumProvenanceGraph` that describe why it was marked as 

740 cursed. 

741 """ 

742 runs_visible = {k for k, v in info["runs"].items() if v.visible} 

743 return cls( 

744 producer_data_id=dict(producer_info["data_id"].required), 

745 data_id=dict(info["data_id"].required), 

746 runs_produced={k: v.produced for k, v in info["runs"].items()}, 

747 # this has at most one element 

748 run_visible=runs_visible.pop() if runs_visible else None, 

749 messages=info["messages"], 

750 ) 

751 

752 

753class DatasetTypeSummary(pydantic.BaseModel): 

754 """A summary of the status of all datasets of a particular type across all 

755 runs. 

756 """ 

757 

758 producer: str = "" 

759 """The name of the task which produced this dataset. 

760 """ 

761 

762 n_visible: int = 0 

763 """A count of the datasets of this type which were visible in the 

764 finalized collection(s). 

765 """ 

766 n_shadowed: int = 0 

767 """A count of the datasets of this type which were produced but not 

768 visible. This includes any datasets which do not come up in a butler 

769 query over their associated collection. 

770 """ 

771 n_predicted_only: int = 0 

772 """A count of the datasets of this type which were predicted but 

773 ultimately not produced. Note that this does not indicate failures, 

774 which are accounted for differently. This is commonly referred to as 

775 a `NoWorkFound` case. 

776 """ 

777 n_expected: int = 0 

778 """The number of datasets of this type expected by the graph. 

779 """ 

780 

781 @pydantic.computed_field # type: ignore[prop-decorator] 

782 @property 

783 def n_cursed(self) -> int: 

784 """Return a count of cursed datasets.""" 

785 return len(self.cursed_datasets) 

786 

787 @pydantic.computed_field # type: ignore[prop-decorator] 

788 @property 

789 def n_unsuccessful(self) -> int: 

790 """Return a count of unsuccessful datasets.""" 

791 return len(self.unsuccessful_datasets) 

792 

793 cursed_datasets: list[CursedDatasetSummary] = pydantic.Field(default_factory=list) 

794 """A list of all `CursedDatasetSummary` objects associated with the 

795 cursed datasets. This is a report containing their data_ids and the 

796 data_ids of their producer task, the status of each run associated with 

797 each `cursed` dataset, and messages (dictated in this module) associated 

798 with the particular issue identified. 

799 """ 

800 unsuccessful_datasets: list[dict[str, DataIdValue]] = pydantic.Field(default_factory=list) 

801 """A list of all unsuccessful datasets by their name and data_id. 

802 """ 

803 

804 def _add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> None: 

805 """Add a `DatasetInfo` to a `DatasetTypeSummary`. 

806 

807 Unpack the `DatasetInfo` object, sorting datasets of each status into 

808 the correct place in the `DatasetTypeSummary`. If the status of a 

809 dataset is not valid, raise an `AssertionError`. 

810 

811 Parameters 

812 ---------- 

813 info : `DatasetInfo` 

814 The `DatasetInfo` object to add to the `DatasetTypeSummary`. 

815 producer_info : `QuantumInfo` 

816 The `QuantumInfo` object associated with the producer of the 

817 dataset. This is used to report the producer task in the 

818 summaries for cursed datasets, which may help identify 

819 specific issues. 

820 """ 

821 match info["status"]: 

822 case DatasetInfoStatus.VISIBLE: 

823 self.n_visible += 1 

824 case DatasetInfoStatus.SHADOWED: 

825 self.n_shadowed += 1 

826 case DatasetInfoStatus.UNSUCCESSFUL: 

827 self.unsuccessful_datasets.append(dict(info["data_id"].mapping)) 

828 case DatasetInfoStatus.CURSED: 

829 self.cursed_datasets.append(CursedDatasetSummary._from_info(info, producer_info)) 

830 case DatasetInfoStatus.PREDICTED_ONLY: 

831 self.n_predicted_only += 1 

832 case unrecognized_state: 

833 raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}") 

834 

835 def _add_data_id_group(self, other_summary: DatasetTypeSummary) -> None: 

836 """Add information from a `DatasetTypeSummary` over one 

837 dataquery-identified group to another, as part of aggregating `Summary` 

838 reports. 

839 

840 Parameters 

841 ---------- 

842 other_summary : `DatasetTypeSummary` 

843 `DatasetTypeSummary` to aggregate. 

844 """ 

845 if self.producer and other_summary.producer: 

846 # Guard against empty string 

847 if self.producer != other_summary.producer: 

848 _LOG.warning( 

849 "Producer for dataset type is not consistent: %r != %r.", 

850 self.producer, 

851 other_summary.producer, 

852 ) 

853 _LOG.warning("Ignoring %r.", other_summary.producer) 

854 else: 

855 if other_summary.producer and not self.producer: 

856 self.producer = other_summary.producer 

857 

858 self.n_visible += other_summary.n_visible 

859 self.n_shadowed += other_summary.n_shadowed 

860 self.n_predicted_only += other_summary.n_predicted_only 

861 self.n_expected += other_summary.n_expected 

862 

863 self.cursed_datasets.extend(other_summary.cursed_datasets) 

864 self.unsuccessful_datasets.extend(other_summary.unsuccessful_datasets) 

865 

866 

867class Summary(pydantic.BaseModel): 

868 """A summary of the contents of the QuantumProvenanceGraph, including 

869 all information on the quanta for each task and the datasets of each 

870 `~lsst.daf.butler.DatasetType`. 

871 """ 

872 

873 tasks: dict[str, TaskSummary] = pydantic.Field(default_factory=dict) 

874 """Summaries for the tasks and their quanta. 

875 """ 

876 

877 datasets: dict[str, DatasetTypeSummary] = pydantic.Field(default_factory=dict) 

878 """Summaries for the datasets. 

879 """ 

880 

881 @classmethod 

882 def aggregate(cls, summaries: Sequence[Summary]) -> Summary: 

883 """Combine summaries from disjoint data id groups into an overall 

884 summary of common tasks and datasets. Intended for use when the same 

885 pipeline has been run over all groups. 

886 

887 Parameters 

888 ---------- 

889 summaries : `~collections.abc.Sequence` [`Summary`] 

890 Sequence of all `Summary` objects to aggregate. 

891 """ 

892 result = cls() 

893 for summary in summaries: 

894 for label, task_summary in summary.tasks.items(): 

895 result_task_summary = result.tasks.setdefault(label, TaskSummary()) 

896 result_task_summary._add_data_id_group(task_summary) 

897 for dataset_type, dataset_type_summary in summary.datasets.items(): 

898 result_dataset_summary = result.datasets.setdefault(dataset_type, DatasetTypeSummary()) 

899 result_dataset_summary._add_data_id_group(dataset_type_summary) 

900 return result 

901 
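# Editor's usage sketch: combining per-shard summaries from the same
# pipeline into one report; `summary_a` and `summary_b` are hypothetical
# Summary instances built or loaded elsewhere.
#
#     overall = Summary.aggregate([summary_a, summary_b])
#     overall.pprint(brief=True)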

902 def pprint(self, brief: bool = False, datasets: bool = True) -> None: 

903 """Print this summary to stdout, as a series of tables. 

904 

905 Parameters 

906 ---------- 

907 brief : `bool`, optional 

908 If `True`, only display short (counts-only) tables. By default, 

909 per-data ID information for exceptions and failures is printed as 

910 well. 

911 datasets : `bool`, optional 

912 Whether to include tables of datasets as well as quanta. This 

913 includes a summary table of dataset counts for various statuses and 

914 (if ``brief`` is `False`) a table with per-data ID information for 

915 each unsuccessful or cursed dataset. 

916 """ 

917 self.make_quantum_table().pprint_all() 

918 print("") 

919 print("Caveat codes:") 

920 for k, v in QuantumSuccessCaveats.legend().items(): 

921 print(f"{k}: {v}") 

922 print("") 

923 if exception_table := self.make_exception_table(): 

924 exception_table.pprint_all() 

925 print("") 

926 if datasets: 

927 self.make_dataset_table().pprint_all() 

928 print("") 

929 if not brief: 

930 for task_label, bad_quantum_table in self.make_bad_quantum_tables().items(): 

931 print(f"{task_label} errors:") 

932 bad_quantum_table.pprint_all() 

933 print("") 

934 if datasets: 

935 for dataset_type_name, bad_dataset_table in self.make_bad_dataset_tables().items(): 

936 print(f"{dataset_type_name} errors:") 

937 bad_dataset_table.pprint_all() 

938 print("") 

939 

940 def make_quantum_table(self) -> astropy.table.Table: 

941 """Construct an `astropy.table.Table` with a tabular summary of the 

942 quanta. 

943 

944 Returns 

945 ------- 

946 table : `astropy.table.Table` 

947 A table view of the quantum information. This only includes 

948 counts of status categories and caveats, not any per-data-ID 

949 detail. 

950 

951 Notes 

952 ----- 

953 Success caveats in the table are represented by their 

954 `~QuantumSuccessCaveats.concise` form, so when pretty-printing this 

955 table for users, the `~QuantumSuccessCaveats.legend` should generally 

956 be printed as well. 

957 """ 

958 rows = [] 

959 for label, task_summary in self.tasks.items(): 

960 if len(task_summary.caveats) > 1: 

961 caveats = "(multiple)" 

962 elif len(task_summary.caveats) == 1: 

963 ((code, data_ids),) = task_summary.caveats.items() 

964 caveats = f"{code}({len(data_ids)})" 

965 else: 

966 caveats = "" 

967 rows.append( 

968 { 

969 "Task": label, 

970 "Unknown": task_summary.n_unknown, 

971 "Successful": task_summary.n_successful, 

972 "Caveats": caveats, 

973 "Blocked": task_summary.n_blocked, 

974 "Failed": task_summary.n_failed, 

975 "Wonky": task_summary.n_wonky, 

976 "TOTAL": sum( 

977 [ 

978 task_summary.n_successful, 

979 task_summary.n_unknown, 

980 task_summary.n_blocked, 

981 task_summary.n_failed, 

982 task_summary.n_wonky, 

983 ] 

984 ), 

985 "EXPECTED": task_summary.n_expected, 

986 } 

987 ) 

988 return astropy.table.Table(rows) 

989 

990 def make_dataset_table(self) -> astropy.table.Table: 

991 """Construct an `astropy.table.Table` with a tabular summary of the 

992 datasets. 

993 

994 Returns 

995 ------- 

996 table : `astropy.table.Table` 

997 A table view of the dataset information. This only includes 

998 counts of status categories, not any per-data-ID detail. 

999 """ 

1000 rows = [] 

1001 for dataset_type_name, dataset_type_summary in self.datasets.items(): 

1002 rows.append( 

1003 { 

1004 "Dataset": dataset_type_name, 

1005 "Visible": dataset_type_summary.n_visible, 

1006 "Shadowed": dataset_type_summary.n_shadowed, 

1007 "Predicted Only": dataset_type_summary.n_predicted_only, 

1008 "Unsuccessful": dataset_type_summary.n_unsuccessful, 

1009 "Cursed": dataset_type_summary.n_cursed, 

1010 "TOTAL": sum( 

1011 [ 

1012 dataset_type_summary.n_visible, 

1013 dataset_type_summary.n_shadowed, 

1014 dataset_type_summary.n_predicted_only, 

1015 dataset_type_summary.n_unsuccessful, 

1016 dataset_type_summary.n_cursed, 

1017 ] 

1018 ), 

1019 "EXPECTED": dataset_type_summary.n_expected, 

1020 } 

1021 ) 

1022 return astropy.table.Table(rows) 

1023 

1024 def make_exception_table(self) -> astropy.table.Table: 

1025 """Construct an `astropy.table.Table` with counts for each exception 

1026 type raised by each task. 

1027 

1028 At present this only includes information from partial-outputs-error 

1029 successes, since exception information for failures is not tracked. 

1030 This may change in the future. 

1031 

1032 Returns 

1033 ------- 

1034 table : `astropy.table.Table` 

1035 A table with columns for task label, exception type, and counts. 

1036 """ 

1037 rows = [] 

1038 for task_label, task_summary in self.tasks.items(): 

1039 for type_name, exception_summaries in task_summary.exceptions.items(): 

1040 rows.append({"Task": task_label, "Exception": type_name, "Count": len(exception_summaries)}) 

1041 return astropy.table.Table(rows) 

1042 

1043 def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]: 

1044 """Construct an `astropy.table.Table` with per-data-ID information 

1045 about failed, wonky, and partial-outputs-error quanta. 

1046 

1047 Parameters 

1048 ---------- 

1049 max_message_width : `int`, optional 

1050 Maximum width for the Message column. Longer messages are 

1051 truncated. 

1052 

1053 Returns 

1054 ------- 

1055 tables : `dict` [ `str`, `astropy.table.Table` ] 

1056 A table for each task with status, data IDs, and log messages for 

1057 each unsuccessful quantum. Keys are task labels. Only tasks with 

1058 unsuccessful quanta or partial outputs errors are included. 

1059 """ 

1060 result = {} 

1061 for task_label, task_summary in self.tasks.items(): 

1062 rows = [] 

1063 for status, unsuccessful_quantum_summary in itertools.chain( 

1064 zip(itertools.repeat("FAILED"), task_summary.failed_quanta), 

1065 zip(itertools.repeat("WONKY"), task_summary.wonky_quanta), 

1066 ): 

1067 row = {"Status(Caveats)": status, "Exception": "", **unsuccessful_quantum_summary.data_id} 

1068 row["Message"] = ( 

1069 textwrap.shorten(unsuccessful_quantum_summary.messages[-1], max_message_width) 

1070 if unsuccessful_quantum_summary.messages 

1071 else "" 

1072 ) 

1073 rows.append(row) 

1074 for exception_summary in itertools.chain.from_iterable(task_summary.exceptions.values()): 

1075 # Trim off the package name from the exception type for 

1076 # brevity. 

1077 short_name: str = exception_summary.exception.type_name.rsplit(".", maxsplit=1)[-1] 

1078 row = { 

1079 "Status(Caveats)": "SUCCESSFUL(P)", # we only get exception info for partial outputs 

1080 "Exception": short_name, 

1081 **exception_summary.data_id, 

1082 "Message": textwrap.shorten(exception_summary.exception.message, max_message_width), 

1083 } 

1084 rows.append(row) 

1085 if rows: 

1086 table = astropy.table.Table(rows) 

1087 table.columns["Exception"].format = "<" 

1088 table.columns["Message"].format = "<" 

1089 result[task_label] = table 

1090 return result 

1091 
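# Editor's usage sketch, matching what pprint() does above for non-brief
# output; `summary` is a hypothetical Summary instance.
#
#     for task_label, table in summary.make_bad_quantum_tables(
#         max_message_width=60
#     ).items():
#         print(f"{task_label} errors:")
#         table.pprint_all()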

1092 def make_bad_dataset_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]: 

1093 """Construct an `astropy.table.Table` with per-data-ID information 

1094 about unsuccessful and cursed datasets. 

1095 

1096 Parameters 

1097 ---------- 

1098 max_message_width : `int`, optional 

1099 Maximum width for the Message column. Longer messages are 

1100 truncated. 

1101 

1102 Returns 

1103 ------- 

1104 tables : `dict` [ `str`, `astropy.table.Table` ] 

1105 A table for each dataset type with status, data IDs, and messages 

1106 for each unsuccessful or cursed dataset. Keys are dataset type 

1107 names. Only dataset types with such datasets are included. 

1108 """ 

1109 result = {} 

1110 for dataset_type_name, dataset_type_summary in self.datasets.items(): 

1111 rows = [] 

1112 for data_id in dataset_type_summary.unsuccessful_datasets: 

1113 row = {"Status": "UNSUCCESSFUL", **data_id, "Message": ""} 

1114 for cursed_dataset_summary in dataset_type_summary.cursed_datasets: 

1115 row = {"Status": "CURSED", **cursed_dataset_summary.data_id} 

1116 row["Message"] = ( 

1117 textwrap.shorten(cursed_dataset_summary.messages[-1], max_message_width) 

1118 if cursed_dataset_summary.messages 

1119 else "" 

1120 ) 

1121 rows.append(row) 

1122 if rows: 

1123 table = astropy.table.Table(rows) 

1124 table.columns["Message"].format = "<" 

1125 result[dataset_type_name] = table 

1126 return result 

1127 

1128 

1129class QuantumProvenanceGraph: 

1130 """A set of already-run, merged quantum graphs with provenance 

1131 information. 

1132 

1133 Parameters 

1134 ---------- 

1135 butler : `lsst.daf.butler.Butler` 

1136 The Butler used for this report. This should match the Butler used 

1137 for the run associated with the executed quantum graph. 

1138 qgraphs : `~collections.abc.Sequence` [`QuantumGraph` |\ 

1139 `~lsst.resources.ResourcePathExpression`] 

1140 A list of either quantum graph objects or their URIs, to be used 

1141 to assemble the `QuantumProvenanceGraph`. 

1142 collections : `~collections.abc.Sequence` [`str`] | `None` 

1143 Collections to use in `lsst.daf.butler.query_datasets` when testing 

1144 which datasets are available at a high level. 

1145 where : `str` 

1146 A "where" string to use to constrain the datasets; should be provided 

1147 if ``collections`` includes many datasets that are not in any graphs, 

1148 to select just those that might be (e.g. when sharding over dimensions 

1149 and using a final collection that spans multiple shards). 

1150 curse_failed_logs : `bool` 

1151 Mark log datasets as CURSED if they are visible in the final output 

1152 collection. Note that a campaign-level collection must be used here for 

1153 `collections` if `curse_failed_logs` is `True`. 

1154 read_caveats : `str` or `None`, optional 

1155 Whether to read metadata files to get flags that describe qualified 

1156 successes. If `None`, no metadata files will be read and all 

1157 ``caveats`` fields will be `None`. If "exhaustive", all metadata files 

1158 will be read. If "lazy", only metadata files where at least one 

1159 predicted output is missing will be read. 

1160 use_qbb : `bool`, optional 

1161 If `True`, use a quantum-backed butler when reading metadata files. 

1162 Note that some butler database queries are still run even if this is 

1163 `True`; this does not avoid database access entirely. 

1164 n_cores : `int`, optional 

1165 Number of threads to use for parallelization. 

1166 """ 

1167 

1168 def __init__( 

1169 self, 

1170 butler: Butler | None = None, 

1171 qgraphs: Sequence[QuantumGraph | ResourcePathExpression] = (), 

1172 *, 

1173 collections: Sequence[str] | None = None, 

1174 where: str = "", 

1175 curse_failed_logs: bool = False, 

1176 read_caveats: Literal["lazy", "exhaustive"] | None = "lazy", 

1177 use_qbb: bool = True, 

1178 n_cores: int = 1, 

1179 ) -> None: 

1180 # The graph we annotate as we step through all the graphs associated 

1181 # with the processing to create the `QuantumProvenanceGraph`. 

1182 self._xgraph = networkx.DiGraph() 

1183 # The nodes representing quanta in `_xgraph` grouped by task label. 

1184 self._quanta: dict[str, set[QuantumKey]] = {} 

1185 # The nodes representing datasets in `_xgraph` grouped by dataset type 

1186 # name. 

1187 self._datasets: dict[str, set[DatasetKey]] = {} 

1188 # Bool representing whether the graph has been finalized. This is set 

1189 # to True when resolve_duplicates completes. 

1190 self._finalized: bool = False 

1191 # In order to both parallelize metadata/log reads and potentially use 

1192 # QBB to do it, we in general need one butler for each output_run and 

1193 # thread combination. This dict is keyed by the former, and the 

1194 # wrapper type used for the value handles the latter. 

1195 self._butler_wrappers: dict[str, _ThreadLocalButlerWrapper] = {} 

1196 if butler is not None: 

1197 self.assemble_quantum_provenance_graph( 

1198 butler, 

1199 qgraphs, 

1200 collections=collections, 

1201 where=where, 

1202 curse_failed_logs=curse_failed_logs, 

1203 read_caveats=read_caveats, 

1204 use_qbb=use_qbb, 

1205 n_cores=n_cores, 

1206 ) 

1207 elif qgraphs: 

1208 raise TypeError("'butler' must be provided if `qgraphs` is.") 

1209 
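# Editor's end-to-end sketch (the repo path, graph URIs, and collection name
# are hypothetical): build the provenance graph from chronologically ordered
# processing attempts, then summarize it.
#
#     butler = Butler("/repo/main")
#     qpg = QuantumProvenanceGraph(
#         butler,
#         ["attempt1.qgraph", "attempt2.qgraph"],  # oldest first
#         collections=["u/me/campaign"],
#         read_caveats="lazy",
#         n_cores=4,
#     )
#     qpg.to_summary(do_store_logs=True, n_cores=4).pprint(brief=True)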

1210 @property 

1211 def quanta(self) -> Mapping[str, Set[QuantumKey]]: 

1212 """A mapping from task label to a set of keys for its quanta.""" 

1213 return self._quanta 

1214 

1215 @property 

1216 def datasets(self) -> Mapping[str, Set[DatasetKey]]: 

1217 """A mapping from dataset type name to a set of keys for datasets.""" 

1218 return self._datasets 

1219 

1220 def get_quantum_info(self, key: QuantumKey) -> QuantumInfo: 

1221 """Get a `QuantumInfo` object from the `QuantumProvenanceGraph` using 

1222 a `QuantumKey`. 

1223 

1224 Parameters 

1225 ---------- 

1226 key : `QuantumKey` 

1227 The key used to refer to the node on the graph. 

1228 

1229 Returns 

1230 ------- 

1231 quantum_info : `QuantumInfo` 

1232 The `TypedDict` with information on the task label-dataID pair 

1233 across all runs. 

1234 """ 

1235 return self._xgraph.nodes[key] 

1236 

1237 def get_dataset_info(self, key: DatasetKey) -> DatasetInfo: 

1238 """Get a `DatasetInfo` object from the `QuantumProvenanceGraph` using 

1239 a `DatasetKey`. 

1240 

1241 Parameters 

1242 ---------- 

1243 key : `DatasetKey` 

1244 The key used to refer to the node on the graph. 

1245 

1246 Returns 

1247 ------- 

1248 dataset_info : `DatasetInfo` 

1249 The `TypedDict` with information about the 

1250 `~lsst.daf.butler.DatasetType`-dataID pair across all runs. 

1251 """ 

1252 return self._xgraph.nodes[key] 

1253 

1254 def to_summary( 

1255 self, butler: Butler | None = None, do_store_logs: bool = True, n_cores: int = 1 

1256 ) -> Summary: 

1257 """Summarize the `QuantumProvenanceGraph`. 

1258 

1259 Parameters 

1260 ---------- 

1261 butler : `lsst.daf.butler.Butler`, optional 

1262 Ignored; accepted for backwards compatibility. 

1263 do_store_logs : `bool` 

1264 Store the logs in the summary dictionary. 

1265 n_cores : `int`, optional 

1266 Number of cores to use. 

1267 

1268 Returns 

1269 ------- 

1270 result : `Summary` 

1271 A struct containing counts of quanta and datasets in each of 

1272 the overall states defined in `QuantumInfo` and `DatasetInfo`, 

1273 as well as diagnostic information and error messages for failed 

1274 quanta and strange edge cases, and a list of recovered quanta. 

1275 """ 

1276 status_log = PeriodicLogger(_LOG) 

1277 if not self._finalized: 

1278 raise RuntimeError( 

1279 """resolve_duplicates must be called to finalize the 

1280 QuantumProvenanceGraph before making a summary.""" 

1281 ) 

1282 result = Summary() 

1283 futures: list[concurrent.futures.Future[None]] = [] 

1284 _LOG.verbose("Summarizing %s tasks.", len(self._quanta.keys())) 

1285 with concurrent.futures.ThreadPoolExecutor(n_cores) as executor: 

1286 for m, (task_label, quanta) in enumerate(self._quanta.items()): 

1287 task_summary = TaskSummary() 

1288 task_summary.n_expected = len(quanta) 

1289 for n, quantum_key in enumerate(quanta): 

1290 quantum_info = self.get_quantum_info(quantum_key) 

1291 future = task_summary._add_quantum_info( 

1292 quantum_info, 

1293 log_getter=self._butler_get if do_store_logs else None, 

1294 executor=executor, 

1295 ) 

1296 if future is not None: 

1297 futures.append(future) 

1298 status_log.log( 

1299 "Summarized %s of %s quanta of task %s of %s.", 

1300 n + 1, 

1301 len(quanta), 

1302 m + 1, 

1303 len(self._quanta.keys()), 

1304 ) 

1305 result.tasks[task_label] = task_summary 

1306 for n, future in enumerate(concurrent.futures.as_completed(futures)): 

1307 if (err := future.exception()) is not None: 

1308 raise err 

1309 status_log.log("Loaded messages from %s of %s log datasets.", n + 1, len(futures)) 

1310 _LOG.verbose("Summarizing %s dataset types.", len(self._datasets.keys())) 

1311 for m, (dataset_type_name, datasets) in enumerate(self._datasets.items()): 

1312 dataset_type_summary = DatasetTypeSummary(producer="") 

1313 dataset_type_summary.n_expected = len(datasets) 

1314 for n, dataset_key in enumerate(datasets): 

1315 dataset_info = self.get_dataset_info(dataset_key) 

1316 producer_key = self.get_producer_of(dataset_key) 

1317 producer_info = self.get_quantum_info(producer_key) 

1318 # Not ideal, but hard to get out of the graph at the moment. 

1319 # Change after DM-40441 

1320 dataset_type_summary.producer = producer_key.task_label 

1321 dataset_type_summary._add_dataset_info(dataset_info, producer_info) 

1322 status_log.log( 

1323 "Summarized %s of %s datasets of type %s of %s.", 

1324 n + 1, 

1325 len(datasets), 

1326 m + 1, 

1327 len(self._datasets.keys()), 

1328 ) 

1329 result.datasets[dataset_type_name] = dataset_type_summary 

1330 return result 

1331 

1332 def iter_outputs_of(self, quantum_key: QuantumKey) -> Iterator[DatasetKey]: 

1333 """Iterate through the outputs of a quantum, yielding the keys of 

1334 all of the datasets produced by the quantum. 

1335 

1336 Parameters 

1337 ---------- 

1338 quantum_key : `QuantumKey` 

1339 The key for the quantum whose outputs are needed. 

1340 """ 

1341 yield from self._xgraph.successors(quantum_key) 

1342 

1343 def get_producer_of(self, dataset_key: DatasetKey) -> QuantumKey: 

1344 """Unpack the predecessor (producer quantum) of a given dataset key 

1345 from a graph. 

1346 

1347 Parameters 

1348 ---------- 

1349 dataset_key : `DatasetKey` 

1350 The key for the dataset whose producer quantum is needed. 

1351 

1352 Returns 

1353 ------- 

1354 result : `QuantumKey` 

1355 The key for the quantum which produced the dataset. 

1356 """ 

1357 (result,) = self._xgraph.predecessors(dataset_key) 

1358 return result 

1359 

1360 def iter_downstream( 

1361 self, key: QuantumKey | DatasetKey 

1362 ) -> Iterator[tuple[QuantumKey, QuantumInfo] | tuple[DatasetKey, DatasetInfo]]: 

1363 """Iterate over the quanta and datasets that are downstream of a 

1364 quantum or dataset. 

1365 

1366 Parameters 

1367 ---------- 

1368 key : `QuantumKey` or `DatasetKey` 

1369 Starting node. 

1370 

1371 Returns 

1372 ------- 

1373 iter : `~collections.abc.Iterator` [ `tuple` ] 

1374 An iterator over pairs of (`QuantumKey`, `QuantumInfo`) or 

1375 (`DatasetKey`, `DatasetInfo`). 

1376 """ 

1377 for key in networkx.dag.descendants(self._xgraph, key): 

1378 yield (key, self._xgraph.nodes[key]) # type: ignore 

1379 
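# Editor's usage sketch: walk everything downstream of a failed quantum to
# see what it blocked; `qpg` and `failed_key` are hypothetical.
#
#     for node_key, node_info in qpg.iter_downstream(failed_key):
#         if node_key.is_task:
#             print("blocked:", node_key.task_label, node_info["status"].name)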

1380 def assemble_quantum_provenance_graph( 

1381 self, 

1382 butler: Butler, 

1383 qgraphs: Sequence[QuantumGraph | ResourcePathExpression], 

1384 collections: Sequence[str] | None = None, 

1385 where: str = "", 

1386 curse_failed_logs: bool = False, 

1387 read_caveats: Literal["lazy", "exhaustive"] | None = "lazy", 

1388 use_qbb: bool = True, 

1389 n_cores: int = 1, 

1390 ) -> None: 

1391 """Assemble the quantum provenance graph from a list of all graphs 

1392 corresponding to processing attempts. 

1393 

1394 Parameters 

1395 ---------- 

1396 butler : `lsst.daf.butler.Butler` 

1397 The Butler used for this report. This should match the Butler used 

1398 for the run associated with the executed quantum graph. 

1399 qgraphs : `~collections.abc.Sequence` [`QuantumGraph` |\ 

1400 `~lsst.resources.ResourcePathExpression`] 

1401 A list of either quantum graph objects or their URIs, to be used 

1402 to assemble the `QuantumProvenanceGraph`. 

1403 collections : `~collections.abc.Sequence` [`str`] | `None` 

1404 Collections to use in `lsst.daf.butler.query_datasets` when testing 

1405 which datasets are available at a high level. 

1406 where : `str` 

1407 A "where" string to use to constrain the datasets; should be 

1408 provided if ``collections`` includes many datasets that are not in 

1409 any graphs, to select just those that might be (e.g. when sharding 

1410 over dimensions and using a final collection that spans multiple 

1411 shards). 

1412 curse_failed_logs : `bool` 

1413 Mark log datasets as CURSED if they are visible in the final 

1414 output collection. Note that a campaign-level collection must be 

1415 used here for `collections` if `curse_failed_logs` is `True`. 

1416 read_caveats : `str` or `None`, optional 

1417 Whether to read metadata files to get flags that describe qualified 

1418 successes. If `None`, no metadata files will be read and all 

1419 ``caveats`` fields will be `None`. If "exhaustive", all 

1420 metadata files will be read. If "lazy", only metadata files where 

1421 at least one predicted output is missing will be read. 

1422 use_qbb : `bool`, optional 

1423 If `True`, use a quantum-backed butler when reading metadata files. 

1424 Note that some butler database queries are still run even if this 

1425 is `True`; this does not avoid database access entirely. 

1426 n_cores : `int`, optional 

1427 Number of threads to use for parallelization. 

1428 """ 

1429 if read_caveats not in ("lazy", "exhaustive", None): 

1430 raise TypeError( 

1431 f"Invalid option {read_caveats!r} for read_caveats; should be 'lazy', 'exhaustive', or None." 

1432 ) 

1433 output_runs = [] 

1434 last_time: datetime.datetime | None = None 

1435 for graph in qgraphs: 

1436 if not isinstance(graph, QuantumGraph): 

1437 _LOG.verbose("Loading quantum graph %r.", graph) 

1438 qgraph = QuantumGraph.loadUri(graph) 

1439 else: 

1440 qgraph = graph 

1441 assert qgraph.metadata is not None, "Saved QGs always have metadata." 

1442 self._add_new_graph(butler, qgraph, read_caveats=read_caveats, use_qbb=use_qbb, n_cores=n_cores) 

1443 output_runs.append(qgraph.metadata["output_run"]) 

1444 if last_time is not None and last_time > qgraph.metadata["time"]: 

1445 raise RuntimeError("Quantum graphs must be passed in chronological order.") 

1446 last_time = qgraph.metadata["time"] 

1447 if not collections: 

1448 # We reverse the order of the associated output runs because the 

1449 # query in _resolve_duplicates must be done most-recent first. 

1450 collections = list(reversed(output_runs)) 

1451 assert not curse_failed_logs, ( 

1452 "curse_failed_logs option must be used with one campaign-level collection." 

1453 ) 

1454 self._resolve_duplicates(butler, collections, where, curse_failed_logs) 

1455 

1456 def _add_new_graph( 

1457 self, 

1458 butler: Butler, 

1459 qgraph: QuantumGraph, 

1460 read_caveats: Literal["lazy", "exhaustive"] | None, 

1461 use_qbb: bool = True, 

1462 n_cores: int = 1, 

1463 ) -> None: 

1464 """Add a new quantum graph to the `QuantumProvenanceGraph`. 

1465 

1466 Parameters 

1467 ---------- 

1468 butler : `lsst.daf.butler.Butler` 

1469 The Butler used for this report. This should match the Butler 

1470 used for the run associated with the executed quantum graph. 

1471 qgraph : `QuantumGraph` 

1472 The quantum graph object to add. 

1473 read_caveats : `str` or `None` 

1474 Whether to read metadata files to get flags that describe qualified 

1475 successes. If `None`, no metadata files will be read and all 

1476 ``caveats`` fields will be `None`. If "exhaustive", all 

1477 metadata files will be read. If "lazy", only metadata files where 

1478 at least one predicted output is missing will be read. 

1479 use_qbb : `bool`, optional 

1480 If `True`, use a quantum-backed butler when reading metadata files. 

1481 Note that some butler database queries are still run even if this 

1482 is `True`; this does not avoid database access entirely. 

1483 n_cores : `int`, optional 

1484 Number of threads to use for parallelization. 

1485 """ 

1486 status_log = PeriodicLogger(_LOG) 

1487 output_run = qgraph.metadata["output_run"] 

1488 # Add QuantumRun and DatasetRun (and nodes/edges, as needed) to the 

1489 # QPG for all quanta in the QG. 

1490 _LOG.verbose("Adding output run to provenance graph.") 

1491 new_quanta: list[QuantumKey] = [] 

1492 for n, node in enumerate(qgraph): 

1493 new_quanta.append(self._add_new_quantum(node, output_run)) 

1494 status_log.log("Added nodes for %s of %s quanta.", n + 1, len(qgraph)) 

1495 # Query for datasets in the output run to see which ones were actually 

1496 # produced. 

1497 _LOG.verbose("Querying for existence for %s dataset types.", len(self._datasets.keys())) 

1498 for m, dataset_type_name in enumerate(self._datasets): 

1499 try: 

1500 refs = butler.query_datasets( 

1501 dataset_type_name, collections=output_run, explain=False, limit=None 

1502 ) 

1503 except MissingDatasetTypeError: 

1504 continue 

1505 for n, ref in enumerate(refs): 

1506 dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values) 

1507 dataset_info = self.get_dataset_info(dataset_key) 

1508 dataset_run = dataset_info["runs"][output_run] # dataset run (singular) 

1509 dataset_run.produced = True 

1510 status_log.log( 

1511 "Updated status for %s of %s datasets of %s of %s types.", 

1512 n + 1, 

1513 len(refs), 

1514 m + 1, 

1515 len(self._datasets.keys()), 

1516 ) 

1517 if use_qbb: 

1518 provenance_graph_ref: DatasetRef | None = None 

1519 try: 

1520 provenance_graph_ref = butler.find_dataset( 

1521 PROVENANCE_DATASET_TYPE_NAME, collections=output_run 

1522 ) 

1523 except MissingDatasetTypeError: 

1524 pass 

1525 if provenance_graph_ref is not None: 

1526 _LOG.warning( 

1527 "Cannot use QBB for metadata/log reads after provenance has been ingested; " 

1528 "falling back to full butler." 

1529 ) 

1530 self._butler_wrappers[output_run] = _ThreadLocalButlerWrapper.wrap_full(butler) 

1531 else: 

1532 _LOG.verbose("Using quantum-backed butler for metadata loads.") 

1533 self._butler_wrappers[output_run] = _ThreadLocalButlerWrapper.wrap_qbb(butler, qgraph) 

1534 else: 

1535 _LOG.verbose("Using full butler for metadata loads.") 

1536 self._butler_wrappers[output_run] = _ThreadLocalButlerWrapper.wrap_full(butler) 

1537 

1538 _LOG.verbose("Setting quantum status from dataset existence.") 

1539 # Update quantum status information based on which datasets were 

1540 # produced. 

1541 blocked: set[DatasetKey] = set() # the outputs of failed or blocked quanta in this run. 

1542 with concurrent.futures.ThreadPoolExecutor(n_cores) as executor: 

1543 futures: list[concurrent.futures.Future[None]] = [] 

1544 for n, quantum_key in enumerate(new_quanta): 

1545 if ( 

1546 self._update_run_status(quantum_key, output_run, blocked) == QuantumRunStatus.SUCCESSFUL 

1547 and read_caveats is not None 

1548 ): 

1549 self._update_caveats(quantum_key, output_run, read_caveats, executor, futures) 

1550 self._update_info_status(quantum_key, output_run) 

1551 status_log.log("Updated status for %s of %s quanta.", n + 1, len(new_quanta)) 

1552 for n, future in enumerate(concurrent.futures.as_completed(futures)): 

1553 if (err := future.exception()) is not None: 

1554 raise err 

1555 status_log.log("Added exception/caveat information for %s of %s quanta.", n + 1, len(futures)) 

1556 

1557 def _add_new_quantum(self, node: QuantumNode, output_run: str) -> QuantumKey: 

1558 """Add a quantum from a new quantum graph to the provenance graph. 

1559 

1560 Parameters 

1561 ---------- 

1562 node : `QuantumNode` 

1563 Node in the quantum graph. 

1564 output_run : `str` 

1565 Output run collection. 

1566 

1567 Returns 

1568 ------- 

1569 quantum_key : `QuantumKey` 

1570 Key for the new or existing node in the provenance graph. 

1571 

1572 Notes 

1573 ----- 

1574 This method adds new quantum and dataset nodes to the provenance graph 

1575 if they don't already exist, while adding new `QuantumRun` and 

1576 `DatasetRun` objects to both new and existing nodes. All status 

1577 information on those nodes is set to initial, default values that 

1578 generally reflect quanta that have not been attempted to be run. 

1579 """ 

1580 # make a key to refer to the quantum and add it to the quantum 

1581 # provenance graph. 

1582 quantum_key = QuantumKey( 

1583 node.taskDef.label, cast(DataCoordinate, node.quantum.dataId).required_values 

1584 ) 

1585 self._xgraph.add_node(quantum_key) 

1586 # use the key to get a `QuantumInfo` object for the quantum 

1587 # and set defaults for its values. 

1588 quantum_info = self.get_quantum_info(quantum_key) 

1589 quantum_info.setdefault("messages", []) 

1590 quantum_info.setdefault("runs", {}) 

1591 quantum_info.setdefault("data_id", cast(DataCoordinate, node.quantum.dataId)) 

1592 quantum_info.setdefault("status", QuantumInfoStatus.UNKNOWN) 

1593 quantum_info.setdefault("recovered", False) 

1594 self._quanta.setdefault(quantum_key.task_label, set()).add(quantum_key) 

1595 metadata_ref = node.quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=node.taskDef.label)][0] 

1596 log_ref = node.quantum.outputs[LOG_OUTPUT_TEMPLATE.format(label=node.taskDef.label)][0] 

1597 # associate run collections with specific quanta. this is important 

1598 # if the same quanta are processed in multiple runs as in recovery 

1599 # workflows. 

1600 quantum_runs = quantum_info.setdefault("runs", {}) 

1601 # the `QuantumRun` here is the specific quantum-run collection 

1602 # combination. 

1603 quantum_runs[output_run] = QuantumRun(id=node.nodeId, metadata_ref=metadata_ref, log_ref=log_ref) 

1604 # For each of the outputs of the quanta (datasets) make a key to 

1605 # refer to the dataset. 

1606 for ref in itertools.chain.from_iterable(node.quantum.outputs.values()): 

1607 dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values) 

1608 # add datasets to the nodes of the graph, with edges on the 

1609 # quanta. 

1610 self._xgraph.add_edge(quantum_key, dataset_key) 

1611 # use the dataset key to make a `DatasetInfo` object for 

1612 # the dataset and set defaults for its values. 

1613 dataset_info = self.get_dataset_info(dataset_key) 

1614 dataset_info.setdefault("data_id", ref.dataId) 

1615 dataset_info.setdefault("status", DatasetInfoStatus.PREDICTED_ONLY) 

1616 dataset_info.setdefault("messages", []) 

1617 self._datasets.setdefault(dataset_key.dataset_type_name, set()).add(dataset_key) 

1618 dataset_runs = dataset_info.setdefault("runs", {}) 

1619 # make a `DatasetRun` for the specific dataset-run 

1620 # collection combination. 

1621 dataset_runs[output_run] = DatasetRun(id=ref.id) 

1622 # save metadata and logs for easier status interpretation later 

1623 if dataset_key.dataset_type_name.endswith(METADATA_OUTPUT_CONNECTION_NAME): 

1624 quantum_info["metadata"] = dataset_key 

1625 quantum_runs[output_run].metadata_ref = ref 

1626 if dataset_key.dataset_type_name.endswith(LOG_OUTPUT_CONNECTION_NAME): 

1627 quantum_info["log"] = dataset_key 

1628 quantum_runs[output_run].log_ref = ref 

1629 for ref in itertools.chain.from_iterable(node.quantum.inputs.values()): 

1630 dataset_key = DatasetKey(ref.datasetType.nameAndComponent()[0], ref.dataId.required_values) 

1631 if dataset_key in self._xgraph: 

1632 # add another edge if the input datasetType and quantum are 

1633 # in the graph 

1634 self._xgraph.add_edge(dataset_key, quantum_key) 

1635 return quantum_key 

1636 

1637 def _update_run_status( 

1638 self, quantum_key: QuantumKey, output_run: str, blocked: set[DatasetKey] 

1639 ) -> QuantumRunStatus: 

1640 """Update the status of this quantum in its own output run, using 

1641 information in the graph about which of its output datasets exist. 

1642 

1643 Parameters 

1644 ---------- 

1645 quantum_key : `QuantumKey` 

1646 Key for the node in the provenance graph. 

1647 output_run : `str` 

1648 Output run collection. 

1649 blocked : `set` [ `DatasetKey` ] 

1650 A set of output datasets (for all quanta, not just this one) that 

1651 were blocked by failures. Will be modified in place. 

1652 

1653 Returns 

1654 ------- 

1655 run_status : `QuantumRunStatus` 

1656 Run-specific status for this quantum. 
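
Notes
-----
A sketch of the decision table implemented below, keyed on whether
this run produced the quantum's metadata and log datasets:

    metadata  log  run status
    --------  ---  ------------------------------------------------
    yes       yes  SUCCESSFUL
    yes       no   LOGS_MISSING
    no        yes  FAILED (successor datasets added to ``blocked``)
    no        no   BLOCKED if any input is in ``blocked``,
                   otherwise METADATA_MISSING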

1657 """ 

1658 quantum_info = self.get_quantum_info(quantum_key) 

1659 quantum_run = quantum_info["runs"][output_run] 

1660 metadata_key = quantum_info["metadata"] 

1661 log_key = quantum_info["log"] 

1662 metadata_dataset_run = self.get_dataset_info(metadata_key)["runs"][output_run] 

1663 log_dataset_run = self.get_dataset_info(log_key)["runs"][output_run] 

1664 # if we do have metadata, we know that the task finished. 

1665 if metadata_dataset_run.produced: 

1666 # if we also have logs, this is a success. 

1667 if log_dataset_run.produced: 

1668 quantum_run.status = QuantumRunStatus.SUCCESSFUL 

1669 else: 

1670 # if we have metadata and no logs, this is a very rare 

1671 # case. either the task ran successfully and the datastore 

1672 # died immediately afterwards, or some supporting 

1673 # infrastructure for transferring the logs to the datastore 

1674 # failed. 

1675 quantum_run.status = QuantumRunStatus.LOGS_MISSING 

1676 

1677 # missing metadata means that the task did not finish. 

1678 else: 

1679 # if we have logs and no metadata, the task not finishing is 

1680 # a failure in the task itself. This includes all payload 

1681 # errors and some other problems. 

1682 if log_dataset_run.produced: 

1683 quantum_run.status = QuantumRunStatus.FAILED 

1684 # if a quantum fails, all its successor datasets are 

1685 # blocked. 

1686 blocked.update(self._xgraph.successors(quantum_key)) 

1687 # if we are missing metadata and logs, either the task was not 

1688 # started, or a hard external environmental error prevented 

1689 # it from writing logs or metadata. 

1690 else: 

1691 # if none of this quantum's inputs were blocked, the 

1692 # metadata must just be missing. 

1693 if blocked.isdisjoint(self._xgraph.predecessors(quantum_key)): 

1694 # None of this quantum's inputs were blocked. 

1695 quantum_run.status = QuantumRunStatus.METADATA_MISSING 

1696 # otherwise we can assume from no metadata and no logs 

1697 # that the task was blocked by an upstream failure. 

1698 else: 

1699 quantum_run.status = QuantumRunStatus.BLOCKED 

1700 blocked.update(self._xgraph.successors(quantum_key)) 

1701 return quantum_run.status 

1702 

1703 def _update_info_status(self, quantum_key: QuantumKey, output_run: str) -> QuantumInfoStatus: 

1704 """Update the status of this quantum across all runs with the status 

1705 for its latest run. 

1706 

1707 Parameters 

1708 ---------- 

1709 quantum_key : `QuantumKey` 

1710 Key for the node in the provenance graph. 

1711 output_run : `str` 

1712 Output run collection. 

1713 

1714 Returns 

1715 ------- 

1716 info_status : `QuantumInfoStatus` 

1717 Overall status for this quantum across all runs. 
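
Notes
-----
A sketch of the transition table implemented by the ``match``
statement below; rows are ``(last_status, run_status)`` patterns in
match order:

    (WONKY, _)             -> WONKY (absorbing)
    (_, SUCCESSFUL)        -> SUCCESSFUL (flags a recovery if the
                              previous status was unsuccessful)
    (_, LOGS_MISSING)      -> WONKY
    (SUCCESSFUL, _)        -> WONKY (regression from success)
    (UNKNOWN, BLOCKED)     -> BLOCKED
    (_, BLOCKED)           -> unchanged
    (_, METADATA_MISSING)  -> UNKNOWN
    (_, FAILED)            -> FAILED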

1718 """ 

1719 # Now we can start using state transitions to mark overall status. 

1720 quantum_info = self.get_quantum_info(quantum_key) 

1721 quantum_run = quantum_info["runs"][output_run] 

1722 last_status = quantum_info["status"] 

1723 new_status: QuantumInfoStatus 

1724 match last_status, quantum_run.status: 

1725 # A quantum can never escape a WONKY state. 

1726 case (QuantumInfoStatus.WONKY, _): 

1727 new_status = QuantumInfoStatus.WONKY 

1728 # Any transition to a success (excluding from WONKY) is 

1729 # a success; any transition from a failed state is also a 

1730 # recovery. 

1731 case (_, QuantumRunStatus.SUCCESSFUL): 

1732 new_status = QuantumInfoStatus.SUCCESSFUL 

1733 if last_status != QuantumInfoStatus.SUCCESSFUL and last_status != QuantumInfoStatus.UNKNOWN: 

1734 quantum_info["recovered"] = True 

1735 # Missing logs are one of the categories of wonky quanta. They 

1736 # interfere with our ability to discern quantum status and are 

1737 # signs of weird things afoot in processing. Add a message 

1738 # noting why this quantum is being marked as wonky to be stored 

1739 # in its `UnsuccessfulQuantumInfo`. 

1740 case (_, QuantumRunStatus.LOGS_MISSING): 

1741 new_status = QuantumInfoStatus.WONKY 

1742 quantum_info["messages"].append(f"Logs missing for run {output_run!r}.") 

1743 # Leaving a successful state is another category of wonky 

1744 # quanta. If a previous success fails on a subsequent run, 

1745 # a human should inspect why. Add a message noting why this 

1746 # quantum is being marked as wonky to be stored in its 

1747 # `UnsuccessfulQuantumInfo`. 

1748 case (QuantumInfoStatus.SUCCESSFUL, _): 

1749 new_status = QuantumInfoStatus.WONKY 

1750 quantum_info["messages"].append( 

1751 f"Status went from successful in run {list(quantum_info['runs'].values())[-1]!r} " 

1752 f"to {quantum_run.status!r} in {output_run!r}." 

1753 ) 

1754 # If a quantum status is unknown and it moves to blocked, we 

1755 # know for sure that it is a blocked quantum. 

1756 case (QuantumInfoStatus.UNKNOWN, QuantumRunStatus.BLOCKED): 

1757 new_status = QuantumInfoStatus.BLOCKED 

1758 # A transition into blocked does not change the overall quantum 

1759 # status for a failure. 

1760 case (_, QuantumRunStatus.BLOCKED): 

1761 new_status = last_status 

1762 # If a quantum transitions from any state into missing 

1763 # metadata, we don't have enough information to diagnose its 

1764 # state. 

1765 case (_, QuantumRunStatus.METADATA_MISSING): 

1766 new_status = QuantumInfoStatus.UNKNOWN 

1767 # Any transition into failure is a failed state. 

1768 case (_, QuantumRunStatus.FAILED): 

1769 new_status = QuantumInfoStatus.FAILED 

1770 # Update `QuantumInfo.status` for this quantum. 

1771 quantum_info["status"] = new_status 

1772 return new_status 

1773 

1774 def _update_caveats( 

1775 self, 

1776 quantum_key: QuantumKey, 

1777 output_run: str, 

1778 read_caveats: Literal["lazy", "exhaustive"], 

1779 executor: concurrent.futures.Executor, 

1780 futures: list[concurrent.futures.Future[None]], 

1781 ) -> None: 

1782 """Read quantum success caveats and exception information from task 

1783 metadata. 

1784 

1785 Parameters 

1786 ---------- 

1787 quantum_key : `QuantumKey` 

1788 Key for the node in the provenance graph. 

1789 output_run : `str` 

1790 Output run collection. 

1791 read_caveats : `str` 

1792 Whether to read metadata files to get flags that describe qualified 

1793 successes. If "exhaustive", all metadata files will be read. If 

1794 "lazy", only metadata files where at least one predicted output is 

1795 missing will be read. 

1796 executor : `concurrent.futures.Executor` 

1797 The futures executor to use. 

1798 futures : `list` [ `concurrent.futures.Future` ] 

1799 Current list of futures. Will be modified. 

1800 """ 

1801 if read_caveats == "lazy" and all( 

1802 self.get_dataset_info(dataset_key)["runs"][output_run].produced 

1803 for dataset_key in self._xgraph.successors(quantum_key) 

1804 ): 

1805 return 

1806 quantum_info = self.get_quantum_info(quantum_key) 

1807 quantum_run = quantum_info["runs"][output_run] 

1808 

1809 def read_metadata() -> None: 

1810 md = self._butler_get(quantum_run.metadata_ref, storageClass=METADATA_OUTPUT_STORAGE_CLASS) 

1811 try: 

1812 # Int conversion guards against spurious conversion to 

1813 # float that can apparently sometimes happen in 

1814 # TaskMetadata. 

1815 quantum_run.caveats = QuantumSuccessCaveats(int(md["quantum"]["caveats"])) 

1816 except LookupError: 

1817 pass 

1818 try: 

1819 quantum_run.exception = ExceptionInfo._from_metadata(md[quantum_key.task_label]["failure"]) 

1820 except LookupError: 

1821 pass 

1822 

1823 futures.append(executor.submit(read_metadata)) 

1824 

1825 def _resolve_duplicates( 

1826 self, 

1827 butler: Butler, 

1828 collections: Sequence[str] | None = None, 

1829 where: str = "", 

1830 curse_failed_logs: bool = False, 

1831 ) -> None: 

1832 """After quantum graphs associated with each run have been added 

1833 to the `QuantumProvenanceGraph`, resolve any discrepancies between 

1834 them and use all attempts to finalize overall status. 

1835 

1836 Particularly, use the state of each `DatasetRun` in combination with 

1837 overall quantum status to ascertain the status of each dataset. 

1838 Additionally, if there are multiple visible runs associated with a 

1839 dataset, mark the producer quantum as WONKY. 

1840 

1841 This method should be called after 

1842 `QuantumProvenanceGraph._add_new_graph` has been called on every graph 

1843 associated with the data processing. 

1844 

1845 Parameters 

1846 ---------- 

1847 butler : `lsst.daf.butler.Butler` 

1848 The Butler used for this report. This should match the Butler used 

1849 for the run associated with the executed quantum graph. 

1850 collections : `~collections.abc.Sequence` [`str`] | `None` 

1851 Collections to use in `lsst.daf.butler.query_datasets` when testing 

1852 which datasets are available at a high level. 

1853 where : `str` 

1854 A "where" string to use to constrain the datasets; should be 

1855 provided if ``collections`` includes many datasets that are not in 

1856 any graphs, to select just those that might be (e.g. when sharding 

1857 over dimensions and using a final collection that spans multiple 

1858 shards). 

1859 curse_failed_logs : `bool` 

1860 Mark log datasets as CURSED if they are visible in the final 

1861 output collection. Note that a campaign-level collection must be 

1862 used here for `collections` if `curse_failed_logs` is `True`; if 

1863 `_resolve_duplicates` is run on a list of group-level collections 

1864 then each run will only show log datasets from its own failures as 

1865 visible, and datasets from the others will be marked as cursed. 
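
Notes
-----
A sketch of the dataset-status classification performed here, given
the producing quantum's overall status and whether the dataset is
queryable ("visible") from ``collections``:

    (SUCCESSFUL, visible)   -> VISIBLE
    (SUCCESSFUL, missing)   -> PREDICTED_ONLY (e.g. no work to do)
    (SUCCESSFUL, shadowed)  -> SHADOWED
    (other, visible)        -> CURSED, or merely UNSUCCESSFUL for log
                               datasets unless ``curse_failed_logs``
    (anything else)         -> UNSUCCESSFUL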

1866 """ 

1867 # First thing: raise an error if resolve_duplicates has been run 

1868 # before on this qpg. 

1869 if self._finalized: 

1870 raise RuntimeError( 

1871 """resolve_duplicates may only be called on a 

1872 QuantumProvenanceGraph once. Call only after all graphs have 

1873 been added, or make a new graph with all constituent 

1874 attempts.""" 

1875 ) 

1876 status_log = PeriodicLogger(_LOG) 

1877 _LOG.verbose("Querying for dataset visibility.") 

1878 for m, dataset_type_name in enumerate(self._datasets): 

1879 # find datasets in a larger collection. 

1880 try: 

1881 refs = butler.query_datasets( 

1882 dataset_type_name, collections=collections, where=where, limit=None, explain=False 

1883 ) 

1884 except MissingDatasetTypeError: 

1885 continue 

1886 for n, ref in enumerate(refs): 

1887 dataset_key = DatasetKey(ref.datasetType.name, ref.dataId.required_values) 

1888 try: 

1889 dataset_info = self.get_dataset_info(dataset_key) 

1890 # Ignore if we don't actually have the dataset in any of the 

1891 # graphs given. 

1892 except KeyError: 

1893 continue 

1894 # queryable datasets are `visible`. 

1895 dataset_info["runs"][ref.run].visible = True 

1896 status_log.log( 

1897 "Updated visibility for %s of %s datasets of type %s of %s.", 

1898 n + 1, 

1899 len(refs), 

1900 m + 1, 

1901 len(self._datasets.keys()), 

1902 ) 

1903 _LOG.verbose("Updating task status from dataset visibility.") 

1904 for m, task_quanta in enumerate(self._quanta.values()): 

1905 for n, quantum_key in enumerate(task_quanta): 

1906 # runs associated with visible datasets. 

1907 visible_runs: set[str] = set() 

1908 quantum_info = self.get_quantum_info(quantum_key) 

1909 # Loop over each dataset in the outputs of a single quantum. 

1910 for dataset_key in self.iter_outputs_of(quantum_key): 

1911 dataset_info = self.get_dataset_info(dataset_key) 

1912 dataset_type_name = dataset_key.dataset_type_name 

1913 visible_runs.update( 

1914 run for run, dataset_run in dataset_info["runs"].items() if dataset_run.visible 

1915 ) 

1916 if any(dataset_run.visible for dataset_run in dataset_info["runs"].values()): 

1917 query_state = "visible" 

1918 # set the publish state to `shadowed` if the dataset was 

1919 # produced but not visible (i.e., not queryable from the 

1920 # final collection(s)). 

1921 elif any(dataset_run.produced for dataset_run in dataset_info["runs"].values()): 

1922 query_state = "shadowed" 

1923 # a dataset which was not produced and not visible is 

1924 # missing. 

1925 else: 

1926 query_state = "missing" 

1927 # use the quantum status and publish state to ascertain the 

1928 # status of the dataset. 

1929 match (quantum_info["status"], query_state): 

1930 # visible datasets from successful quanta are as 

1931 # intended. 

1932 case (QuantumInfoStatus.SUCCESSFUL, "visible"): 

1933 dataset_info["status"] = DatasetInfoStatus.VISIBLE 

1934 # missing datasets from successful quanta indicate a 

1935 # `NoWorkFound` case. 

1936 case (QuantumInfoStatus.SUCCESSFUL, "missing"): 

1937 dataset_info["status"] = DatasetInfoStatus.PREDICTED_ONLY 

1938 case (QuantumInfoStatus.SUCCESSFUL, "shadowed"): 

1939 dataset_info["status"] = DatasetInfoStatus.SHADOWED 

1940 # If anything other than a successful quantum produces 

1941 # a visible dataset, that dataset is cursed. Set the 

1942 # status for the dataset to cursed and note the reason 

1943 # for labeling the dataset as cursed. 

1944 case (_, "visible"): 

1945 # Avoiding publishing failed logs is difficult 

1946 # without using tagged collections, so flag them as 

1947 # merely unsuccessful unless the user requests it. 

1948 if ( 

1949 dataset_type_name.endswith(LOG_OUTPUT_CONNECTION_NAME) 

1950 and not curse_failed_logs 

1951 ): 

1952 dataset_info["status"] = DatasetInfoStatus.UNSUCCESSFUL 

1953 else: 

1954 dataset_info["status"] = DatasetInfoStatus.CURSED 

1955 dataset_info["messages"].append( 

1956 f"Unsuccessful dataset {dataset_type_name} visible in " 

1957 "final output collection." 

1958 ) 

1959 # any other produced dataset (produced but not 

1960 # visible and not successful) is a regular 

1961 # failure. 

1962 case _: 

1963 dataset_info["status"] = DatasetInfoStatus.UNSUCCESSFUL 

1964 if len(visible_runs) > 1: 

1965 quantum_info["status"] = QuantumInfoStatus.WONKY 

1966 quantum_info["messages"].append( 

1967 f"Outputs from different runs of the same quanta were visible: {visible_runs}." 

1968 ) 

1969 for dataset_key in self.iter_outputs_of(quantum_key): 

1970 dataset_info = self.get_dataset_info(dataset_key) 

1971 quantum_info["messages"].append( 

1972 f"{dataset_key.dataset_type_name}" 

1973 + f"from {str(dataset_info['runs'])};" 

1974 + f"{str(dataset_info['status'])}" 

1975 ) 

1976 status_log.log( 

1977 "Updated task status from visibility for %s of %s quanta of task %s of %s.", 

1978 n + 1, 

1979 len(task_quanta), 

1980 m + 1, 

1981 len(self._quanta.keys()), 

1982 ) 

1983 # If we make it all the way through resolve_duplicates, set 

1984 # self._finalized = True so that it cannot be run again. 

1985 self._finalized = True 

1986 

1987 def _butler_get(self, ref: DatasetRef, **kwargs: Any) -> Any: 
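"""Read a dataset using the thread-local butler for the dataset's run."""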

1988 return self._butler_wrappers[ref.run].butler.get(ref, **kwargs) 

1989 

1990 

1991class _ThreadLocalButlerWrapper: 

1992 """A wrapper for a thread-local limited butler. 

1993 

1994 Parameters 

1995 ---------- 

1996 factory : `~collections.abc.Callable` 

1997 A callable that takes no arguments and returns a limited butler. 
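
Examples
--------
A minimal sketch; ``butler`` is assumed to be a full
`~lsst.daf.butler.Butler` and ``ref`` a resolved
`~lsst.daf.butler.DatasetRef`::

    wrapper = _ThreadLocalButlerWrapper.wrap_full(butler)
    # Safe to use from multiple threads: each thread lazily
    # constructs its own butler via the wrapped factory.
    logs = wrapper.butler.get(ref)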

1998 """ 

1999 

2000 def __init__(self, factory: Callable[[], LimitedButler]): 

2001 self._factory = factory 

2002 self._thread_local = threading.local() 

2003 

2004 @classmethod 

2005 def wrap_qbb(cls, full_butler: Butler, qg: QuantumGraph) -> _ThreadLocalButlerWrapper: 

2006 """Wrap a `~lsst.daf.butler.QuantumBackedButler` suitable for reading 

2007 log and metadata files. 

2008 

2009 Parameters 

2010 ---------- 

2011 full_butler : `~lsst.daf.butler.Butler` 

2012 Full butler to draw datastore and dimension configuration from. 

2013 qg : `QuantumGraph` 

2014 Quantum graph. 

2015 

2016 Returns 

2017 ------- 

2018 wrapper : `_ThreadLocalButlerWrapper` 

2019 A wrapper that provides access to a thread-local QBB, constructing 

2020 it on first use. 

2021 """ 

2022 dataset_ids = [] 

2023 for task_label in qg.pipeline_graph.tasks.keys(): 

2024 for quantum in qg.get_task_quanta(task_label).values(): 

2025 dataset_ids.append(quantum.outputs[LOG_OUTPUT_TEMPLATE.format(label=task_label)][0].id) 

2026 dataset_ids.append(quantum.outputs[METADATA_OUTPUT_TEMPLATE.format(label=task_label)][0].id) 

2027 try: 

2028 butler_config = full_butler._config # type: ignore[attr-defined] 

2029 except AttributeError: 

2030 raise RuntimeError("use_qbb=True requires a direct butler.") from None 

2031 factory = _QuantumBackedButlerFactory( 

2032 butler_config, 

2033 dataset_ids, 

2034 full_butler.dimensions, 

2035 dataset_types={dt.name: dt for dt in qg.registryDatasetTypes()}, 

2036 ) 

2037 return cls(factory) 

2038 

2039 @classmethod 

2040 def wrap_full(cls, full_butler: Butler) -> _ThreadLocalButlerWrapper: 

2041 """Wrap a full `~lsst.daf.butler.Butler`. 

2042 

2043 Parameters 

2044 ---------- 

2045 full_butler : `~lsst.daf.butler.Butler` 

2046 Full butler to clone when making thread-local copies. 

2047 

2048 Returns 

2049 ------- 

2050 wrapper : `_ThreadLocalButlerWrapper` 

2051 A wrapper that provides access to a thread-local butler, 

2052 constructing it on first use. 

2053 """ 

2054 return cls(full_butler.clone) 

2055 

2056 @property 

2057 def butler(self) -> LimitedButler: 

2058 """The wrapped butler, constructed on first use within each thread.""" 

2059 if (butler := getattr(self._thread_local, "butler", None)) is None: 

2060 self._thread_local.butler = self._factory() 

2061 butler = self._thread_local.butler 

2062 return butler 

2063 

2064 

2065@dataclasses.dataclass 

2066class _QuantumBackedButlerFactory: 

2067 """A factory for `~lsst.daf.butler.QuantumBackedButler`, for use by 

2068 `_ThreadLocalButlerWrapper`. 

2069 """ 

2070 

2071 config: ButlerConfig 

2072 dataset_ids: list[DatasetId] 

2073 universe: DimensionUniverse 

2074 dataset_types: dict[str, DatasetType] 

2075 

2076 def __call__(self) -> QuantumBackedButler: 

2077 return QuantumBackedButler.from_predicted( 

2078 self.config, 

2079 predicted_inputs=self.dataset_ids, 

2080 predicted_outputs=[], 

2081 dimensions=self.universe, 

2082 # We don't need the datastore records in the QG because we're 

2083 # only going to read metadata and logs, and those are never 

2084 # overall inputs. 

2085 datastore_records={}, 

2086 dataset_types=self.dataset_types, 

2087 ) 

2088 

2089 

2090def _cli() -> None: 

2091 import argparse 

2092 

2093 from .pipeline_graph.visualization import ( 

2094 QuantumProvenanceGraphStatusAnnotator, 

2095 QuantumProvenanceGraphStatusOptions, 

2096 show, 

2097 ) 

2098 

2099 parser = argparse.ArgumentParser( 

2100 "QuantumProvenanceGraph command-line utilities.", 

2101 description=( 

2102 "This is a small, low-effort debugging utility. " 

2103 "It may disappear at any time in favor of a public 'pipetask' interface." 

2104 ), 

2105 ) 

2106 subparsers = parser.add_subparsers(dest="cmd") 

2107 

2108 pprint_parser = subparsers.add_parser("pprint", help="Print a saved summary as a series of tables.") 

2109 pprint_parser.add_argument("file", type=argparse.FileType("r"), help="Saved summary JSON file.") 

2110 pprint_parser.add_argument( 

2111 "--brief", 

2112 action=argparse.BooleanOptionalAction, 

2113 default=False, 

2114 help="Whether to print per-data ID information.", 

2115 ) 

2116 pprint_parser.add_argument( 

2117 "--datasets", 

2118 action=argparse.BooleanOptionalAction, 

2119 default=True, 

help="Whether to include per-dataset information in the report.",

2120 ) 

2121 

2122 xgraph_parser = subparsers.add_parser("xgraph", help="Show the pipeline graph annotated with statuses from a saved summary.") 

2123 xgraph_parser.add_argument("file", type=argparse.FileType("r"), help="Saved summary JSON file.") 

2124 xgraph_parser.add_argument("qgraph", type=str, help="Saved quantum graph file.") 

2125 

2126 args = parser.parse_args() 

2127 

2128 match args.cmd: 

2129 case "pprint": 

2130 summary = Summary.model_validate_json(args.file.read()) 

2131 args.file.close() 

2132 summary.pprint(brief=args.brief, datasets=args.datasets) 

2133 case "xgraph": 

2134 summary = Summary.model_validate_json(args.file.read()) 

2135 args.file.close() 

2136 status_annotator = QuantumProvenanceGraphStatusAnnotator(summary) 

2137 status_options = QuantumProvenanceGraphStatusOptions( 

2138 display_percent=True, display_counts=True, abbreviate=True, visualize=True 

2139 ) 

2140 qgraph = QuantumGraph.loadUri(args.qgraph) 

2141 pipeline_graph = qgraph.pipeline_graph 

2142 show( 

2143 pipeline_graph, 

2144 dataset_types=True, 

2145 status_annotator=status_annotator, 

2146 status_options=status_options, 

2147 ) 

2148 case _: 

2149 raise AssertionError(f"Unhandled subcommand {args.cmd}.") 

2150 

2151 

2152if __name__ == "__main__": 

2153 _cli()