Coverage for python/lsst/pipe/base/quantum_graph/_provenance.py: 27% (700 statements)


# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
    "ProvenanceDatasetInfo",
    "ProvenanceDatasetModel",
    "ProvenanceInitQuantumInfo",
    "ProvenanceInitQuantumModel",
    "ProvenanceLogRecordsModel",
    "ProvenanceQuantumGraph",
    "ProvenanceQuantumGraphReader",
    "ProvenanceQuantumGraphWriter",
    "ProvenanceQuantumInfo",
    "ProvenanceQuantumModel",
    "ProvenanceQuantumReport",
    "ProvenanceQuantumScanData",
    "ProvenanceQuantumScanModels",
    "ProvenanceQuantumScanStatus",
    "ProvenanceReport",
    "ProvenanceTaskMetadataModel",
)

import dataclasses
import enum
import itertools
import sys
import uuid
from collections import Counter
from collections.abc import Callable, Iterable, Iterator, Mapping
from contextlib import ExitStack, contextmanager
from typing import TYPE_CHECKING, Any, TypedDict

import astropy.table
import networkx
import numpy as np
import pydantic

from lsst.daf.butler import Butler, DataCoordinate
from lsst.daf.butler.logging import ButlerLogRecord, ButlerLogRecords
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils.iteration import ensure_iterable
from lsst.utils.logging import LsstLogAdapter, getLogger
from lsst.utils.packages import Packages

from .. import automatic_connection_constants as acc
from .._status import ExceptionInfo, QuantumAttemptStatus, QuantumSuccessCaveats
from .._task_metadata import TaskMetadata
from ..log_capture import _ExecutionLogRecordsExtra
from ..log_on_close import LogOnClose
from ..pipeline_graph import PipelineGraph, TaskImportMode, TaskInitNode
from ..resource_usage import QuantumResourceUsage
from ._common import (
    BaseQuantumGraph,
    BaseQuantumGraphReader,
    BaseQuantumGraphWriter,
    ConnectionName,
    DataCoordinateValues,
    DatasetInfo,
    DatasetTypeName,
    HeaderModel,
    QuantumInfo,
    TaskLabel,
)
from ._multiblock import Compressor, MultiblockReader, MultiblockWriter
from ._predicted import (
    PredictedDatasetModel,
    PredictedQuantumDatasetsModel,
    PredictedQuantumGraph,
    PredictedQuantumGraphComponents,
)

# Sphinx needs imports for type annotations of base class members.
if "sphinx" in sys.modules:
    import zipfile  # noqa: F401

    from ._multiblock import AddressReader, Decompressor  # noqa: F401


type LoopWrapper[T] = Callable[[Iterable[T]], Iterable[T]]

_LOG = getLogger(__file__)

DATASET_ADDRESS_INDEX = 0
QUANTUM_ADDRESS_INDEX = 1
LOG_ADDRESS_INDEX = 2
METADATA_ADDRESS_INDEX = 3

DATASET_MB_NAME = "datasets"
QUANTUM_MB_NAME = "quanta"
LOG_MB_NAME = "logs"
METADATA_MB_NAME = "metadata"


def pass_through[T](arg: T) -> T:
    return arg
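
# A ``LoopWrapper`` is any callable that maps an iterable to an iterable over
# the same elements (a progress bar such as ``tqdm`` fits this signature, but
# is only an illustration, not a dependency of this module); ``pass_through``
# is the no-op default.  An illustrative sketch:
#
#     wrapper: LoopWrapper[int] = tqdm
#     for item in wrapper(range(10)):
#         ...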

class ProvenanceDatasetInfo(DatasetInfo):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a provenance dataset.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: ProvenanceDatasetInfo = xgraph.nodes[dataset_id]

    where ``xgraph`` is `ProvenanceQuantumGraph.bipartite_xgraph`.
    """

    dataset_id: uuid.UUID
    """Unique identifier for the dataset."""

    produced: bool
    """Whether this dataset was produced (vs. only predicted).

    This is always `True` for overall input datasets. It is also `True` for
    datasets that were produced and then removed before/during transfer back to
    the central butler repository, so it may not reflect the continued
    existence of the dataset.
    """

class ProvenanceQuantumInfo(QuantumInfo):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a provenance quantum.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: ProvenanceQuantumInfo = xgraph.nodes[quantum_id]

    where ``xgraph`` is `ProvenanceQuantumGraph.bipartite_xgraph` or
    `ProvenanceQuantumGraph.quantum_only_xgraph`.
    """

    status: QuantumAttemptStatus
    """Enumerated status for the quantum.

    This corresponds to the last attempt to run this quantum, or
    `QuantumAttemptStatus.BLOCKED` if there were no attempts.
    """

    caveats: QuantumSuccessCaveats | None
    """Flags indicating caveats on successful quanta.

    This corresponds to the last attempt to run this quantum.
    """

    exception: ExceptionInfo | None
    """Information about an exception raised when the quantum was executing.

    This corresponds to the last attempt to run this quantum.
    """

    resource_usage: QuantumResourceUsage | None
    """Resource usage information (timing, memory use) for this quantum.

    This corresponds to the last attempt to run this quantum.
    """

    attempts: list[ProvenanceQuantumAttemptModel]
    """Information about each attempt to run this quantum.

    An entry is added whenever the quantum *should* have been attempted, even
    if it never actually ran; an empty `list` is used only for quanta that
    were blocked by an upstream failure.
    """

    metadata_id: uuid.UUID
    """ID of this quantum's metadata dataset."""

    log_id: uuid.UUID
    """ID of this quantum's log dataset."""

class ProvenanceInitQuantumInfo(TypedDict):
    """A typed dictionary that annotates the attributes of the NetworkX graph
    node data for a provenance init quantum.

    Since NetworkX types are not generic over their node mapping type, this has
    to be used explicitly, e.g.::

        node_data: ProvenanceInitQuantumInfo = xgraph.nodes[quantum_id]

    where ``xgraph`` is `ProvenanceQuantumGraph.bipartite_xgraph`.
    """

    data_id: DataCoordinate
    """Data ID of the quantum.

    This is always an empty ID; this key exists to allow init-quanta and
    regular quanta to be treated more similarly.
    """

    task_label: str
    """Label of the task for this quantum."""

    pipeline_node: TaskInitNode
    """Node in the pipeline graph for this task's init-only step."""

    config_id: uuid.UUID
    """ID of this task's config dataset."""

class ProvenanceDatasetModel(PredictedDatasetModel):
    """Data model for the datasets in a provenance quantum graph file."""

    produced: bool
    """Whether this dataset was produced (vs. only predicted).

    This is always `True` for overall input datasets. It is also `True` for
    datasets that were produced and then removed before/during transfer back to
    the central butler repository, so it may not reflect the continued
    existence of the dataset.
    """

    producer: uuid.UUID | None = None
    """ID of the quantum that produced this dataset.

    This is `None` for overall inputs to the graph.
    """

    consumers: list[uuid.UUID] = pydantic.Field(default_factory=list)
    """IDs of quanta that were predicted to consume this dataset."""

    @property
    def node_id(self) -> uuid.UUID:
        """Alias for the dataset ID."""
        return self.dataset_id

    @classmethod
    def from_predicted(
        cls,
        predicted: PredictedDatasetModel,
        producer: uuid.UUID | None = None,
        consumers: Iterable[uuid.UUID] = (),
    ) -> ProvenanceDatasetModel:
        """Construct from a predicted dataset model.

        Parameters
        ----------
        predicted : `PredictedDatasetModel`
            Information about the dataset from the predicted graph.
        producer : `uuid.UUID` or `None`, optional
            ID of the quantum that was predicted to produce this dataset.
        consumers : `~collections.abc.Iterable` [`uuid.UUID`], optional
            IDs of the quanta that were predicted to consume this dataset.

        Returns
        -------
        provenance : `ProvenanceDatasetModel`
            Provenance dataset model.

        Notes
        -----
        This initializes `produced` to `True` when ``producer is None`` and
        `False` otherwise, on the assumption that it will be updated later.
        """
        return cls.model_construct(
            dataset_id=predicted.dataset_id,
            dataset_type_name=predicted.dataset_type_name,
            data_coordinate=predicted.data_coordinate,
            run=predicted.run,
            produced=(producer is None),  # if it's not produced by this QG, it's an overall input
            producer=producer,
            consumers=list(consumers),
        )

    def _add_to_graph(self, graph: ProvenanceQuantumGraph) -> None:
        """Add this dataset and its edges to quanta to a provenance graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.

        Notes
        -----
        This method adds:

        - a ``bipartite_xgraph`` dataset node with full attributes;
        - ``bipartite_xgraph`` edges to adjacent quanta (which adds quantum
          nodes with no attributes), without populating edge attributes;
        - ``quantum_only_xgraph`` edges for each pair of quanta in which one
          produces this dataset and another consumes it (this also adds quantum
          nodes with no attributes).
        """
        dataset_type_node = graph.pipeline_graph.dataset_types[self.dataset_type_name]
        data_id = DataCoordinate.from_full_values(dataset_type_node.dimensions, tuple(self.data_coordinate))
        graph._bipartite_xgraph.add_node(
            self.dataset_id,
            data_id=data_id,
            dataset_type_name=self.dataset_type_name,
            pipeline_node=dataset_type_node,
            run=self.run,
            produced=self.produced,
        )
        if self.producer is not None:
            graph._bipartite_xgraph.add_edge(self.producer, self.dataset_id)
        for consumer_id in self.consumers:
            graph._bipartite_xgraph.add_edge(self.dataset_id, consumer_id)
            if self.producer is not None:
                graph._quantum_only_xgraph.add_edge(self.producer, consumer_id)
        graph._datasets_by_type[self.dataset_type_name][data_id] = self.dataset_id

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceQuantumAttemptModel(pydantic.BaseModel):
    """Data model for a now-superseded attempt to run a quantum in a
    provenance quantum graph file.
    """

    attempt: int = 0
    """Counter incremented for every attempt to execute this quantum."""

    status: QuantumAttemptStatus = QuantumAttemptStatus.UNKNOWN
    """Enumerated status for the quantum."""

    caveats: QuantumSuccessCaveats | None = None
    """Flags indicating caveats on successful quanta."""

    exception: ExceptionInfo | None = None
    """Information about an exception raised when the quantum was executing."""

    resource_usage: QuantumResourceUsage | None = None
    """Resource usage information (timing, memory use) for this quantum."""

    previous_process_quanta: list[uuid.UUID] = pydantic.Field(default_factory=list)
    """The IDs of other quanta previously executed in the same process as this
    one.
    """

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceLogRecordsModel(pydantic.BaseModel):
    """Data model for storing execution logs in a provenance quantum graph
    file.
    """

    attempts: list[list[ButlerLogRecord] | None] = pydantic.Field(default_factory=list)
    """Logs from attempts to run this task, ordered chronologically from first
    to last.
    """

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceTaskMetadataModel(pydantic.BaseModel):
    """Data model for storing task metadata in a provenance quantum graph
    file.
    """

    # We want to convert infs and nans to constants, not null. Unfortunately
    # the fact that TaskMetadata _also_ sets this is ignored when that model
    # is nested here.
    model_config = pydantic.ConfigDict(ser_json_inf_nan="constants")

    attempts: list[TaskMetadata | None] = pydantic.Field(default_factory=list)
    """Metadata from attempts to run this task, ordered chronologically from
    first to last.
    """

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceQuantumReport(pydantic.BaseModel):
    """A Pydantic model used to report information about a single (generally
    problematic) quantum.
    """

    quantum_id: uuid.UUID
    data_id: dict[str, int | str]
    attempts: list[ProvenanceQuantumAttemptModel]

    @classmethod
    def from_info(cls, quantum_id: uuid.UUID, quantum_info: ProvenanceQuantumInfo) -> ProvenanceQuantumReport:
        """Construct from a provenance quantum graph node.

        Parameters
        ----------
        quantum_id : `uuid.UUID`
            Unique ID for the quantum.
        quantum_info : `ProvenanceQuantumInfo`
            Node attributes for this quantum.
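
        Examples
        --------
        A minimal sketch, assuming ``graph`` is a loaded
        `ProvenanceQuantumGraph` and ``quantum_id`` is a known quantum ID::

            info: ProvenanceQuantumInfo = graph.bipartite_xgraph.nodes[quantum_id]
            report = ProvenanceQuantumReport.from_info(quantum_id, info)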

585 """ 

586 return cls( 

587 quantum_id=quantum_id, 

588 data_id=dict(quantum_info["data_id"].mapping), 

589 attempts=quantum_info["attempts"], 

590 ) 

591 

592 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

593 # when we inherit those docstrings in our public classes. 

594 if "sphinx" in sys.modules and not TYPE_CHECKING: 

595 

596 def copy(self, *args: Any, **kwargs: Any) -> Any: 

597 """See `pydantic.BaseModel.copy`.""" 

598 return super().copy(*args, **kwargs) 

599 

600 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

601 """See `pydantic.BaseModel.model_dump`.""" 

602 return super().model_dump(*args, **kwargs) 

603 

604 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

605 """See `pydantic.BaseModel.model_dump_json`.""" 

606 return super().model_dump(*args, **kwargs) 

607 

608 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

609 """See `pydantic.BaseModel.model_copy`.""" 

610 return super().model_copy(*args, **kwargs) 

611 

612 @classmethod 

613 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

614 """See `pydantic.BaseModel.model_construct`.""" 

615 return super().model_construct(*args, **kwargs) 

616 

617 @classmethod 

618 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

619 """See `pydantic.BaseModel.model_json_schema`.""" 

620 return super().model_json_schema(*args, **kwargs) 

621 

622 @classmethod 

623 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

624 """See `pydantic.BaseModel.model_validate`.""" 

625 return super().model_validate(*args, **kwargs) 

626 

627 @classmethod 

628 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

629 """See `pydantic.BaseModel.model_validate_json`.""" 

630 return super().model_validate_json(*args, **kwargs) 

631 

632 @classmethod 

633 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

634 """See `pydantic.BaseModel.model_validate_strings`.""" 

635 return super().model_validate_strings(*args, **kwargs) 

class ProvenanceReport(pydantic.RootModel):
    """A Pydantic model that groups quantum information by task label, then
    status (as a string), and then exception type.
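
    For example, the reports for failed quanta of a (hypothetical) task
    ``makeWarp`` that raised ``ValueError`` would be found at::

        report.root["makeWarp"]["FAILED"]["ValueError"]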

641 """ 

642 

643 root: dict[TaskLabel, dict[str, dict[str | None, list[ProvenanceQuantumReport]]]] = {} 

644 

645 # Work around the fact that Sphinx chokes on Pydantic docstring formatting, 

646 # when we inherit those docstrings in our public classes. 

647 if "sphinx" in sys.modules and not TYPE_CHECKING: 

648 

649 def copy(self, *args: Any, **kwargs: Any) -> Any: 

650 """See `pydantic.BaseModel.copy`.""" 

651 return super().copy(*args, **kwargs) 

652 

653 def model_dump(self, *args: Any, **kwargs: Any) -> Any: 

654 """See `pydantic.BaseModel.model_dump`.""" 

655 return super().model_dump(*args, **kwargs) 

656 

657 def model_dump_json(self, *args: Any, **kwargs: Any) -> Any: 

658 """See `pydantic.BaseModel.model_dump_json`.""" 

659 return super().model_dump(*args, **kwargs) 

660 

661 def model_copy(self, *args: Any, **kwargs: Any) -> Any: 

662 """See `pydantic.BaseModel.model_copy`.""" 

663 return super().model_copy(*args, **kwargs) 

664 

665 @classmethod 

666 def model_construct(cls, *args: Any, **kwargs: Any) -> Any: # type: ignore[misc, override] 

667 """See `pydantic.BaseModel.model_construct`.""" 

668 return super().model_construct(*args, **kwargs) 

669 

670 @classmethod 

671 def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any: 

672 """See `pydantic.BaseModel.model_json_schema`.""" 

673 return super().model_json_schema(*args, **kwargs) 

674 

675 @classmethod 

676 def model_validate(cls, *args: Any, **kwargs: Any) -> Any: 

677 """See `pydantic.BaseModel.model_validate`.""" 

678 return super().model_validate(*args, **kwargs) 

679 

680 @classmethod 

681 def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any: 

682 """See `pydantic.BaseModel.model_validate_json`.""" 

683 return super().model_validate_json(*args, **kwargs) 

684 

685 @classmethod 

686 def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any: 

687 """See `pydantic.BaseModel.model_validate_strings`.""" 

688 return super().model_validate_strings(*args, **kwargs) 

class ProvenanceQuantumModel(pydantic.BaseModel):
    """Data model for the quanta in a provenance quantum graph file."""

    quantum_id: uuid.UUID
    """Unique identifier for the quantum."""

    task_label: TaskLabel
    """Label of the task for this quantum."""

    data_coordinate: DataCoordinateValues = pydantic.Field(default_factory=list)
    """The full values (required and implied) of this quantum's data ID."""

    inputs: dict[ConnectionName, list[uuid.UUID]] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be consumed by this quantum, grouped by
    connection name.
    """

    outputs: dict[ConnectionName, list[uuid.UUID]] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be produced by this quantum, grouped by
    connection name.
    """

    attempts: list[ProvenanceQuantumAttemptModel] = pydantic.Field(default_factory=list)
    """Provenance for all attempts to execute this quantum, ordered
    chronologically from first to last.

    An entry is added whenever the quantum *should* have been attempted, even
    if it never actually ran; an empty `list` is used only for quanta that
    were blocked by an upstream failure.
    """

    @property
    def node_id(self) -> uuid.UUID:
        """Alias for the quantum ID."""
        return self.quantum_id

    @classmethod
    def from_predicted(cls, predicted: PredictedQuantumDatasetsModel) -> ProvenanceQuantumModel:
        """Construct from a predicted quantum model.

        Parameters
        ----------
        predicted : `PredictedQuantumDatasetsModel`
            Information about the quantum from the predicted graph.

        Returns
        -------
        provenance : `ProvenanceQuantumModel`
            Provenance quantum model.
        """
        inputs = {
            connection_name: [d.dataset_id for d in predicted_inputs]
            for connection_name, predicted_inputs in predicted.inputs.items()
        }
        outputs = {
            connection_name: [d.dataset_id for d in predicted_outputs]
            for connection_name, predicted_outputs in predicted.outputs.items()
        }
        return cls(
            quantum_id=predicted.quantum_id,
            task_label=predicted.task_label,
            data_coordinate=predicted.data_coordinate,
            inputs=inputs,
            outputs=outputs,
        )

    def _add_to_graph(self, graph: ProvenanceQuantumGraph) -> None:
        """Add this quantum and its edges to datasets to a provenance graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.

        Notes
        -----
        This method adds:

        - a ``bipartite_xgraph`` quantum node with full attributes;
        - a ``quantum_only_xgraph`` quantum node with full attributes;
        - ``bipartite_xgraph`` edges to adjacent datasets (which adds dataset
          nodes with no attributes), while populating those edge attributes;
        - ``quantum_only_xgraph`` edges to any adjacent quantum that has also
          already been loaded.
        """
        task_node = graph.pipeline_graph.tasks[self.task_label]
        data_id = DataCoordinate.from_full_values(task_node.dimensions, tuple(self.data_coordinate))
        last_attempt = (
            self.attempts[-1]
            if self.attempts
            else ProvenanceQuantumAttemptModel(status=QuantumAttemptStatus.BLOCKED)
        )
        graph._bipartite_xgraph.add_node(
            self.quantum_id,
            data_id=data_id,
            task_label=self.task_label,
            pipeline_node=task_node,
            status=last_attempt.status,
            caveats=last_attempt.caveats,
            exception=last_attempt.exception,
            resource_usage=last_attempt.resource_usage,
            attempts=self.attempts,
        )
        graph._quanta_by_task_label[self.task_label][data_id] = self.quantum_id
        graph._quantum_only_xgraph.add_node(self.quantum_id, **graph._bipartite_xgraph.nodes[self.quantum_id])
        for connection_name, dataset_ids in self.inputs.items():
            read_edge = task_node.get_input_edge(connection_name)
            for dataset_id in dataset_ids:
                graph._bipartite_xgraph.add_edge(dataset_id, self.quantum_id, is_read=True)
                graph._bipartite_xgraph.edges[dataset_id, self.quantum_id].setdefault(
                    "pipeline_edges", []
                ).append(read_edge)
        for connection_name, dataset_ids in self.outputs.items():
            write_edge = task_node.get_output_edge(connection_name)
            if connection_name == acc.METADATA_OUTPUT_CONNECTION_NAME:
                graph._bipartite_xgraph.add_node(
                    dataset_ids[0],
                    data_id=data_id,
                    dataset_type_name=write_edge.dataset_type_name,
                    pipeline_node=graph.pipeline_graph.dataset_types[write_edge.dataset_type_name],
                    run=graph.header.output_run,
                    produced=last_attempt.status.has_metadata,
                )
                graph._datasets_by_type[write_edge.dataset_type_name][data_id] = dataset_ids[0]
                graph._bipartite_xgraph.nodes[self.quantum_id]["metadata_id"] = dataset_ids[0]
                graph._quantum_only_xgraph.nodes[self.quantum_id]["metadata_id"] = dataset_ids[0]
            if connection_name == acc.LOG_OUTPUT_CONNECTION_NAME:
                graph._bipartite_xgraph.add_node(
                    dataset_ids[0],
                    data_id=data_id,
                    dataset_type_name=write_edge.dataset_type_name,
                    pipeline_node=graph.pipeline_graph.dataset_types[write_edge.dataset_type_name],
                    run=graph.header.output_run,
                    produced=last_attempt.status.has_log,
                )
                graph._datasets_by_type[write_edge.dataset_type_name][data_id] = dataset_ids[0]
                graph._bipartite_xgraph.nodes[self.quantum_id]["log_id"] = dataset_ids[0]
                graph._quantum_only_xgraph.nodes[self.quantum_id]["log_id"] = dataset_ids[0]
            for dataset_id in dataset_ids:
                graph._bipartite_xgraph.add_edge(
                    self.quantum_id,
                    dataset_id,
                    is_read=False,
                    # There can only be one pipeline edge for an output.
                    pipeline_edges=[write_edge],
                )
        for dataset_id in graph._bipartite_xgraph.predecessors(self.quantum_id):
            for upstream_quantum_id in graph._bipartite_xgraph.predecessors(dataset_id):
                graph._quantum_only_xgraph.add_edge(upstream_quantum_id, self.quantum_id)
        for dataset_id in graph._bipartite_xgraph.successors(self.quantum_id):
            for downstream_quantum_id in graph._bipartite_xgraph.successors(dataset_id):
                graph._quantum_only_xgraph.add_edge(self.quantum_id, downstream_quantum_id)

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceInitQuantumModel(pydantic.BaseModel):
    """Data model for the special "init" quanta in a provenance quantum graph
    file.
    """

    quantum_id: uuid.UUID
    """Unique identifier for the quantum."""

    task_label: TaskLabel
    """Label of the task for this init quantum.

    Note that full task and dataset type definitions are stored in the
    pipeline graph.
    """

    inputs: dict[ConnectionName, uuid.UUID] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be consumed by this quantum, keyed by
    connection name.
    """

    outputs: dict[ConnectionName, uuid.UUID] = pydantic.Field(default_factory=dict)
    """IDs of the datasets predicted to be produced by this quantum, keyed by
    connection name.
    """

    @classmethod
    def from_predicted(cls, predicted: PredictedQuantumDatasetsModel) -> ProvenanceInitQuantumModel:
        """Construct from a predicted quantum model.

        Parameters
        ----------
        predicted : `PredictedQuantumDatasetsModel`
            Information about the quantum from the predicted graph.

        Returns
        -------
        provenance : `ProvenanceInitQuantumModel`
            Provenance init quantum model.
        """
        inputs = {
            connection_name: predicted_inputs[0].dataset_id
            for connection_name, predicted_inputs in predicted.inputs.items()
        }
        outputs = {
            connection_name: predicted_outputs[0].dataset_id
            for connection_name, predicted_outputs in predicted.outputs.items()
        }
        return cls(
            quantum_id=predicted.quantum_id,
            task_label=predicted.task_label,
            inputs=inputs,
            outputs=outputs,
        )

    def _add_to_graph(self, graph: ProvenanceQuantumGraph, empty_data_id: DataCoordinate) -> None:
        """Add this quantum and its edges to datasets to a provenance graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.
        empty_data_id : `lsst.daf.butler.DataCoordinate`
            The empty data ID for the appropriate dimension universe.

        Notes
        -----
        This method adds:

        - a ``bipartite_xgraph`` quantum node with full attributes;
        - ``bipartite_xgraph`` edges to adjacent datasets (which adds dataset
          nodes with no attributes), while populating those edge attributes.
        """
        task_init_node = graph.pipeline_graph.tasks[self.task_label].init
        graph._bipartite_xgraph.add_node(
            self.quantum_id, data_id=empty_data_id, task_label=self.task_label, pipeline_node=task_init_node
        )
        for connection_name, dataset_id in self.inputs.items():
            read_edge = task_init_node.get_input_edge(connection_name)
            graph._bipartite_xgraph.add_edge(dataset_id, self.quantum_id, is_read=True)
            graph._bipartite_xgraph.edges[dataset_id, self.quantum_id].setdefault(
                "pipeline_edges", []
            ).append(read_edge)
        for connection_name, dataset_id in self.outputs.items():
            write_edge = task_init_node.get_output_edge(connection_name)
            graph._bipartite_xgraph.add_node(
                dataset_id,
                data_id=empty_data_id,
                dataset_type_name=write_edge.dataset_type_name,
                pipeline_node=graph.pipeline_graph.dataset_types[write_edge.dataset_type_name],
                run=graph.header.output_run,
                produced=True,
            )
            graph._datasets_by_type[write_edge.dataset_type_name][empty_data_id] = dataset_id
            graph._bipartite_xgraph.add_edge(
                self.quantum_id,
                dataset_id,
                is_read=False,
                # There can only be one pipeline edge for an output.
                pipeline_edges=[write_edge],
            )
            if write_edge.connection_name == acc.CONFIG_INIT_OUTPUT_CONNECTION_NAME:
                graph._bipartite_xgraph.nodes[self.quantum_id]["config_id"] = dataset_id
        graph._init_quanta[self.task_label] = self.quantum_id

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceInitQuantaModel(pydantic.RootModel):
    """Data model for the init quanta in a provenance graph."""

    root: list[ProvenanceInitQuantumModel] = pydantic.Field(default_factory=list)
    """List of special "init" quanta, one for each task."""

    def _add_to_graph(self, graph: ProvenanceQuantumGraph) -> None:
        """Add these init quanta and their edges to datasets to a provenance
        graph.

        Parameters
        ----------
        graph : `ProvenanceQuantumGraph`
            Graph to update in place.
        """
        empty_data_id = DataCoordinate.make_empty(graph.pipeline_graph.universe)
        for init_quantum in self.root:
            init_quantum._add_to_graph(graph, empty_data_id=empty_data_id)

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules and not TYPE_CHECKING:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)

class ProvenanceQuantumGraph(BaseQuantumGraph):
    """A quantum graph that represents processing that has already been
    executed.

    Parameters
    ----------
    header : `HeaderModel`
        General metadata shared with other quantum graph types.
    pipeline_graph : `.pipeline_graph.PipelineGraph`
        Graph of tasks and dataset types. May contain a superset of the tasks
        and dataset types that actually have quanta and datasets in the quantum
        graph.

    Notes
    -----
    A provenance quantum graph is generally obtained via the
    `ProvenanceQuantumGraphReader.graph` attribute, which is updated in-place
    as information is read from disk.
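
    A minimal sketch of reading everything from a provenance graph file (the
    filename is illustrative)::

        with ProvenanceQuantumGraphReader.open("provenance.qg") as reader:
            reader.read_quanta()
            reader.read_datasets()
            graph = reader.graph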

1123 """ 

1124 

1125 def __init__(self, header: HeaderModel, pipeline_graph: PipelineGraph) -> None: 

1126 super().__init__(header, pipeline_graph) 

1127 self._init_quanta: dict[TaskLabel, uuid.UUID] = {} 

1128 self._quantum_only_xgraph = networkx.DiGraph() 

1129 self._bipartite_xgraph = networkx.DiGraph() 

1130 self._quanta_by_task_label: dict[str, dict[DataCoordinate, uuid.UUID]] = { 

1131 task_label: {} for task_label in self.pipeline_graph.tasks.keys() 

1132 } 

1133 self._datasets_by_type: dict[str, dict[DataCoordinate, uuid.UUID]] = { 

1134 dataset_type_name: {} for dataset_type_name in self.pipeline_graph.dataset_types.keys() 

1135 } 

1136 

1137 @classmethod 

1138 @contextmanager 

1139 def from_args( 

1140 cls, 

1141 repo_or_filename: str, 

1142 /, 

1143 collection: str | None = None, 

1144 *, 

1145 quanta: Iterable[uuid.UUID] | None = None, 

1146 datasets: Iterable[uuid.UUID] | None = None, 

1147 writeable: bool = False, 

1148 ) -> Iterator[tuple[ProvenanceQuantumGraph, Butler | None]]: 

1149 """Construct a `ProvenanceQuantumGraph` fron CLI-friendly arguments for 

1150 a file or butler-ingested graph dataset. 

1151 

1152 Parameters 

1153 ---------- 

1154 repo_or_filename : `str` 

1155 Either a provenance quantum graph filename or a butler repository 

1156 path or alias. 

1157 collection : `str`, optional 

1158 Collection to search; presence indicates that the first argument 

1159 is a butler repository, not a filename. 

1160 quanta : `~collections.abc.Iterable` [ `str` ] or `None`, optional 

1161 IDs of the quanta to load, or `None` to load all. 

1162 datasets : `~collections.abc.Iterable` [ `str` ], optional 

1163 IDs of the datasets to load, or `None` to load all. 

1164 writeable : `bool`, optional 

1165 Whether the butler should be constructed with write support. 

1166 

1167 Returns 

1168 ------- 

1169 context : `contextlib.AbstractContextManager` 

1170 A context manager that yields a tuple of 

1171 

1172 - the `ProvenanceQuantumGraph` 

1173 - the `Butler` constructed (or `None`) 

1174 

1175 when entered. 
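
        Examples
        --------
        A minimal sketch (the filename is illustrative)::

            with ProvenanceQuantumGraph.from_args("provenance.qg") as (graph, butler):
                print(graph.make_quantum_table())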

1176 """ 

1177 exit_stack = ExitStack() 

1178 if collection is not None: 

1179 try: 

1180 butler = exit_stack.enter_context( 

1181 Butler.from_config(repo_or_filename, collections=[collection], writeable=writeable) 

1182 ) 

1183 except Exception as err: 

1184 err.add_note( 

1185 f"Expected {repo_or_filename!r} to be a butler repository path or alias because a " 

1186 f"collection ({collection}) was provided." 

1187 ) 

1188 raise 

1189 with exit_stack: 

1190 graph = butler.get( 

1191 acc.PROVENANCE_DATASET_TYPE_NAME, parameters={"quanta": quanta, "datasets": datasets} 

1192 ) 

1193 yield graph, butler 

1194 else: 

1195 try: 

1196 reader = exit_stack.enter_context(ProvenanceQuantumGraphReader.open(repo_or_filename)) 

1197 except Exception as err: 

1198 err.add_note( 

1199 f"Expected a {repo_or_filename} to be a provenance quantum graph filename " 

1200 f"because no collection was provided." 

1201 ) 

1202 raise 

1203 with exit_stack: 

1204 if quanta is None: 

1205 reader.read_quanta() 

1206 elif not quanta: 

1207 reader.read_quanta(quanta) 

1208 if datasets is None: 

1209 reader.read_datasets() 

1210 elif not datasets: 

1211 reader.read_datasets(datasets) 

1212 yield reader.graph, None 

    @property
    def init_quanta(self) -> Mapping[TaskLabel, uuid.UUID]:
        """A mapping from task label to the ID of the special init quantum for
        that task.

        This is populated by the ``init_quanta`` component. Additional
        information about each init quantum can be found by using the ID to
        look up node attributes in the `bipartite_xgraph`, i.e.::

            info: ProvenanceInitQuantumInfo = qg.bipartite_xgraph.nodes[id]
        """
        return self._init_quanta

    @property
    def quanta_by_task(self) -> Mapping[TaskLabel, Mapping[DataCoordinate, uuid.UUID]]:
        """A nested mapping of all quanta, keyed first by task name and then by
        data ID.

        Notes
        -----
        This is populated one quantum at a time as they are read. All tasks in
        the pipeline graph are included, even if none of their quanta were
        loaded (i.e. nested mappings may be empty).

        The returned object may be an internal dictionary; as the type
        annotation indicates, it should not be modified in place.
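
        For example, looking up the ID of a single quantum (the task label and
        ``data_id`` are illustrative)::

            quantum_id = qg.quanta_by_task["isr"][data_id]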

1240 """ 

1241 return self._quanta_by_task_label 

1242 

1243 @property 

1244 def datasets_by_type(self) -> Mapping[DatasetTypeName, Mapping[DataCoordinate, uuid.UUID]]: 

1245 """A nested mapping of all datasets, keyed first by dataset type name 

1246 and then by data ID. 

1247 

1248 Notes 

1249 ----- 

1250 This is populated one dataset at a time as they are read. All dataset 

1251 types in the pipeline graph are included, even if none of their 

1252 datasets were loaded (i.e. nested mappings may be empty). 

1253 

1254 Reading a quantum also populates its log and metadata datasets. 

1255 

1256 The returned object may be an internal dictionary; as the type 

1257 annotation indicates, it should not be modified in place. 

1258 """ 

1259 return self._datasets_by_type 

1260 

1261 @property 

1262 def quantum_only_xgraph(self) -> networkx.DiGraph: 

1263 """A directed acyclic graph with quanta as nodes (and datasets elided). 

1264 

1265 Notes 

1266 ----- 

1267 Node keys are quantum UUIDs, and are populated one quantum at a time as 

1268 they are loaded. Loading quanta (via 

1269 `ProvenanceQuantumGraphReader.read_quanta`) will add the loaded nodes 

1270 with full attributes and add edges to adjacent nodes with no 

1271 attributes. Loading datasets (via 

1272 `ProvenanceQuantumGraphReader.read_datasets`) will also add edges and 

1273 nodes with no attributes. 

1274 

1275 Node attributes are described by the `ProvenanceQuantumInfo` types. 

1276 

1277 This graph does not include special "init" quanta. 

1278 

1279 The returned object is a read-only view of an internal one. 
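
        For example, the IDs of all quanta downstream of a given quantum can
        be found with standard NetworkX tools (illustrative)::

            downstream_ids = networkx.descendants(qg.quantum_only_xgraph, quantum_id)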

1280 """ 

1281 return self._quantum_only_xgraph.copy(as_view=True) 

1282 

1283 @property 

1284 def bipartite_xgraph(self) -> networkx.DiGraph: 

1285 """A directed acyclic graph with quantum and dataset nodes. 

1286 

1287 Notes 

1288 ----- 

1289 Node keys are quantum or dataset UUIDs, and are populated one quantum 

1290 or dataset at a time as they are loaded. Loading quanta (via 

1291 `ProvenanceQuantumGraphReader.read_quanta`) or datasets (via 

1292 `ProvenanceQuantumGraphReader.read_datasets`) will load those nodes 

1293 with full attributes and edges to adjacent nodes with no attributes. 

1294 Loading quanta is necessary to populate edge attributes. 

1295 Reading a quantum also populates its log and metadata datasets. 

1296 

1297 Node attributes are described by the 

1298 `ProvenanceQuantumInfo`, `ProvenanceInitQuantumInfo`, and 

1299 `ProvenanceDatasetInfo` types. 

1300 

1301 This graph includes init-input and init-output datasets, but it does 

1302 *not* reflect the dependency between each task's special "init" quantum 

1303 and its runtime quanta (as this would require edges between quanta, and 

1304 that would break the "bipartite" property). 

1305 

1306 The returned object is a read-only view of an internal one. 

1307 """ 

1308 return self._bipartite_xgraph.copy(as_view=True) 

1309 

1310 def make_quantum_table(self, drop_unused_columns: bool = True) -> astropy.table.Table: 

1311 """Construct an `astropy.table.Table` with a tabular summary of the 

1312 quanta. 

1313 

1314 Parameters 

1315 ---------- 

1316 drop_unused_columns : `bool`, optional 

1317 Whether to drop columns for rare states that did not actually 

1318 occur in this run. 

1319 

1320 Returns 

1321 ------- 

1322 table : `astropy.table.Table` 

1323 A table view of the quantum information. This only includes 

1324 counts of status categories and caveats, not any per-data-ID 

1325 detail. 

1326 

1327 Notes 

1328 ----- 

1329 Success caveats in the table are represented by their 

1330 `~QuantumSuccessCaveats.concise` form, so when pretty-printing this 

1331 table for users, the `~QuantumSuccessCaveats.legend` should generally 

1332 be printed as well. 
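
        Examples
        --------
        A minimal sketch, assuming ``qg`` is a loaded graph::

            table = qg.make_quantum_table()
            table.pprint_all()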

1333 """ 

1334 rows = [] 

1335 for task_label, quanta_for_task in self.quanta_by_task.items(): 

1336 if not self.header.n_task_quanta[task_label]: 

1337 continue 

1338 status_counts = Counter[QuantumAttemptStatus]( 

1339 self._quantum_only_xgraph.nodes[q]["status"] for q in quanta_for_task.values() 

1340 ) 

1341 caveat_counts = Counter[QuantumSuccessCaveats | None]( 

1342 self._quantum_only_xgraph.nodes[q]["caveats"] for q in quanta_for_task.values() 

1343 ) 

1344 caveat_counts.pop(QuantumSuccessCaveats.NO_CAVEATS, None) 

1345 caveat_counts.pop(None, None) 

1346 if len(caveat_counts) > 1: 

1347 caveats = "(multiple)" 

1348 elif len(caveat_counts) == 1: 

1349 ((code, count),) = caveat_counts.items() 

1350 # MyPy can't tell that the pop(None, None) above makes None 

1351 # impossible here. 

1352 caveats = f"{code.concise()}({count})" # type: ignore[union-attr] 

1353 else: 

1354 caveats = "" 

1355 row: dict[str, Any] = { 

1356 "Task": task_label, 

1357 "Caveats": caveats, 

1358 } 

1359 for status in QuantumAttemptStatus: 

1360 row[status.title] = status_counts.get(status, 0) 

1361 row.update( 

1362 { 

1363 "TOTAL": len(quanta_for_task), 

1364 "EXPECTED": self.header.n_task_quanta[task_label], 

1365 } 

1366 ) 

1367 rows.append(row) 

1368 table = astropy.table.Table(rows) 

1369 if drop_unused_columns: 

1370 for status in QuantumAttemptStatus: 

1371 if status.is_rare and not table[status.title].any(): 

1372 del table[status.title] 

1373 return table 

1374 

1375 def make_exception_table(self) -> astropy.table.Table: 

1376 """Construct an `astropy.table.Table` with counts for each exception 

1377 type raised by each task. 

1378 

1379 Returns 

1380 ------- 

1381 table : `astropy.table.Table` 

1382 A table with columns for task label, exception type, and counts. 

1383 """ 

1384 rows = [] 

1385 for task_label, quanta_for_task in self.quanta_by_task.items(): 

1386 success_counts = Counter[str]() 

1387 failed_counts = Counter[str]() 

1388 for quantum_id in quanta_for_task.values(): 

1389 quantum_info: ProvenanceQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id] 

1390 exc_info = quantum_info["exception"] 

1391 if exc_info is not None: 

1392 if quantum_info["status"] is QuantumAttemptStatus.SUCCESSFUL: 

1393 success_counts[exc_info.type_name] += 1 

1394 else: 

1395 failed_counts[exc_info.type_name] += 1 

1396 for type_name in sorted(success_counts.keys() | failed_counts.keys()): 

1397 rows.append( 

1398 { 

1399 "Task": task_label, 

1400 "Exception": type_name, 

1401 "Successes": success_counts.get(type_name, 0), 

1402 "Failures": failed_counts.get(type_name, 0), 

1403 } 

1404 ) 

1405 return astropy.table.Table(rows) 

1406 

1407 def make_task_resource_usage_table( 

1408 self, task_label: TaskLabel, include_data_ids: bool = False 

1409 ) -> astropy.table.Table: 

1410 """Make a table of resource usage for a single task. 

1411 

1412 Parameters 

1413 ---------- 

1414 task_label : `str` 

1415 Label of the task to extract resource usage for. 

1416 include_data_ids : `bool`, optional 

1417 Whether to also include data ID columns. 

1418 

1419 Returns 

1420 ------- 

1421 table : `astropy.table.Table` 

1422 A table with columns for quantum ID and all fields in 

1423 `QuantumResourceUsage`. 
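
        For example (the task label is illustrative)::

            usage = qg.make_task_resource_usage_table("isr", include_data_ids=True)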

1424 """ 

1425 quanta_for_task = self.quanta_by_task[task_label] 

1426 dtype_terms: list[tuple[str, np.dtype]] = [("quantum_id", np.dtype((np.void, 16)))] 

1427 if include_data_ids: 

1428 dimensions = self.pipeline_graph.tasks[task_label].dimensions 

1429 for dimension_name in dimensions.data_coordinate_keys: 

1430 dtype = np.dtype(self.pipeline_graph.universe.dimensions[dimension_name].primary_key.pytype) 

1431 dtype_terms.append((dimension_name, dtype)) 

1432 fields = QuantumResourceUsage.get_numpy_fields() 

1433 dtype_terms.extend(fields.items()) 

1434 row_dtype = np.dtype(dtype_terms) 

1435 rows: list[object] = [] 

1436 for data_id, quantum_id in quanta_for_task.items(): 

1437 info: ProvenanceQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id] 

1438 if (resource_usage := info["resource_usage"]) is not None: 

1439 row: tuple[object, ...] = (quantum_id.bytes,) 

1440 if include_data_ids: 

1441 row += data_id.full_values 

1442 row += resource_usage.get_numpy_row() 

1443 rows.append(row) 

1444 array = np.array(rows, dtype=row_dtype) 

1445 return astropy.table.Table(array, units=QuantumResourceUsage.get_units()) 

1446 
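A usage sketch for `make_task_resource_usage_table`; the task label "isr" and the output filename are hypothetical:

    # Hypothetical sketch: dump per-quantum resource usage for one task.
    usage = graph.make_task_resource_usage_table("isr", include_data_ids=True)
    usage.write("isr_usage.ecsv", format="ecsv")  # columns follow QuantumResourceUsage
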

1447 def make_status_report( 

1448 self, 

1449 states: Iterable[QuantumAttemptStatus] = ( 

1450 QuantumAttemptStatus.FAILED, 

1451 QuantumAttemptStatus.ABORTED, 

1452 QuantumAttemptStatus.ABORTED_SUCCESS, 

1453 ), 

1454 *, 

1455 also: QuantumAttemptStatus | Iterable[QuantumAttemptStatus] = (), 

1456 with_caveats: QuantumSuccessCaveats | None = QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR, 

1457 data_id_table_dir: ResourcePathExpression | None = None, 

1458 ) -> ProvenanceReport: 

1459 """Make a JSON- or YAML-friendly report of all quanta with the given 

1460 states. 

1461 

1462 Parameters 

1463 ---------- 

1464 states : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \ 

1465 `..QuantumAttemptStatus`, optional 

1466 A quantum is included if it has any of these states. Defaults to 

1467 states that clearly represent problems. 

1468 also : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \ 

1469 `..QuantumAttemptStatus`, optional 

1470 Additional states to consider; unioned with ``states``. This is 

1471 provided so users can easily request additional states while also 

1472 getting the defaults. 

1473 with_caveats : `..QuantumSuccessCaveats` or `None`, optional 

1474 If `..QuantumAttemptStatus.SUCCESSFUL` is in ``states``, only 

1475 include quanta with these caveat flags. May be set to `None` 

1476 to report on all successful quanta. 

1477 data_id_table_dir : convertible to `~lsst.resources.ResourcePath`, \ 

1478 optional 

1479 If provided, a directory to write data ID tables (in ECSV format) 

1481 containing all of the data IDs with the given states, for use with the 

1481 ``--data-id-tables`` argument to the quantum graph builder. 

1482 Subdirectories for each task and status will be created within this 

1483 directory, with one file for each exception type (or ``UNKNOWN`` 

1484 when there is no exception). 

1485 

1486 Returns 

1487 ------- 

1488 report : `ProvenanceReport` 

1489 A Pydantic model that groups quanta by task label and exception 

1490 type. 

1491 """ 

1492 states = set(ensure_iterable(states)) 

1493 states.update(ensure_iterable(also)) 

1494 result = ProvenanceReport(root={}) 

1495 if data_id_table_dir is not None: 

1496 data_id_table_dir = ResourcePath(data_id_table_dir) 

1497 for task_label, quanta_for_task in self.quanta_by_task.items(): 

1498 reports_for_task: dict[str, dict[str | None, list[ProvenanceQuantumReport]]] = {} 

1499 table_rows_for_task: dict[str, dict[str | None, list[tuple[int | str, ...]]]] = {} 

1500 for quantum_id in quanta_for_task.values(): 

1501 quantum_info: ProvenanceQuantumInfo = self._quantum_only_xgraph.nodes[quantum_id] 

1502 quantum_status = quantum_info["status"] 

1503 if quantum_status not in states: 

1504 continue 

1505 if ( 

1506 quantum_status is QuantumAttemptStatus.SUCCESSFUL 

1507 and with_caveats is not None 

1508 and (quantum_info["caveats"] is None or not (quantum_info["caveats"] & with_caveats)) 

1509 ): 

1510 continue 

1511 key1 = quantum_status.name 

1512 exc_info = quantum_info["exception"] 

1513 key2 = exc_info.type_name if exc_info is not None else None 

1514 reports_for_task.setdefault(key1, {}).setdefault(key2, []).append( 

1515 ProvenanceQuantumReport.from_info(quantum_id, quantum_info) 

1516 ) 

1517 if data_id_table_dir: 

1518 table_rows_for_task.setdefault(key1, {}).setdefault(key2, []).append( 

1519 quantum_info["data_id"].required_values 

1520 ) 

1521 if reports_for_task: 

1522 result.root[task_label] = reports_for_task 

1523 if table_rows_for_task: 

1524 assert data_id_table_dir is not None, "table_rows_for_task should be empty when data_id_table_dir is None" 

1525 for status_name, table_rows_for_status in table_rows_for_task.items(): 

1526 dir_for_task_and_status = data_id_table_dir.join(task_label, forceDirectory=True).join( 

1527 status_name, forceDirectory=True 

1528 ) 

1529 if dir_for_task_and_status.isLocal: 

1530 dir_for_task_and_status.mkdir() 

1531 for exc_name, data_id_rows in table_rows_for_status.items(): 

1532 table = astropy.table.Table( 

1533 rows=data_id_rows, 

1534 names=list(self.pipeline_graph.tasks[task_label].dimensions.required), 

1535 ) 

1536 filename = f"{exc_name}.ecsv" if exc_name is not None else "UNKNOWN.ecsv" 

1537 with dir_for_task_and_status.join(filename).open("w") as stream: 

1538 table.write(stream, format="ecsv") 

1539 return result 

1540 
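A sketch of `make_status_report` that also reports on successes with caveats and writes data ID tables; the directory name is hypothetical:

    # Hypothetical sketch: report the default problem states plus
    # successes carrying the default PARTIAL_OUTPUTS_ERROR caveat, and
    # write ECSV data ID tables suitable for rebuilding a graph.
    report = graph.make_status_report(
        also=QuantumAttemptStatus.SUCCESSFUL,
        data_id_table_dir="data_id_tables",
    )
    print(report.model_dump_json(indent=2))
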

1541 def make_many_reports( 

1542 self, 

1543 states: Iterable[QuantumAttemptStatus] = ( 

1544 QuantumAttemptStatus.FAILED, 

1545 QuantumAttemptStatus.ABORTED, 

1546 QuantumAttemptStatus.ABORTED_SUCCESS, 

1547 ), 

1548 *, 

1549 status_report_file: ResourcePathExpression | None = None, 

1550 print_quantum_table: bool = False, 

1551 print_exception_table: bool = False, 

1552 also: QuantumAttemptStatus | Iterable[QuantumAttemptStatus] = (), 

1553 with_caveats: QuantumSuccessCaveats | None = None, 

1554 data_id_table_dir: ResourcePathExpression | None = None, 

1555 ) -> None: 

1556 """Write multiple reports. 

1557 

1558 Parameters 

1559 ---------- 

1560 states : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \ 

1561 `..QuantumAttemptStatus`, optional 

1562 A quantum is included in the status report and data ID tables if it 

1563 has any of these states. Defaults to states that clearly represent 

1564 problems. 

1565 status_report_file : convertible to `~lsst.resources.ResourcePath`, \ 

1566 optional 

1567 Filename for the JSON status report (see `make_status_report`). 

1568 print_quantum_table : `bool`, optional 

1569 If `True`, print a quantum summary table (counts only) to STDOUT. 

1570 print_exception_table : `bool`, optional 

1571 If `True`, print an exception-type summary table (counts only) to 

1572 STDOUT. 

1573 also : `~collections.abc.Iterable` [`..QuantumAttemptStatus`] or \ 

1574 `..QuantumAttemptStatus`, optional 

1575 Additional states to consider in the status report and data ID 

1576 tables; unioned with ``states``. This is provided so users can 

1577 easily request additional states while also getting the defaults. 

1578 with_caveats : `..QuantumSuccessCaveats` or `None`, optional 

1579 Only include quanta with these caveat flags in the status report 

1580 and data ID tables. May be set to `None` to report on all 

1581 successful quanta (an empty sequence reports only on quanta with no 

1582 caveats). If provided, `QuantumAttemptStatus.SUCCESSFUL` is 

1583 automatically included in ``states``. 

1584 data_id_table_dir : convertible to `~lsst.resources.ResourcePath`, \ 

1585 optional 

1586 If provided, a directory to write data ID tables (in ECSV format) 

1587 containing all of the data IDs with the given states, for use with the 

1588 ``--data-id-tables`` argument to the quantum graph builder. 

1589 Subdirectories for each task and status will be created within this 

1590 directory, with one file for each exception type (or ``UNKNOWN`` 

1591 when there is no exception). 

1592 """ 

1593 if status_report_file is not None or data_id_table_dir is not None: 

1594 status_report = self.make_status_report( 

1595 states, also=also, with_caveats=with_caveats, data_id_table_dir=data_id_table_dir 

1596 ) 

1597 if status_report_file is not None: 

1598 status_report_file = ResourcePath(status_report_file) 

1599 if status_report_file.isLocal: 

1600 status_report_file.dirname().mkdir() 

1601 with status_report_file.open("w") as stream: 

1602 stream.write(status_report.model_dump_json(indent=2)) 

1603 if print_quantum_table: 

1604 quantum_table = self.make_quantum_table() 

1605 if quantum_table: 

1606 quantum_table.pprint_all() 

1607 print("") 

1608 if print_exception_table: 

1609 exception_table = self.make_exception_table() 

1610 if exception_table: 

1611 exception_table.pprint_all() 

1612 print("") 

1613 
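A single-call sketch for `make_many_reports`, with hypothetical output paths:

    # Hypothetical sketch: write the JSON status report and data ID
    # tables, and print both summary tables to STDOUT.
    graph.make_many_reports(
        status_report_file="reports/status_report.json",
        print_quantum_table=True,
        print_exception_table=True,
        data_id_table_dir="reports/data_id_tables",
    )
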

1614 

1615@dataclasses.dataclass 

1616class ProvenanceQuantumGraphReader(BaseQuantumGraphReader): 

1617 """A helper class for reading provenance quantum graphs. 

1618 

1619 Notes 

1620 ----- 

1621 The `open` context manager should be used to construct new instances. 

1622 Instances cannot be used after the context manager exits, except to access 

1623 the `graph` attribute. 

1624 

1625 The various ``read_*`` methods in this class update the `graph` attribute 

1626 in place. 

1627 """ 

1628 

1629 graph: ProvenanceQuantumGraph = dataclasses.field(init=False) 

1630 """Loaded provenance graph, populated in place as components are read.""" 

1631 

1632 @classmethod 

1633 @contextmanager 

1634 def open( 

1635 cls, 

1636 uri: ResourcePathExpression, 

1637 *, 

1638 page_size: int | None = None, 

1639 import_mode: TaskImportMode = TaskImportMode.DO_NOT_IMPORT, 

1640 ) -> Iterator[ProvenanceQuantumGraphReader]: 

1641 """Construct a reader from a URI. 

1642 

1643 Parameters 

1644 ---------- 

1645 uri : convertible to `lsst.resources.ResourcePath` 

1646 URI to open. Should have a ``.qg`` extension. 

1647 page_size : `int`, optional 

1648 Approximate number of bytes to read at once from address files and 

1649 multi-block files. Note that this does not set a page size for 

1650 *all* reads, but it does affect the smallest, most numerous reads. 

1651 Can also be set via the ``LSST_QG_PAGE_SIZE`` environment variable. 

1652 import_mode : `.pipeline_graph.TaskImportMode`, optional 

1653 How to handle importing the task classes referenced in the pipeline 

1654 graph. 

1655 

1656 Returns 

1657 ------- 

1658 reader : `contextlib.AbstractContextManager` [ \ 

1659 `ProvenanceQuantumGraphReader` ] 

1660 A context manager that returns the reader when entered. 

1661 """ 

1662 with cls._open( 

1663 uri, 

1664 graph_type="provenance", 

1665 address_filename="nodes", 

1666 page_size=page_size, 

1667 import_mode=import_mode, 

1668 n_addresses=4, 

1669 ) as self: 

1670 yield self 

1671 
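A sketch of the `open` entry point; the URI and page size are hypothetical:

    # Hypothetical sketch: open a provenance graph, load everything except
    # the bulky logs/metadata blocks, and grab package versions.
    with ProvenanceQuantumGraphReader.open("provenance.qg", page_size=65536) as reader:
        reader.read_full_graph()
        packages = reader.fetch_packages()
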

1672 def __post_init__(self) -> None: 

1673 self.graph = ProvenanceQuantumGraph(self.header, self.pipeline_graph) 

1674 

1675 def read_init_quanta(self) -> None: 

1676 """Read the thin graph, with all edge information and categorization of 

1677 quanta by task label. 

1678 """ 

1679 init_quanta = self._read_single_block("init_quanta", ProvenanceInitQuantaModel) 

1680 for init_quantum in init_quanta.root: 

1681 self.graph._init_quanta[init_quantum.task_label] = init_quantum.quantum_id 

1682 init_quanta._add_to_graph(self.graph) 

1683 

1684 def read_full_graph(self) -> None: 

1685 """Read all bipartite edges and all quantum and dataset node 

1686 attributes, fully populating the `graph` attribute. 

1687 

1688 Notes 

1689 ----- 

1690 This does not read logs, metadata, or packages; those must always be 

1691 fetched explicitly. 

1692 """ 

1693 self.read_init_quanta() 

1694 self.read_datasets() 

1695 self.read_quanta() 

1696 

1697 def read_datasets(self, datasets: Iterable[uuid.UUID] | None = None) -> None: 

1698 """Read information about the given datasets. 

1699 

1700 Parameters 

1701 ---------- 

1702 datasets : `~collections.abc.Iterable` [`uuid.UUID`], optional 

1703 Iterable of dataset IDs to load. If not provided, all datasets 

1704 will be loaded. The UUIDs and indices of quanta will be ignored. 

1705 """ 

1706 self._read_nodes(datasets, DATASET_ADDRESS_INDEX, DATASET_MB_NAME, ProvenanceDatasetModel) 

1707 

1708 def read_quanta(self, quanta: Iterable[uuid.UUID] | None = None) -> None: 

1709 """Read information about the given quanta. 

1710 

1711 Parameters 

1712 ---------- 

1713 quanta : `~collections.abc.Iterable` [`uuid.UUID`], optional 

1714 Iterable of quantum IDs to load. If not provided, all quanta will 

1715 be loaded. The UUIDs and indices of datasets and special init 

1716 quanta will be ignored. 

1717 """ 

1718 self._read_nodes(quanta, QUANTUM_ADDRESS_INDEX, QUANTUM_MB_NAME, ProvenanceQuantumModel) 

1719 
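A sketch of a targeted partial read, with a placeholder UUID standing in for a real quantum ID:

    # Hypothetical sketch: load a single quantum by UUID rather than the
    # full graph; datasets can be restricted the same way.
    import uuid

    wanted = uuid.UUID("00000000-0000-0000-0000-000000000000")
    with ProvenanceQuantumGraphReader.open("provenance.qg") as reader:
        reader.read_init_quanta()
        reader.read_quanta([wanted])
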

1720 def _read_nodes( 

1721 self, 

1722 nodes: Iterable[uuid.UUID] | None, 

1723 address_index: int, 

1724 mb_name: str, 

1725 model_type: type[ProvenanceDatasetModel] | type[ProvenanceQuantumModel], 

1726 ) -> None: 

1727 node: ProvenanceDatasetModel | ProvenanceQuantumModel | None 

1728 if nodes is None: 

1729 self.address_reader.read_all() 

1730 nodes = self.address_reader.rows.keys() 

1731 for node in MultiblockReader.read_all_models_in_zip( 

1732 self.zf, 

1733 mb_name, 

1734 model_type, 

1735 self.decompressor, 

1736 int_size=self.header.int_size, 

1737 page_size=self.page_size, 

1738 ): 

1739 if "pipeline_node" in self.graph._bipartite_xgraph.nodes.get(node.node_id, {}): 

1740 # Use the old node to reduce memory usage (since it might 

1741 # also have other outstanding reference holders). 

1742 continue 

1743 node._add_to_graph(self.graph) 

1744 else: 

1745 with MultiblockReader.open_in_zip(self.zf, mb_name, int_size=self.header.int_size) as mb_reader: 

1746 for node_id_or_index in nodes: 

1747 address_row = self.address_reader.find(node_id_or_index) 

1748 if "pipeline_node" in self.graph._bipartite_xgraph.nodes.get(address_row.key, {}): 

1749 # Use the old node to reduce memory usage (since it 

1750 # might also have other outstanding reference holders). 

1751 continue 

1752 node = mb_reader.read_model( 

1753 address_row.addresses[address_index], model_type, self.decompressor 

1754 ) 

1755 if node is not None: 

1756 node._add_to_graph(self.graph) 

1757 

1758 def fetch_logs(self, nodes: Iterable[uuid.UUID]) -> dict[uuid.UUID, list[ButlerLogRecords | None]]: 

1759 """Fetch log datasets. 

1760 

1761 Parameters 

1762 ---------- 

1763 nodes : `~collections.abc.Iterable` [ `uuid.UUID` ] 

1764 UUIDs of the log datasets themselves or of the quanta they 

1765 correspond to. 

1766 

1767 Returns 

1768 ------- 

1769 logs : `dict` [ `uuid.UUID`, `list` [\ 

1770 `lsst.daf.butler.ButlerLogRecords` or `None`] ] 

1771 Logs for the given IDs. Each value is a list of 

1772 `lsst.daf.butler.ButlerLogRecords` instances representing different 

1773 execution attempts, ordered chronologically from first to last. 

1774 Attempts where logs were missing will have `None` in this list. 

1775 """ 

1776 result: dict[uuid.UUID, list[ButlerLogRecords | None]] = {} 

1777 with MultiblockReader.open_in_zip(self.zf, LOG_MB_NAME, int_size=self.header.int_size) as mb_reader: 

1778 for node_id_or_index in nodes: 

1779 address_row = self.address_reader.find(node_id_or_index) 

1780 logs_by_attempt = mb_reader.read_model( 

1781 address_row.addresses[LOG_ADDRESS_INDEX], ProvenanceLogRecordsModel, self.decompressor 

1782 ) 

1783 if logs_by_attempt is not None: 

1784 result[node_id_or_index] = [ 

1785 ButlerLogRecords.from_records(attempt_logs) if attempt_logs is not None else None 

1786 for attempt_logs in logs_by_attempt.attempts 

1787 ] 

1788 return result 

1789 

1790 def fetch_metadata(self, nodes: Iterable[uuid.UUID]) -> dict[uuid.UUID, list[TaskMetadata | None]]: 

1791 """Fetch metadata datasets. 

1792 

1793 Parameters 

1794 ---------- 

1795 nodes : `~collections.abc.Iterable` [ `uuid.UUID` ] 

1796 UUIDs of the metadata datasets themselves or of the quanta they 

1797 correspond to. 

1798 

1799 Returns 

1800 ------- 

1801 metadata : `dict` [ `uuid.UUID`, `list` [`.TaskMetadata` or `None`] ] 

1802 Metadata for the given IDs. Each value is a list of 

1803 `.TaskMetadata` instances representing different execution 

1804 attempts, ordered chronologically from first to last. Attempts 

1805 where metadata was missing (not written even in the fallback extra 

1806 provenance in the logs) will have `None` in this list. 

1807 """ 

1808 result: dict[uuid.UUID, list[TaskMetadata | None]] = {} 

1809 with MultiblockReader.open_in_zip( 

1810 self.zf, METADATA_MB_NAME, int_size=self.header.int_size 

1811 ) as mb_reader: 

1812 for node_id_or_index in nodes: 

1813 address_row = self.address_reader.find(node_id_or_index) 

1814 metadata_by_attempt = mb_reader.read_model( 

1815 address_row.addresses[METADATA_ADDRESS_INDEX], 

1816 ProvenanceTaskMetadataModel, 

1817 self.decompressor, 

1818 ) 

1819 if metadata_by_attempt is not None: 

1820 result[node_id_or_index] = metadata_by_attempt.attempts 

1821 return result 

1822 
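A sketch combining `fetch_logs` and `fetch_metadata` for one task; the task label is hypothetical:

    # Hypothetical sketch: fetch per-attempt logs and metadata for every
    # quantum of one task; each value is ordered oldest attempt first.
    with ProvenanceQuantumGraphReader.open("provenance.qg") as reader:
        reader.read_full_graph()
        quantum_ids = list(reader.graph.quanta_by_task["isr"].values())
        logs = reader.fetch_logs(quantum_ids)
        metadata = reader.fetch_metadata(quantum_ids)
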

1823 def fetch_packages(self) -> Packages: 

1824 """Fetch package version information.""" 

1825 data = self._read_single_block_raw("packages") 

1826 return Packages.fromBytes(data, format="json") 

1827 

1828 

1829class ProvenanceQuantumGraphWriter: 

1830 """A struct of low-level writer objects for the main components of a 

1831 provenance quantum graph. 

1832 

1833 Parameters 

1834 ---------- 

1835 output_path : `str` 

1836 Path to write the graph to. 

1837 exit_stack : `contextlib.ExitStack` 

1838 Object that can be used to manage multiple context managers. 

1839 log_on_close : `LogOnClose` 

1840 Factory for context managers that log when closed. 

1841 predicted : `.PredictedQuantumGraphComponents` 

1842 Components of the predicted graph. 

1843 zstd_level : `int`, optional 

1844 Compression level. 

1845 cdict_data : `bytes` or `None`, optional 

1846 Bytes representation of the compression dictionary used by the 

1847 compressor. 

1848 loop_wrapper : `~collections.abc.Callable`, optional 

1849 A callable that takes an iterable and returns an equivalent one, to be 

1850 used in all potentially-large loops. This can be used to add progress 

1851 reporting or check for cancelation signals. 

1852 log : `LsstLogAdapter`, optional 

1853 Logger to use for debug messages. 

1854 """ 

1855 

1856 def __init__( 

1857 self, 

1858 output_path: str, 

1859 *, 

1860 exit_stack: ExitStack, 

1861 log_on_close: LogOnClose, 

1862 predicted: PredictedQuantumGraphComponents | PredictedQuantumGraph, 

1863 zstd_level: int = 10, 

1864 cdict_data: bytes | None = None, 

1865 loop_wrapper: LoopWrapper = pass_through, 

1866 log: LsstLogAdapter | None = None, 

1867 ) -> None: 

1868 header = predicted.header.model_copy() 

1869 header.graph_type = "provenance" 

1870 if log is None: 

1871 log = _LOG 

1872 self.log = log 

1873 self._base_writer = exit_stack.enter_context( 

1874 log_on_close.wrap( 

1875 BaseQuantumGraphWriter.open( 

1876 output_path, 

1877 header, 

1878 predicted.pipeline_graph, 

1879 address_filename="nodes", 

1880 zstd_level=zstd_level, 

1881 cdict_data=cdict_data, 

1882 ), 

1883 "Finishing writing provenance quantum graph.", 

1884 ) 

1885 ) 

1886 self._base_writer.address_writer.addresses = [{}, {}, {}, {}] 

1887 self._log_writer = exit_stack.enter_context( 

1888 log_on_close.wrap( 

1889 MultiblockWriter.open_in_zip( 

1890 self._base_writer.zf, LOG_MB_NAME, header.int_size, use_tempfile=True 

1891 ), 

1892 "Copying logs into zip archive.", 

1893 ), 

1894 ) 

1895 self._base_writer.address_writer.addresses[LOG_ADDRESS_INDEX] = self._log_writer.addresses 

1896 self._metadata_writer = exit_stack.enter_context( 

1897 log_on_close.wrap( 

1898 MultiblockWriter.open_in_zip( 

1899 self._base_writer.zf, METADATA_MB_NAME, header.int_size, use_tempfile=True 

1900 ), 

1901 "Copying metadata into zip archive.", 

1902 ) 

1903 ) 

1904 self._base_writer.address_writer.addresses[METADATA_ADDRESS_INDEX] = self._metadata_writer.addresses 

1905 self._dataset_writer = exit_stack.enter_context( 

1906 log_on_close.wrap( 

1907 MultiblockWriter.open_in_zip( 

1908 self._base_writer.zf, DATASET_MB_NAME, header.int_size, use_tempfile=True 

1909 ), 

1910 "Copying dataset provenance into zip archive.", 

1911 ) 

1912 ) 

1913 self._base_writer.address_writer.addresses[DATASET_ADDRESS_INDEX] = self._dataset_writer.addresses 

1914 self._quantum_writer = exit_stack.enter_context( 

1915 log_on_close.wrap( 

1916 MultiblockWriter.open_in_zip( 

1917 self._base_writer.zf, QUANTUM_MB_NAME, header.int_size, use_tempfile=True 

1918 ), 

1919 "Copying quantum provenance into zip archive.", 

1920 ) 

1921 ) 

1922 self._base_writer.address_writer.addresses[QUANTUM_ADDRESS_INDEX] = self._quantum_writer.addresses 

1923 self._init_predicted_quanta(predicted) 

1924 self._populate_xgraph_and_inputs(loop_wrapper) 

1925 self._existing_init_outputs: set[uuid.UUID] = set() 

1926 

1927 def _init_predicted_quanta( 

1928 self, predicted: PredictedQuantumGraph | PredictedQuantumGraphComponents 

1929 ) -> None: 

1930 self._predicted_init_quanta: list[PredictedQuantumDatasetsModel] = [] 

1931 self._predicted_quanta: dict[uuid.UUID, PredictedQuantumDatasetsModel] = {} 

1932 if isinstance(predicted, PredictedQuantumGraph): 

1933 self._predicted_init_quanta.extend(predicted._init_quanta.values()) 

1934 self._predicted_quanta.update(predicted._quantum_datasets) 

1935 else: 

1936 self._predicted_init_quanta.extend(predicted.init_quanta.root) 

1937 self._predicted_quanta.update(predicted.quantum_datasets) 

1938 self._predicted_quanta.update({q.quantum_id: q for q in self._predicted_init_quanta}) 

1939 

1940 def _populate_xgraph_and_inputs(self, loop_wrapper: LoopWrapper = pass_through) -> None: 

1941 self._xgraph = networkx.DiGraph() 

1942 self._overall_inputs: dict[uuid.UUID, PredictedDatasetModel] = {} 

1943 output_dataset_ids: set[uuid.UUID] = set() 

1944 for predicted_quantum in loop_wrapper(self._predicted_quanta.values()): 

1945 if not predicted_quantum.task_label: 

1946 # Skip the 'packages' producer quantum. 

1947 continue 

1948 output_dataset_ids.update(predicted_quantum.iter_output_dataset_ids()) 

1949 for predicted_quantum in loop_wrapper(self._predicted_quanta.values()): 

1950 if not predicted_quantum.task_label: 

1951 # Skip the 'packages' producer quantum. 

1952 continue 

1953 for predicted_input in itertools.chain.from_iterable(predicted_quantum.inputs.values()): 

1954 self._xgraph.add_edge(predicted_input.dataset_id, predicted_quantum.quantum_id) 

1955 if predicted_input.dataset_id not in output_dataset_ids: 

1956 self._overall_inputs.setdefault(predicted_input.dataset_id, predicted_input) 

1957 for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()): 

1958 self._xgraph.add_edge(predicted_quantum.quantum_id, predicted_output.dataset_id) 

1959 

1960 @property 

1961 def compressor(self) -> Compressor: 

1962 """Object that should be used to compress all JSON blocks.""" 

1963 return self._base_writer.compressor 

1964 

1965 def write_packages(self) -> None: 

1966 """Write package version information to the provenance graph.""" 

1967 packages = Packages.fromSystem(include_all=True) 

1968 data = packages.toBytes("json") 

1969 self._base_writer.write_single_block("packages", data) 

1970 

1971 def write_overall_inputs(self, loop_wrapper: LoopWrapper = pass_through) -> None: 

1972 """Write provenance for overall-input datasets. 

1973 

1974 Parameters 

1975 ---------- 

1976 loop_wrapper : `~collections.abc.Callable`, optional 

1977 A callable that takes an iterable and returns an equivalent one, to 

1978 be used in all potentially-large loops. This can be used to add 

1979 progress reporting or check for cancelation signals. 

1980 """ 

1981 for predicted_input in loop_wrapper(self._overall_inputs.values()): 

1982 if predicted_input.dataset_id not in self._dataset_writer.addresses: 

1983 self._dataset_writer.write_model( 

1984 predicted_input.dataset_id, 

1985 ProvenanceDatasetModel.from_predicted( 

1986 predicted_input, 

1987 producer=None, 

1988 consumers=self._xgraph.successors(predicted_input.dataset_id), 

1989 ), 

1990 self.compressor, 

1991 ) 

1992 del self._overall_inputs 

1993 

1994 def write_init_outputs(self, assume_existence: bool = True) -> None: 

1995 """Write provenance for init-output datasets and init-quanta. 

1996 

1997 Parameters 

1998 ---------- 

1999 assume_existence : `bool`, optional 

2000 If `True`, just assume all init-outputs exist. 

2001 """ 

2002 init_quanta = ProvenanceInitQuantaModel() 

2003 for predicted_init_quantum in self._predicted_init_quanta: 

2004 if not predicted_init_quantum.task_label: 

2005 # Skip the 'packages' producer quantum. 

2006 continue 

2007 for predicted_output in itertools.chain.from_iterable(predicted_init_quantum.outputs.values()): 

2008 provenance_output = ProvenanceDatasetModel.from_predicted( 

2009 predicted_output, 

2010 producer=predicted_init_quantum.quantum_id, 

2011 consumers=self._xgraph.successors(predicted_output.dataset_id), 

2012 ) 

2013 provenance_output.produced = assume_existence or ( 

2014 provenance_output.dataset_id in self._existing_init_outputs 

2015 ) 

2016 self._dataset_writer.write_model( 

2017 provenance_output.dataset_id, provenance_output, self.compressor 

2018 ) 

2019 init_quanta.root.append(ProvenanceInitQuantumModel.from_predicted(predicted_init_quantum)) 

2020 self._base_writer.write_single_model("init_quanta", init_quanta) 

2021 

2022 def write_quantum_provenance( 

2023 self, quantum_id: uuid.UUID, metadata: TaskMetadata | None, logs: ButlerLogRecords | None 

2024 ) -> None: 

2025 """Gather and write provenance for a quantum. 

2026 

2027 Parameters 

2028 ---------- 

2029 quantum_id : `uuid.UUID` 

2030 Unique ID for the quantum. 

2031 metadata : `..TaskMetadata` or `None` 

2032 Task metadata. 

2033 logs : `lsst.daf.butler.logging.ButlerLogRecords` or `None` 

2034 Task logs. 

2035 """ 

2036 predicted_quantum = self._predicted_quanta[quantum_id] 

2037 provenance_models = ProvenanceQuantumScanModels.from_metadata_and_logs( 

2038 predicted_quantum, metadata, logs, incomplete=False 

2039 ) 

2040 scan_data = provenance_models.to_scan_data(predicted_quantum, compressor=self.compressor) 

2041 self.write_scan_data(scan_data) 

2042 

2043 def write_blocked_quantum_provenance(self, quantum_id: uuid.UUID) -> None: 

2044 """Gather and write provenance for a quantum that was blocked by an 

2045 upstream failure. 

2046 

2047 Parameters 

2048 ---------- 

2049 quantum_id : `uuid.UUID` 

2050 Unique ID for the quantum. 

2051 """ 

2052 self.write_scan_data(ProvenanceQuantumScanData.make_blocked(quantum_id)) 

2053 
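A sketch of the scanning driver loop these methods support; `writer`, the ID collections, and the lookup tables are assumed to come from the surrounding execution harness:

    # Hypothetical sketch: record provenance for executed quanta, then
    # mark everything downstream of a failure as blocked.
    for quantum_id in executed_ids:  # assumed: IDs of attempted quanta
        writer.write_quantum_provenance(
            quantum_id, metadata_by_id.get(quantum_id), logs_by_id.get(quantum_id)
        )
    for quantum_id in blocked_ids:  # assumed: quanta downstream of failures
        writer.write_blocked_quantum_provenance(quantum_id)
    writer.write_overall_inputs()
    writer.write_init_outputs()
    writer.write_packages()
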

2054 def write_scan_data(self, scan_data: ProvenanceQuantumScanData) -> None: 

2055 """Write the output of a quantum provenance scan to disk. 

2056 

2057 Parameters 

2058 ---------- 

2059 scan_data : `ProvenanceQuantumScanData` 

2060 Result of a quantum provenance scan. 

2061 """ 

2062 if scan_data.status is ProvenanceQuantumScanStatus.INIT: 

2063 self.log.debug("Handling init-output scan for %s.", scan_data.quantum_id) 

2064 self._existing_init_outputs.update(scan_data.existing_outputs) 

2065 return 

2066 self.log.debug("Handling quantum scan for %s.", scan_data.quantum_id) 

2067 # We shouldn't need this predicted quantum after this method runs; pop 

2068 # it from the dict in the hope that this will free up some memory when 

2069 # we're done. 

2070 predicted_quantum = self._predicted_quanta.pop(scan_data.quantum_id) 

2071 outputs: dict[uuid.UUID, bytes] = {} 

2072 for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()): 

2073 provenance_output = ProvenanceDatasetModel.from_predicted( 

2074 predicted_output, 

2075 producer=predicted_quantum.quantum_id, 

2076 consumers=self._xgraph.successors(predicted_output.dataset_id), 

2077 ) 

2078 provenance_output.produced = provenance_output.dataset_id in scan_data.existing_outputs 

2079 outputs[provenance_output.dataset_id] = self.compressor.compress( 

2080 provenance_output.model_dump_json().encode() 

2081 ) 

2082 if not scan_data.quantum: 

2083 scan_data.quantum = ( 

2084 ProvenanceQuantumModel.from_predicted(predicted_quantum).model_dump_json().encode() 

2085 ) 

2086 if scan_data.is_compressed: 

2087 scan_data.quantum = self.compressor.compress(scan_data.quantum) 

2088 if not scan_data.is_compressed: 

2089 scan_data.quantum = self.compressor.compress(scan_data.quantum) 

2090 if scan_data.metadata: 

2091 scan_data.metadata = self.compressor.compress(scan_data.metadata) 

2092 if scan_data.logs: 

2093 scan_data.logs = self.compressor.compress(scan_data.logs) 

2094 self.log.debug("Writing quantum %s.", scan_data.quantum_id) 

2095 self._quantum_writer.write_bytes(scan_data.quantum_id, scan_data.quantum) 

2096 for dataset_id, dataset_data in outputs.items(): 

2097 self._dataset_writer.write_bytes(dataset_id, dataset_data) 

2098 if scan_data.metadata: 

2099 (metadata_output,) = predicted_quantum.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME] 

2100 address = self._metadata_writer.write_bytes(scan_data.quantum_id, scan_data.metadata) 

2101 self._metadata_writer.addresses[metadata_output.dataset_id] = address 

2102 if scan_data.logs: 

2103 (log_output,) = predicted_quantum.outputs[acc.LOG_OUTPUT_CONNECTION_NAME] 

2104 address = self._log_writer.write_bytes(scan_data.quantum_id, scan_data.logs) 

2105 self._log_writer.addresses[log_output.dataset_id] = address 

2106 

2107 

2108class ProvenanceQuantumScanStatus(enum.Enum): 

2109 """Status enum for quantum scanning. 

2110 

2111 Note that this records the status of the *scanning*, which is distinct 

2112 from the status of the quantum's execution. 

2113 """ 

2114 

2115 INCOMPLETE = enum.auto() 

2116 """The quantum is not necessarily done running, and cannot be scanned 

2117 conclusively yet. 

2118 """ 

2119 

2120 ABANDONED = enum.auto() 

2121 """The quantum's execution appears to have failed but we cannot rule out 

2122 the possibility that it could be recovered, but we've also waited long 

2123 enough (according to `ScannerTimeConfigDict.retry_timeout`) that it's time 

2124 to stop trying for now. 

2125 

2126 This state means `ProvenanceQuantumScanModels.from_metadata_and_logs` must 

2127 be run again with ``incomplete=False``. 

2128 """ 

2129 

2130 SUCCESSFUL = enum.auto() 

2131 """The quantum was conclusively scanned and was executed successfully, 

2132 unblocking scans for downstream quanta. 

2133 """ 

2134 

2135 FAILED = enum.auto() 

2136 """The quantum was conclusively scanned and failed execution, blocking 

2137 scans for downstream quanta. 

2138 """ 

2139 

2140 BLOCKED = enum.auto() 

2141 """A quantum upstream of this one failed.""" 

2142 

2143 INIT = enum.auto() 

2144 """Init quanta need special handling, because they don't have logs and 

2145 metadata. 

2146 """ 

2147 

2148 

2149@dataclasses.dataclass 

2150class ProvenanceQuantumScanModels: 

2151 """A struct that represents provenance information for a single quantum.""" 

2152 

2153 quantum_id: uuid.UUID 

2154 """Unique ID for the quantum.""" 

2155 

2156 status: ProvenanceQuantumScanStatus = ProvenanceQuantumScanStatus.INCOMPLETE 

2157 """Combined status for the scan and the execution of the quantum.""" 

2158 

2159 attempts: list[ProvenanceQuantumAttemptModel] = dataclasses.field(default_factory=list) 

2160 """Provenance information about each attempt to run the quantum.""" 

2161 

2162 output_existence: dict[uuid.UUID, bool] = dataclasses.field(default_factory=dict) 

2163 """Unique IDs of the output datasets mapped to whether they were actually 

2164 produced. 

2165 """ 

2166 

2167 metadata: ProvenanceTaskMetadataModel = dataclasses.field(default_factory=ProvenanceTaskMetadataModel) 

2168 """Task metadata information for each attempt. 

2169 """ 

2170 

2171 logs: ProvenanceLogRecordsModel = dataclasses.field(default_factory=ProvenanceLogRecordsModel) 

2172 """Log records for each attempt. 

2173 """ 

2174 

2175 @classmethod 

2176 def from_metadata_and_logs( 

2177 cls, 

2178 predicted: PredictedQuantumDatasetsModel, 

2179 metadata: TaskMetadata | None, 

2180 logs: ButlerLogRecords | None, 

2181 *, 

2182 incomplete: bool = False, 

2183 ) -> ProvenanceQuantumScanModels: 

2184 """Construct provenance information from task metadata and logs. 

2185 

2186 Parameters 

2187 ---------- 

2188 predicted : `PredictedQuantumDatasetsModel` 

2189 Information about the predicted quantum. 

2190 metadata : `..TaskMetadata` or `None` 

2191 Task metadata. 

2192 logs : `lsst.daf.butler.logging.ButlerLogRecords` or `None` 

2193 Task logs. 

2194 incomplete : `bool`, optional 

2195 If `True`, treat execution failures as possibly-incomplete quanta 

2196 and do not fully process them; instead just set the status to 

2197 `ProvenanceQuantumScanStatus.ABANDONED` and return. 

2198 

2199 Returns 

2200 ------- 

2201 scan_models : `ProvenanceQuantumScanModels` 

2202 Struct of models that describe quantum provenance. 

2203 

2204 Notes 

2205 ----- 

2206 This method does not necessarily fully populate the `output_existence` 

2207 field; it does what it can given the information in the metadata and 

2208 logs, but the caller is responsible for filling in the existence status 

2209 for any predicted outputs that are not present at all in that `dict`. 

2210 """ 

2211 self = ProvenanceQuantumScanModels(predicted.quantum_id) 

2212 last_attempt = ProvenanceQuantumAttemptModel() 

2213 self._process_logs(predicted, logs, last_attempt, incomplete=incomplete) 

2214 self._process_metadata(predicted, metadata, last_attempt, incomplete=incomplete) 

2215 if self.status is ProvenanceQuantumScanStatus.ABANDONED: 

2216 return self 

2217 self._reconcile_attempts(last_attempt) 

2218 self._extract_output_existence(predicted) 

2219 return self 

2220 
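A sketch of the scanner-side flow around `from_metadata_and_logs`; `predicted`, `metadata`, `logs`, and `writer` are assumed inputs:

    # Hypothetical sketch: scan tentatively first, then write only
    # conclusive results; ABANDONED scans must be redone later with
    # incomplete=False.
    models = ProvenanceQuantumScanModels.from_metadata_and_logs(
        predicted, metadata, logs, incomplete=True
    )
    if models.status is not ProvenanceQuantumScanStatus.ABANDONED:
        scan_data = models.to_scan_data(predicted, compressor=writer.compressor)
        writer.write_scan_data(scan_data)
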

2221 def _process_logs( 

2222 self, 

2223 predicted: PredictedQuantumDatasetsModel, 

2224 logs: ButlerLogRecords | None, 

2225 last_attempt: ProvenanceQuantumAttemptModel, 

2226 *, 

2227 incomplete: bool, 

2228 ) -> None: 

2229 (predicted_log_dataset,) = predicted.outputs[acc.LOG_OUTPUT_CONNECTION_NAME] 

2230 if logs is None: 

2231 self.output_existence[predicted_log_dataset.dataset_id] = False 

2232 if incomplete: 

2233 self.status = ProvenanceQuantumScanStatus.ABANDONED 

2234 else: 

2235 self.status = ProvenanceQuantumScanStatus.FAILED 

2236 else: 

2237 # Set the attempt's run status to FAILED, since the default is 

2238 # UNKNOWN (i.e. logs *and* metadata are missing) and we now know 

2239 # the logs exist. This will usually get replaced by SUCCESSFUL 

2240 # when we look for metadata next. 

2241 last_attempt.status = QuantumAttemptStatus.FAILED 

2242 self.output_existence[predicted_log_dataset.dataset_id] = True 

2243 if logs.extra: 

2244 log_extra = _ExecutionLogRecordsExtra.model_validate(logs.extra) 

2245 self._extract_from_log_extra(log_extra, last_attempt=last_attempt) 

2246 self.logs.attempts.append(list(logs)) 

2247 

2248 def _extract_from_log_extra( 

2249 self, 

2250 log_extra: _ExecutionLogRecordsExtra, 

2251 last_attempt: ProvenanceQuantumAttemptModel | None, 

2252 ) -> None: 

2253 for previous_attempt_log_extra in log_extra.previous_attempts: 

2254 self._extract_from_log_extra( 

2255 previous_attempt_log_extra, 

2256 last_attempt=None, 

2257 ) 

2258 quantum_attempt: ProvenanceQuantumAttemptModel 

2259 if last_attempt is None: 

2260 # This is not the last attempt, so it must be a failure. 

2261 quantum_attempt = ProvenanceQuantumAttemptModel( 

2262 attempt=len(self.attempts), status=QuantumAttemptStatus.FAILED 

2263 ) 

2264 # We also need to get the logs from this extra provenance, since 

2265 # they won't be the main section of the log records. 

2266 self.logs.attempts.append(log_extra.logs) 

2267 # The special last attempt is only appended after we attempt to 

2268 # read metadata later, but we have to append this one now. 

2269 self.attempts.append(quantum_attempt) 

2270 else: 

2271 assert not log_extra.logs, "Logs for the last attempt should not be stored in the extra JSON." 

2272 quantum_attempt = last_attempt 

2273 if log_extra.exception is not None or log_extra.metadata is not None or last_attempt is None: 

2274 # We won't be getting a separate metadata dataset, so anything we 

2275 # might get from the metadata has to come from this extra 

2276 # provenance in the logs. 

2277 quantum_attempt.exception = log_extra.exception 

2278 if log_extra.metadata is not None: 

2279 quantum_attempt.resource_usage = QuantumResourceUsage.from_task_metadata(log_extra.metadata) 

2280 self.metadata.attempts.append(log_extra.metadata) 

2281 else: 

2282 self.metadata.attempts.append(None) 

2283 # Regardless of whether this is the last attempt or not, we can only 

2284 # get the previous_process_quanta from the log extra. 

2285 quantum_attempt.previous_process_quanta.extend(log_extra.previous_process_quanta) 

2286 

2287 def _process_metadata( 

2288 self, 

2289 predicted: PredictedQuantumDatasetsModel, 

2290 metadata: TaskMetadata | None, 

2291 last_attempt: ProvenanceQuantumAttemptModel, 

2292 *, 

2293 incomplete: bool, 

2294 ) -> None: 

2295 (predicted_metadata_dataset,) = predicted.outputs[acc.METADATA_OUTPUT_CONNECTION_NAME] 

2296 if metadata is None: 

2297 self.output_existence[predicted_metadata_dataset.dataset_id] = False 

2298 if incomplete: 

2299 self.status = ProvenanceQuantumScanStatus.ABANDONED 

2300 else: 

2301 self.status = ProvenanceQuantumScanStatus.FAILED 

2302 else: 

2303 self.status = ProvenanceQuantumScanStatus.SUCCESSFUL 

2304 self.output_existence[predicted_metadata_dataset.dataset_id] = True 

2305 last_attempt.status = QuantumAttemptStatus.SUCCESSFUL 

2306 try: 

2307 # Int conversion guards against spurious conversion to 

2308 # float that can apparently sometimes happen in 

2309 # TaskMetadata. 

2310 last_attempt.caveats = QuantumSuccessCaveats(int(metadata["quantum"]["caveats"])) 

2311 except LookupError: 

2312 pass 

2313 try: 

2314 last_attempt.exception = ExceptionInfo._from_metadata( 

2315 metadata[predicted.task_label]["failure"] 

2316 ) 

2317 except LookupError: 

2318 pass 

2319 last_attempt.resource_usage = QuantumResourceUsage.from_task_metadata(metadata) 

2320 self.metadata.attempts.append(metadata) 

2321 

2322 def _reconcile_attempts(self, last_attempt: ProvenanceQuantumAttemptModel) -> None: 

2323 last_attempt.attempt = len(self.attempts) 

2324 self.attempts.append(last_attempt) 

2325 assert self.status is not ProvenanceQuantumScanStatus.INCOMPLETE 

2326 assert self.status is not ProvenanceQuantumScanStatus.ABANDONED 

2327 if len(self.logs.attempts) < len(self.attempts): 

2328 # Logs were not found for this attempt; must have been a hard error 

2329 # that kept the `finally` block from running or otherwise 

2330 # interrupted the writing of the logs. 

2331 self.logs.attempts.append(None) 

2332 if self.status is ProvenanceQuantumScanStatus.SUCCESSFUL: 

2333 # But we found the metadata! Either that hard error happened 

2334 # at a very unlucky time (in between those two writes), or 

2335 # something even weirder happened. 

2336 self.attempts[-1].status = QuantumAttemptStatus.ABORTED_SUCCESS 

2337 else: 

2338 self.attempts[-1].status = QuantumAttemptStatus.FAILED 

2339 if len(self.metadata.attempts) < len(self.attempts): 

2340 # Metadata missing usually just means a failure. In any case, the 

2341 # status will already be correct, either because it was set to a 

2342 # failure when we read the logs, or left at UNKNOWN if there were 

2343 # no logs. Note that scanners never process BLOCKED quanta at all. 

2344 self.metadata.attempts.append(None) 

2345 assert len(self.logs.attempts) == len(self.attempts) or len(self.metadata.attempts) == len( 

2346 self.attempts 

2347 ), ( 

2348 "The only way we can add more than one quantum attempt is by " 

2349 "extracting info stored with the logs, and that always appends " 

2350 "a log attempt and a metadata attempt, so this must be a bug in " 

2351 "this class." 

2352 ) 

2353 

2354 def _extract_output_existence(self, predicted: PredictedQuantumDatasetsModel) -> None: 

2355 try: 

2356 outputs_put = self.metadata.attempts[-1]["quantum"].getArray("outputs") # type: ignore[index] 

2357 except ( 

2358 IndexError, # metadata.attempts is empty 

2359 TypeError, # metadata.attempts[-1] is None 

2360 LookupError, # no 'quantum' entry in metadata or 'outputs' in that 

2361 ): 

2362 pass 

2363 else: 

2364 for id_str in ensure_iterable(outputs_put): 

2365 self.output_existence[uuid.UUID(id_str)] = True 

2366 # If the metadata told us what it wrote, anything not in that 

2367 # list was not written. 

2368 for predicted_output in itertools.chain.from_iterable(predicted.outputs.values()): 

2369 self.output_existence.setdefault(predicted_output.dataset_id, False) 

2370 

2371 def to_scan_data( 

2372 self: ProvenanceQuantumScanModels, 

2373 predicted_quantum: PredictedQuantumDatasetsModel, 

2374 compressor: Compressor | None = None, 

2375 ) -> ProvenanceQuantumScanData: 

2376 """Convert these models to JSON data. 

2377 

2378 Parameters 

2379 ---------- 

2380 predicted_quantum : `PredictedQuantumDatasetsModel` 

2381 Information about the predicted quantum. 

2382 compressor : `Compressor`, optional 

2383 Object that can compress bytes. 

2384 

2385 Returns 

2386 ------- 

2387 scan_data : `ProvenanceQuantumScanData` 

2388 Scan information ready for serialization. 

2389 """ 

2390 quantum: ProvenanceInitQuantumModel | ProvenanceQuantumModel 

2391 if self.status is ProvenanceQuantumScanStatus.INIT: 

2392 quantum = ProvenanceInitQuantumModel.from_predicted(predicted_quantum) 

2393 else: 

2394 quantum = ProvenanceQuantumModel.from_predicted(predicted_quantum) 

2395 quantum.attempts = self.attempts 

2396 for predicted_output in itertools.chain.from_iterable(predicted_quantum.outputs.values()): 

2397 if predicted_output.dataset_id not in self.output_existence: 

2398 raise RuntimeError( 

2399 "Logic bug in provenance gathering or execution invariants: " 

2400 f"no existence information for output {predicted_output.dataset_id} " 

2401 f"({predicted_output.dataset_type_name}@{predicted_output.data_coordinate})." 

2402 ) 

2403 data = ProvenanceQuantumScanData( 

2404 self.quantum_id, 

2405 self.status, 

2406 existing_outputs={ 

2407 dataset_id for dataset_id, was_produced in self.output_existence.items() if was_produced 

2408 }, 

2409 quantum=quantum.model_dump_json().encode(), 

2410 logs=self.logs.model_dump_json().encode() if self.logs.attempts else b"", 

2411 metadata=self.metadata.model_dump_json().encode() if self.metadata.attempts else b"", 

2412 ) 

2413 if compressor is not None: 

2414 data.compress(compressor) 

2415 return data 

2416 

2417 

2418@dataclasses.dataclass 

2419class ProvenanceQuantumScanData: 

2420 """A struct that represents ready-for-serialization provenance information 

2421 for a single quantum. 

2422 """ 

2423 

2424 quantum_id: uuid.UUID 

2425 """Unique ID for the quantum.""" 

2426 

2427 status: ProvenanceQuantumScanStatus 

2428 """Combined status for the scan and the execution of the quantum.""" 

2429 

2430 existing_outputs: set[uuid.UUID] = dataclasses.field(default_factory=set) 

2431 """Unique IDs of the output datasets that were actually written.""" 

2432 

2433 quantum: bytes = b"" 

2434 """Serialized quantum provenance model. 

2435 

2436 This may be empty for quanta that had no attempts. 

2437 """ 

2438 

2439 metadata: bytes = b"" 

2440 """Serialized task metadata.""" 

2441 

2442 logs: bytes = b"" 

2443 """Serialized logs.""" 

2444 

2445 is_compressed: bool = False 

2446 """Whether the ``quantum``, ``metadata``, and ``log`` attributes are 

2447 compressed. 

2448 """ 

2449 

2450 @classmethod 

2451 def make_blocked(cls, quantum_id: uuid.UUID) -> ProvenanceQuantumScanData: 

2452 """Construct provenance information for a quantum blocked by an 

2453 upstream failure. 

2454 

2455 Parameters 

2456 ---------- 

2457 quantum_id : `uuid.UUID` 

2458 Unique ID of the quantum. 

2459 

2460 Returns 

2461 ------- 

2462 scan_data : `ProvenanceQuantumScanData` 

2463 Struct with ready-to-write provenance data. 

2464 """ 

2465 return ProvenanceQuantumScanData( 

2466 quantum_id, 

2467 status=ProvenanceQuantumScanStatus.BLOCKED, 

2468 is_compressed=True, # nothing to compress 

2469 ) 

2470 

2471 def compress(self, compressor: Compressor) -> None: 

2472 """Compress the data in this struct if it has not been compressed 

2473 already. 

2474 

2475 Parameters 

2476 ---------- 

2477 compressor : `Compressor` 

2478 Object with a ``compress`` method that takes and returns `bytes`. 

2479 """ 

2480 if not self.is_compressed: 

2481 self.quantum = compressor.compress(self.quantum) 

2482 self.logs = compressor.compress(self.logs) if self.logs else b"" 

2483 self.metadata = compressor.compress(self.metadata) if self.metadata else b"" 

2484 self.is_compressed = True
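
Because `is_compressed` guards the work, `compress` is safe to call more than once; a small sketch with an assumed `writer` and `scan_data`:

    # Hypothetical sketch: compressing defensively is a no-op the second
    # time, so hand-offs between scanner and writer stay cheap.
    scan_data.compress(writer.compressor)
    scan_data.compress(writer.compressor)  # no effect; already compressed
    writer.write_scan_data(scan_data)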