# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("QuantumBackedButler", "QuantumProvenanceData")

import itertools
import logging
import uuid
from collections import defaultdict
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any

import pydantic

from lsst.resources import ResourcePath, ResourcePathExpression

from ._butler_config import ButlerConfig
from ._butler_metrics import ButlerMetrics
from ._config import Config
from ._dataset_provenance import DatasetProvenance
from ._dataset_ref import DatasetId, DatasetRef
from ._dataset_type import DatasetType
from ._deferredDatasetHandle import DeferredDatasetHandle
from ._limited_butler import LimitedButler
from ._quantum import Quantum
from ._standalone_datastore import instantiate_standalone_datastore
from ._storage_class import StorageClass, StorageClassFactory
from .datastore import Datastore
from .datastore.record_data import DatastoreRecordData, SerializedDatastoreRecordData
from .datastores.file_datastore.retrieve_artifacts import retrieve_and_zip
from .dimensions import DimensionUniverse
from .registry.interfaces import Database, DatastoreRegistryBridgeManager, OpaqueTableStorageManager

if TYPE_CHECKING:
    from ._butler import Butler

_LOG = logging.getLogger(__name__)


class QuantumBackedButler(LimitedButler):
    """An implementation of `LimitedButler` intended to back execution of a
    single `Quantum`.

    Parameters
    ----------
    predicted_inputs : `~collections.abc.Iterable` [`DatasetId`]
        Dataset IDs for datasets that can be read from this butler.
    predicted_outputs : `~collections.abc.Iterable` [`DatasetId`]
        Dataset IDs for datasets that can be stored in this butler.
    dimensions : `DimensionUniverse`
        Object managing all dimension definitions.
    datastore : `Datastore`
        Datastore to use for all dataset I/O and existence checks.
    storageClasses : `StorageClassFactory`
        Object managing all storage class definitions.
    dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`]
        The registry dataset type definitions, indexed by name.
    metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
        Metrics object for tracking butler statistics.
    database : `Database`, optional
        Database instance used by the datastore. Not required -- only
        provided to allow database connections to be closed during cleanup.

    Notes
    -----
    Most callers should use the `initialize` `classmethod` to construct new
    instances instead of calling the constructor directly.

    `QuantumBackedButler` uses a SQLite database internally, in order to reuse
    existing `DatastoreRegistryBridge` and `OpaqueTableStorage`
    implementations that rely on SQLAlchemy. If implementations are added in
    the future that don't rely on SQLAlchemy, it should be possible to swap
    them in by overriding the type arguments to `initialize` (though at
    present, `QuantumBackedButler` would still create at least an in-memory
    SQLite database that would then go unused).

    We imagine `QuantumBackedButler` being used during (at least) batch
    execution to capture `Datastore` records and save them to per-quantum
    files, which are also a convenient place to store provenance for eventual
    upload to a SQL-backed `Registry` (once `Registry` has tables to store
    provenance, that is).
    These per-quantum files can be written in two ways:

    - The SQLite file used internally by `QuantumBackedButler` can be used
      directly by customizing the ``filename`` argument to ``initialize``, and
      then transferring that file to the object store after execution
      completes (or fails; a ``try/finally`` pattern probably makes sense
      here).

    - A JSON or YAML file can be written by calling `extract_provenance_data`,
      and using ``pydantic`` methods to write the returned
      `QuantumProvenanceData` to a file.

    Note that at present, the SQLite file only contains datastore records, not
    provenance, but that should be easy to address (if desired) after we
    actually design a `Registry` schema for provenance. I also suspect that
    we'll want to explicitly close the SQLite file somehow before trying to
    transfer it. But I'm guessing we'd prefer to write the per-quantum files
    as JSON anyway.
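
    Examples
    --------
    A minimal sketch of the intended lifecycle (``config``, ``quantum``, and
    ``universe`` are assumed to be in hand already; the names are
    illustrative, not a transcript of any production executor)::

        butler = QuantumBackedButler.initialize(
            config=config, quantum=quantum, dimensions=universe
        )
        try:
            ...  # Execute the task, reading via get() and writing via put().
        finally:
            provenance = butler.extract_provenance_data()
            butler.close()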

    """

    def __init__(
        self,
        predicted_inputs: Iterable[DatasetId],
        predicted_outputs: Iterable[DatasetId],
        dimensions: DimensionUniverse,
        datastore: Datastore,
        storageClasses: StorageClassFactory,
        dataset_types: Mapping[str, DatasetType] | None = None,
        metrics: ButlerMetrics | None = None,
        database: Database | None = None,
    ):
        self._dimensions = dimensions
        self._predicted_inputs = set(predicted_inputs)
        self._predicted_outputs = set(predicted_outputs)
        self._available_inputs: set[DatasetId] = set()
        self._unavailable_inputs: set[DatasetId] = set()
        self._actual_inputs: set[DatasetId] = set()
        self._actual_output_refs: set[DatasetRef] = set()
        self._datastore = datastore
        self.storageClasses = storageClasses
        self._dataset_types: Mapping[str, DatasetType] = {}
        self._metrics = metrics if metrics is not None else ButlerMetrics()
        self._database = database
        if dataset_types is not None:
            self._dataset_types = dataset_types
        self._datastore.set_retrieve_dataset_type_method(self._retrieve_dataset_type)

    @classmethod
    def initialize(
        cls,
        config: Config | ResourcePathExpression,
        quantum: Quantum,
        dimensions: DimensionUniverse,
        filename: str | None = None,
        OpaqueManagerClass: type[OpaqueTableStorageManager] | None = None,
        BridgeManagerClass: type[DatastoreRegistryBridgeManager] | None = None,
        search_paths: list[str] | None = None,
        dataset_types: Mapping[str, DatasetType] | None = None,
        metrics: ButlerMetrics | None = None,
    ) -> QuantumBackedButler:
        """Construct a new `QuantumBackedButler` from repository configuration
        and helper types.

        Parameters
        ----------
        config : `Config` or `~lsst.resources.ResourcePathExpression`
            A butler repository root, configuration filename, or configuration
            instance.
        quantum : `Quantum`
            Object describing the predicted input and output datasets relevant
            to this butler. This must have resolved `DatasetRef` instances for
            all inputs and outputs.
        dimensions : `DimensionUniverse`
            Object managing all dimension definitions.
        filename : `str`, optional
            Name for the SQLite database that will back this butler; defaults
            to an in-memory database.
        OpaqueManagerClass : `type`, optional
            A subclass of `OpaqueTableStorageManager` to use for datastore
            opaque records. Default is a SQL-backed implementation.
        BridgeManagerClass : `type`, optional
            A subclass of `DatastoreRegistryBridgeManager` to use for
            datastore location records. Default is a SQL-backed
            implementation.
        search_paths : `list` of `str`, optional
            Additional search paths for butler configuration.
        dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`], \
                optional
            Mapping of the dataset type name to its registry definition.
        metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
            Metrics object for gathering butler statistics.
        """
        predicted_inputs = [ref.id for ref in itertools.chain.from_iterable(quantum.inputs.values())]
        predicted_inputs += [ref.id for ref in quantum.initInputs.values()]
        predicted_outputs = [ref.id for ref in itertools.chain.from_iterable(quantum.outputs.values())]
        return cls._initialize(
            config=config,
            predicted_inputs=predicted_inputs,
            predicted_outputs=predicted_outputs,
            dimensions=dimensions,
            filename=filename,
            datastore_records=quantum.datastore_records,
            OpaqueManagerClass=OpaqueManagerClass,
            BridgeManagerClass=BridgeManagerClass,
            search_paths=search_paths,
            dataset_types=dataset_types,
            metrics=metrics,
        )

    @classmethod
    def from_predicted(
        cls,
        config: Config | ResourcePathExpression,
        predicted_inputs: Iterable[DatasetId],
        predicted_outputs: Iterable[DatasetId],
        dimensions: DimensionUniverse,
        datastore_records: Mapping[str, DatastoreRecordData],
        filename: str | None = None,
        OpaqueManagerClass: type[OpaqueTableStorageManager] | None = None,
        BridgeManagerClass: type[DatastoreRegistryBridgeManager] | None = None,
        search_paths: list[str] | None = None,
        dataset_types: Mapping[str, DatasetType] | None = None,
        metrics: ButlerMetrics | None = None,
    ) -> QuantumBackedButler:
        """Construct a new `QuantumBackedButler` from sets of input and output
        dataset IDs.

        Parameters
        ----------
        config : `Config` or `~lsst.resources.ResourcePathExpression`
            A butler repository root, configuration filename, or configuration
            instance.
        predicted_inputs : `~collections.abc.Iterable` [`DatasetId`]
            Dataset IDs for datasets that can be read from this butler.
        predicted_outputs : `~collections.abc.Iterable` [`DatasetId`]
            Dataset IDs for datasets that can be stored in this butler; these
            must be fully resolved.
        dimensions : `DimensionUniverse`
            Object managing all dimension definitions.
        datastore_records : `dict` [`str`, `DatastoreRecordData`] or `None`
            Datastore records to import into a datastore.
        filename : `str`, optional
            Name for the SQLite database that will back this butler; defaults
            to an in-memory database.
        OpaqueManagerClass : `type`, optional
            A subclass of `OpaqueTableStorageManager` to use for datastore
            opaque records. Default is a SQL-backed implementation.
        BridgeManagerClass : `type`, optional
            A subclass of `DatastoreRegistryBridgeManager` to use for
            datastore location records. Default is a SQL-backed
            implementation.
        search_paths : `list` of `str`, optional
            Additional search paths for butler configuration.
        dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`], \
                optional
            Mapping of the dataset type name to its registry definition.
        metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
            Metrics object for gathering butler statistics.
        """
        return cls._initialize(
            config=config,
            predicted_inputs=predicted_inputs,
            predicted_outputs=predicted_outputs,
            dimensions=dimensions,
            filename=filename,
            datastore_records=datastore_records,
            OpaqueManagerClass=OpaqueManagerClass,
            BridgeManagerClass=BridgeManagerClass,
            search_paths=search_paths,
            dataset_types=dataset_types,
            metrics=metrics,
        )

    @classmethod
    def _initialize(
        cls,
        *,
        config: Config | ResourcePathExpression,
        predicted_inputs: Iterable[DatasetId],
        predicted_outputs: Iterable[DatasetId],
        dimensions: DimensionUniverse,
        filename: str | None = None,
        datastore_records: Mapping[str, DatastoreRecordData] | None = None,
        OpaqueManagerClass: type[OpaqueTableStorageManager] | None = None,
        BridgeManagerClass: type[DatastoreRegistryBridgeManager] | None = None,
        search_paths: list[str] | None = None,
        dataset_types: Mapping[str, DatasetType] | None = None,
        metrics: ButlerMetrics | None = None,
    ) -> QuantumBackedButler:
        """Initialize a quantum-backed butler.

        Internal method with common implementation used by `initialize` and
        `from_predicted`.

        Parameters
        ----------
        config : `Config` or `~lsst.resources.ResourcePathExpression`
            A butler repository root, configuration filename, or configuration
            instance.
        predicted_inputs : `~collections.abc.Iterable` [`DatasetId`]
            Dataset IDs for datasets that can be read from this butler.
        predicted_outputs : `~collections.abc.Iterable` [`DatasetId`]
            Dataset IDs for datasets that can be stored in this butler.
        dimensions : `DimensionUniverse`
            Object managing all dimension definitions.
        filename : `str`, optional
            Name for the SQLite database that will back this butler; defaults
            to an in-memory database.
        datastore_records : `dict` [`str`, `DatastoreRecordData`] or `None`
            Datastore records to import into a datastore.
        OpaqueManagerClass : `type`, optional
            A subclass of `OpaqueTableStorageManager` to use for datastore
            opaque records. Default is a SQL-backed implementation.
        BridgeManagerClass : `type`, optional
            A subclass of `DatastoreRegistryBridgeManager` to use for
            datastore location records. Default is a SQL-backed
            implementation.
        search_paths : `list` of `str`, optional
            Additional search paths for butler configuration.
        dataset_types : `~collections.abc.Mapping` [`str`, `DatasetType`]
            Mapping of the dataset type name to its registry definition.
        metrics : `lsst.daf.butler.ButlerMetrics` or `None`, optional
            Metrics object for gathering butler statistics.
        """
        butler_config = ButlerConfig(config, searchPaths=search_paths)
        datastore, database = instantiate_standalone_datastore(
            butler_config, dimensions, filename, OpaqueManagerClass, BridgeManagerClass
        )

        # TODO: We need to inform `Datastore` here that it needs to support
        # predictive reads; this only really works for the file datastore,
        # but we need to try everything in case there is a chained datastore.
        datastore._set_trust_mode(True)

        if datastore_records is not None:
            datastore.import_records(datastore_records)
        storageClasses = StorageClassFactory()
        storageClasses.addFromConfig(butler_config)
        return cls(
            predicted_inputs,
            predicted_outputs,
            dimensions,
            datastore,
            storageClasses=storageClasses,
            dataset_types=dataset_types,
            metrics=metrics,
            database=database,
        )

    def close(self) -> None:
        if self._database is not None:
            self._database.dispose()

    def _retrieve_dataset_type(self, name: str) -> DatasetType | None:
        """Return the `DatasetType` defined in the registry for the given
        dataset type name."""
        return self._dataset_types.get(name)

    def isWriteable(self) -> bool:
        # Docstring inherited.
        return True

    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        try:
            obj = super().get(
                ref,
                parameters=parameters,
                storageClass=storageClass,
            )
        except (LookupError, FileNotFoundError, OSError):
            self._unavailable_inputs.add(ref.id)
            raise
        if ref.id in self._predicted_inputs:
            # Do this after delegating to super in case that raises.
            self._actual_inputs.add(ref.id)
            self._available_inputs.add(ref.id)
        return obj

    def getDeferred(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: str | StorageClass | None = None,
    ) -> DeferredDatasetHandle:
        if ref.id in self._predicted_inputs:
            # Unfortunately, we can't do this after the handle succeeds in
            # loading, so it's conceivable here that we're marking an input
            # as "actual" even when it's not even available.
            self._actual_inputs.add(ref.id)
        return super().getDeferred(ref, parameters=parameters, storageClass=storageClass)

    def stored(self, ref: DatasetRef) -> bool:
        # Docstring inherited.
        stored = super().stored(ref)
        if ref.id in self._predicted_inputs:
            if stored:
                self._available_inputs.add(ref.id)
            else:
                self._unavailable_inputs.add(ref.id)
        return stored

    def stored_many(
        self,
        refs: Iterable[DatasetRef],
    ) -> dict[DatasetRef, bool]:
        # Docstring inherited.
        existence = super().stored_many(refs)

        for ref, stored in existence.items():
            if ref.id in self._predicted_inputs:
                if stored:
                    self._available_inputs.add(ref.id)
                else:
                    self._unavailable_inputs.add(ref.id)
        return existence

    def markInputUnused(self, ref: DatasetRef) -> None:
        # Docstring inherited.
        self._actual_inputs.discard(ref.id)

    @property
    def dimensions(self) -> DimensionUniverse:
        # Docstring inherited.
        return self._dimensions

    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
        # Docstring inherited.
        if ref.id not in self._predicted_outputs:
            raise RuntimeError("Cannot `put` a dataset that was not predicted as an output.")
        with self._metrics.instrument_put(log=_LOG, msg="Put QBB dataset"):
            self._datastore.put(obj, ref, provenance=provenance)
        self._actual_output_refs.add(ref)
        return ref

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # Docstring inherited from LimitedButler.

        if purge:
            if not disassociate:
                raise TypeError("Cannot pass purge=True without disassociate=True.")
            if not unstore:
                raise TypeError("Cannot pass purge=True without unstore=True.")
        elif disassociate:
            # No tagged collections for this butler.
            raise TypeError("Cannot pass disassociate=True without purge=True.")

        refs = list(refs)

        # Pruning a component of a DatasetRef makes no sense.
        for ref in refs:
            if ref.datasetType.component():
                raise ValueError(f"Cannot prune a component of a dataset (ref={ref}).")

        if unstore:
            self._datastore.trash(refs)
        if purge:
            for ref in refs:
                # We only care about removing them from actual output refs.
                self._actual_output_refs.discard(ref)

        if unstore:
            # Point of no return for removing artifacts. Only try to remove
            # refs associated with this pruning.
            self._datastore.emptyTrash(refs=refs)

    def retrieve_artifacts_zip(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        overwrite: bool = True,
    ) -> ResourcePath:
        """Retrieve artifacts from the graph and place in ZIP file.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            The datasets to be included in the zip file.
        destination : `lsst.resources.ResourcePathExpression`
            Directory to write the new ZIP file. This directory will
            also be used as a staging area for the datasets being downloaded
            from the datastore.
        overwrite : `bool`, optional
            If `False` the output ZIP will not be written if a file of the
            same name is already present in ``destination``.

        Returns
        -------
        zip_file : `lsst.resources.ResourcePath`
            The path to the new ZIP file.

        Raises
        ------
        ValueError
            Raised if there are no refs to retrieve.
        """
        return retrieve_and_zip(refs, destination, self._datastore.retrieveArtifacts, overwrite)

    def retrieve_artifacts(
        self,
        refs: Iterable[DatasetRef],
        destination: ResourcePathExpression,
        transfer: str = "auto",
        preserve_path: bool = True,
        overwrite: bool = False,
    ) -> list[ResourcePath]:
        """Retrieve the artifacts associated with the supplied refs.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` of `DatasetRef`
            The datasets for which artifacts are to be retrieved.
            A single ref can result in multiple artifacts. The refs must
            be resolved.
        destination : `lsst.resources.ResourcePath` or `str`
            Location to write the artifacts.
        transfer : `str`, optional
            Method to use to transfer the artifacts. Must be one of the options
            supported by `~lsst.resources.ResourcePath.transfer_from`.
            "move" is not allowed.
        preserve_path : `bool`, optional
            If `True` the full path of the artifact within the datastore
            is preserved. If `False` the final file component of the path
            is used.
        overwrite : `bool`, optional
            If `True` allow transfers to overwrite existing files at the
            destination.

        Returns
        -------
        targets : `list` of `lsst.resources.ResourcePath`
            URIs of file artifacts in destination location. Order is not
            preserved.
        """
        outdir = ResourcePath(destination)
        artifact_map = self._datastore.retrieveArtifacts(
            refs,
            outdir,
            transfer=transfer,
            preserve_path=preserve_path,
            overwrite=overwrite,
            write_index=True,
        )
        return list(artifact_map)
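
    # A minimal usage sketch (``qbb``, ``refs``, and the destination path
    # are illustrative):
    #
    #     paths = qbb.retrieve_artifacts(refs, "/tmp/artifacts", transfer="copy")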

    def extract_provenance_data(self) -> QuantumProvenanceData:
        """Extract provenance information and datastore records from this
        butler.

        Returns
        -------
        provenance : `QuantumProvenanceData`
            A serializable struct containing input/output dataset IDs and
            datastore records. This assumes all dataset IDs are UUIDs (just to
            make it easier for `pydantic` to reason about the struct's types);
            the rest of this class makes no such assumption, but the approach
            to processing in which it's useful effectively requires UUIDs
            anyway.

        Notes
        -----
        `QuantumBackedButler` records this provenance information when its
        methods are used, which mostly saves `~lsst.pipe.base.PipelineTask`
        authors from having to worry about provenance themselves, while still
        recording very detailed information. But it has two small weaknesses:

        - Calling `getDeferred` or `get` is enough to mark a
          dataset as an "actual input", which may mark some datasets that
          aren't actually used. We rely on task authors to use
          `markInputUnused` to address this.

        - We assume that the execution system will call ``stored``
          on all predicted inputs prior to execution, in order to populate the
          "available inputs" set. This is what I envision
          `~lsst.ctrl.mpexec.SingleQuantumExecutor` doing after we update it
          to use this class, but it feels fragile for this class to make such
          a strong assumption about how it will be used, even if I can't think
          of any other executor behavior that would make sense.
        """
        if not self._actual_inputs.isdisjoint(self._unavailable_inputs):
            _LOG.warning(
                "Inputs %s were marked as actually used (probably because a "
                "DeferredDatasetHandle was obtained) but did not actually exist. This task "
                "should be using markInputUnused directly to clarify its provenance.",
                self._actual_inputs & self._unavailable_inputs,
            )
            self._actual_inputs -= self._unavailable_inputs
        checked_inputs = self._available_inputs | self._unavailable_inputs
        if self._predicted_inputs != checked_inputs:
            _LOG.warning(
                "Execution harness did not check predicted inputs %s for existence; available inputs "
                "recorded in provenance may be incomplete.",
                self._predicted_inputs - checked_inputs,
            )
        datastore_records = self._datastore.export_records(self._actual_output_refs)
        provenance_records = {
            datastore_name: records.to_simple() for datastore_name, records in datastore_records.items()
        }

        return QuantumProvenanceData(
            predicted_inputs=self._predicted_inputs,
            available_inputs=self._available_inputs,
            actual_inputs=self._actual_inputs,
            predicted_outputs=self._predicted_outputs,
            actual_outputs={ref.id for ref in self._actual_output_refs},
            datastore_records=provenance_records,
        )
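
    # A minimal sketch of persisting the struct returned above as a
    # per-quantum JSON file (``qbb`` is a `QuantumBackedButler`; the filename
    # is illustrative, and any pydantic v2 serialization method works):
    #
    #     provenance = qbb.extract_provenance_data()
    #     with open("quantum_provenance.json", "w") as stream:
    #         stream.write(provenance.model_dump_json())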

    def export_predicted_datastore_records(
        self, refs: Iterable[DatasetRef]
    ) -> dict[str, DatastoreRecordData]:
        """Export datastore records for a set of predicted output dataset
        references.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            Dataset references for which to export datastore records. These
            refs must be known to this butler.

        Returns
        -------
        records : `dict` [ `str`, `DatastoreRecordData` ]
            Predicted datastore records indexed by datastore name. No attempt
            is made to ensure that the associated datasets exist on disk.
        """
        # Materialize first: refs is consumed both by the check below and by
        # the datastore call, which would fail for a single-pass iterator.
        refs = list(refs)
        unknowns = [
            ref
            for ref in refs
            if (ref.id not in self._predicted_outputs and ref.id not in self._predicted_inputs)
        ]
        if unknowns:
            raise ValueError(f"Cannot export datastore records for unknown outputs: {unknowns}")

        return self._datastore.export_predicted_records(refs)


class QuantumProvenanceData(pydantic.BaseModel):
    """A serializable struct for per-quantum provenance information and
    datastore records.

    Notes
    -----
    This class slightly duplicates information from the `Quantum` class itself
    (the ``predicted_inputs`` and ``predicted_outputs`` sets should have the
    same IDs present in `Quantum.inputs` and `Quantum.outputs`), but overall it
    assumes the original `Quantum` is also available to reconstruct the
    complete provenance (e.g. by associating dataset IDs with data IDs,
    dataset types, and `~CollectionType.RUN` names).

    Note that the ``pydantic`` method ``parse_raw()`` does not work correctly
    for this class; use the `direct` method instead.
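
    For example, a minimal deserialization sketch (assuming ``blob`` holds
    JSON previously produced by this model's ``model_dump_json``)::

        import json

        provenance = QuantumProvenanceData.direct(**json.loads(blob))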

    """

    # This class probably should have information about its execution
    # environment (anything not controlled and recorded at the
    # `~CollectionType.RUN` level, such as the compute node ID), but adding it
    # now is out of scope for this prototype.

    predicted_inputs: set[uuid.UUID]
    """Unique IDs of datasets that were predicted as inputs to this quantum
    when the QuantumGraph was built.
    """

    available_inputs: set[uuid.UUID]
    """Unique IDs of input datasets that were actually present in the
    datastore when this quantum was executed.

    This is a subset of ``predicted_inputs``, with the difference generally
    being datasets that were ``predicted_outputs`` but not ``actual_outputs``
    of some upstream task.
    """

    actual_inputs: set[uuid.UUID]
    """Unique IDs of datasets that were actually used as inputs by this task.

    This is a subset of ``available_inputs``.

    Notes
    -----
    The criterion for marking an input as used is that rerunning the quantum
    with only these ``actual_inputs`` available must yield identical outputs.
    This means that (for example) even just using an input to help determine
    an output rejection criterion and then rejecting it as an outlier
    qualifies that input as actually used.
    """

    predicted_outputs: set[uuid.UUID]
    """Unique IDs of datasets that were predicted as outputs of this quantum
    when the QuantumGraph was built.
    """

    actual_outputs: set[uuid.UUID]
    """Unique IDs of datasets that were actually written when this quantum
    was executed.
    """

    datastore_records: dict[str, SerializedDatastoreRecordData]
    """Datastore records indexed by datastore name."""

    @staticmethod
    def collect_and_transfer(
        butler: Butler, quanta: Iterable[Quantum], provenance: Iterable[QuantumProvenanceData]
    ) -> None:
        """Transfer output datasets from multiple quanta to a more permanent
        `Butler` repository.

        Parameters
        ----------
        butler : `Butler`
            Full butler representing the data repository to transfer datasets
            to.
        quanta : `~collections.abc.Iterable` [ `Quantum` ]
            Iterable of `Quantum` objects that carry information about
            predicted outputs. May be a single-pass iterator.
        provenance : `~collections.abc.Iterable` [ `QuantumProvenanceData` ]
            Provenance and datastore data for each of the given quanta, in the
            same order. May be a single-pass iterator.

        Notes
        -----
        Input-output provenance data is not actually transferred yet, because
        `Registry` has no place to store it.

        This method probably works most efficiently if run on all quanta for a
        single task label at once, because this will gather all datasets of
        a particular type together into a single vectorized `Registry` import.
        It should still behave correctly if run on smaller groups of quanta
        or even quanta from multiple tasks.

        Currently this method transfers datastore record data unchanged, with
        no possibility of actually moving (e.g.) files. Datastores that are
        present only in execution or only in the more permanent butler are
        ignored.
        """
        grouped_refs = defaultdict(list)
        summary_records: dict[str, DatastoreRecordData] = {}
        for quantum, provenance_for_quantum in zip(quanta, provenance, strict=True):
            quantum_refs_by_id = {
                ref.id: ref
                for ref in itertools.chain.from_iterable(quantum.outputs.values())
                if ref.id in provenance_for_quantum.actual_outputs
            }
            for ref in quantum_refs_by_id.values():
                grouped_refs[ref.datasetType, ref.run].append(ref)

            # Merge datastore records into a summary structure.
            for datastore_name, serialized_records in provenance_for_quantum.datastore_records.items():
                quantum_records = DatastoreRecordData.from_simple(serialized_records)
                if (records := summary_records.get(datastore_name)) is not None:
                    records.update(quantum_records)
                else:
                    summary_records[datastore_name] = quantum_records

        for refs in grouped_refs.values():
            butler.registry._importDatasets(refs)
        butler._datastore.import_records(summary_records)
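
    # A minimal sketch of driving the transfer after execution; ``butler``,
    # ``quanta``, and ``provenance_paths`` are assumed to be in hand, and the
    # loading loop is illustrative only:
    #
    #     import json
    #     provenance = [
    #         QuantumProvenanceData.direct(**json.loads(path.read_text()))
    #         for path in provenance_paths
    #     ]
    #     QuantumProvenanceData.collect_and_transfer(butler, quanta, provenance)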

    @classmethod
    def parse_raw(cls, *args: Any, **kwargs: Any) -> QuantumProvenanceData:
        raise NotImplementedError("parse_raw() is not usable for this class, use direct() instead.")

    @classmethod
    def direct(
        cls,
        *,
        predicted_inputs: Iterable[str | uuid.UUID],
        available_inputs: Iterable[str | uuid.UUID],
        actual_inputs: Iterable[str | uuid.UUID],
        predicted_outputs: Iterable[str | uuid.UUID],
        actual_outputs: Iterable[str | uuid.UUID],
        datastore_records: Mapping[str, Mapping],
    ) -> QuantumProvenanceData:
        """Construct an instance directly without validators.

        Parameters
        ----------
        predicted_inputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
            The predicted inputs.
        available_inputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
            The available inputs.
        actual_inputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
            The actual inputs.
        predicted_outputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
            The predicted outputs.
        actual_outputs : `~collections.abc.Iterable` of `str` or `uuid.UUID`
            The actual outputs.
        datastore_records : `~collections.abc.Mapping` [ `str`, \
                `~collections.abc.Mapping` ]
            The datastore records.

        Returns
        -------
        provenance : `QuantumProvenanceData`
            Serializable model of the quantum provenance.

        Notes
        -----
        This differs from the Pydantic "construct" method in that the
        arguments are explicitly what the model requires, and it will recurse
        through members, constructing them from their corresponding `direct`
        methods.

        This method should only be called when the inputs are trusted.
        """

        def _to_uuid_set(uuids: Iterable[str | uuid.UUID]) -> set[uuid.UUID]:
            """Convert input UUIDs, which may be in string representation,
            to a set of `UUID` instances.
            """
            return {uuid.UUID(id) if isinstance(id, str) else id for id in uuids}

        data = cls.model_construct(
            predicted_inputs=_to_uuid_set(predicted_inputs),
            available_inputs=_to_uuid_set(available_inputs),
            actual_inputs=_to_uuid_set(actual_inputs),
            predicted_outputs=_to_uuid_set(predicted_outputs),
            actual_outputs=_to_uuid_set(actual_outputs),
            datastore_records={
                key: SerializedDatastoreRecordData.direct(**records)
                for key, records in datastore_records.items()
            },
        )

        return data