Coverage for python/lsst/daf/butler/datastores/chainedDatastore.py: 0%

541 statements  

coverage.py v7.13.5, created at 2026-04-22 08:55 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Chained datastore.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ("ChainedDatastore",) 

33 

34import itertools 

35import time 

36import warnings 

37from collections.abc import Callable, Collection, Iterable, Mapping, Sequence 

38from typing import TYPE_CHECKING, Any 

39 

40from lsst.daf.butler import DatasetRef, DatasetTypeNotSupportedError, DimensionUniverse, FileDataset 

41from lsst.daf.butler.datastore import ( 

42 DatasetRefURIs, 

43 Datastore, 

44 DatastoreConfig, 

45 DatastoreOpaqueTable, 

46 DatastoreValidationError, 

47) 

48from lsst.daf.butler.datastore.constraints import Constraints 

49from lsst.daf.butler.datastore.record_data import DatastoreRecordData 

50from lsst.daf.butler.datastores.file_datastore.get import DatasetLocationInformation 

51from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ArtifactIndexInfo, ZipIndex 

52from lsst.resources import ResourcePath 

53from lsst.utils import doImportType 

54from lsst.utils.introspection import get_full_type_name 

55from lsst.utils.logging import getLogger 

56 

57from .._dataset_ref import DatasetId 

58from ..datastore import FileTransferMap 

59 

60if TYPE_CHECKING: 

61 from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey, StorageClass 

62 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager 

63 from lsst.resources import ResourcePathExpression 

64 

65log = getLogger(__name__) 

66 

67 

68class _IngestPrepData(Datastore.IngestPrepData): 

69 """Helper class for ChainedDatastore ingest implementation. 

70 

71 Parameters 

72 ---------- 

73 children : `list` of `tuple` 

74 Triples of `Datastore`, `IngestPrepData`, and the set of source paths for all child datastores. 

75 """ 

76 

77 def __init__(self, children: list[tuple[Datastore, Datastore.IngestPrepData, set[ResourcePath]]]): 

78 super().__init__(itertools.chain.from_iterable(data.refs.values() for _, data, _ in children)) 

79 self.children = children 

80 

81 

82class ChainedDatastore(Datastore): 

83 """Chained Datastores to allow read and writes from multiple datastores. 

84 

85 A ChainedDatastore is configured with multiple datastore configurations. 

86 A ``put()`` is always sent to each datastore. A ``get()`` 

87 operation is sent to each datastore in turn and the first datastore 

88 to return a valid dataset is used. 

89 

90 Parameters 

91 ---------- 

92 config : `DatastoreConfig` or `str` 

93 Configuration. This configuration must include a ``datastores`` field 

94 as a sequence of datastore configurations. The order in this sequence 

95 indicates the order to use for read operations. 

96 bridgeManager : `DatastoreRegistryBridgeManager` 

97 Object that manages the interface between `Registry` and datastores. 

98 datastores : `list` [`Datastore`] 

99 All the child datastores known to this datastore. 

100 

101 Notes 

102 ----- 

103 ChainedDatastore never supports `None` or ``"move"`` as an ingest transfer 

104 mode. It supports ``"copy"``, ``"symlink"``, ``"relsymlink"`` 

105 and ``"hardlink"`` if and only if all its child datastores do. 

106 """ 

107 

108 defaultConfigFile = "datastores/chainedDatastore.yaml" 

109 """Path to configuration defaults. Accessed within the ``configs`` resource 

110 or relative to a search path. Can be None if no defaults specified. 

111 """ 

112 

113 containerKey = "datastores" 

114 """Key to specify where child datastores are configured.""" 

115 

116 datastores: list[Datastore] 

117 """All the child datastores known to this datastore.""" 

118 

119 datastoreConstraints: Sequence[Constraints | None] 

120 """Constraints to be applied to each of the child datastores.""" 

121 

122 universe: DimensionUniverse 

123 """Dimension universe associated with the butler.""" 

124 

125 @classmethod 

126 def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None: 

127 """Set any filesystem-dependent config options for child Datastores to 

128 be appropriate for a new empty repository with the given root. 

129 

130 Parameters 

131 ---------- 

132 root : `str` 

133 Filesystem path to the root of the data repository. 

134 config : `Config` 

135 A `Config` to update. Only the subset understood by 

136 this component will be updated. Will not expand 

137 defaults. 

138 full : `Config` 

139 A complete config with all defaults expanded that can be 

140 converted to a `DatastoreConfig`. Read-only and will not be 

141 modified by this method. 

142 Repository-specific options that should not be obtained 

143 from defaults when Butler instances are constructed 

144 should be copied from ``full`` to ``config``. 

145 overwrite : `bool`, optional 

146 If `False`, do not modify a value in ``config`` if the value 

147 already exists. Default is always to overwrite with the provided 

148 ``root``. 

149 

150 Notes 

151 ----- 

152 If a keyword is explicitly defined in the supplied ``config`` it 

153 will not be overridden by this method if ``overwrite`` is `False`. 

154 This allows explicit values set in external configs to be retained. 

155 """ 

156 # Extract the part of the config we care about updating 

157 datastoreConfig = DatastoreConfig(config, mergeDefaults=False) 

158 

159 # And the subset of the full config that we can use for reference. 

160 # Do not bother with defaults because we are told this already has 

161 # them. 

162 fullDatastoreConfig = DatastoreConfig(full, mergeDefaults=False) 

163 

164 # Loop over each datastore config and pass the subsets to the 

165 # child datastores to process. 

166 

167 containerKey = cls.containerKey 

168 for idx, (child, fullChild) in enumerate( 

169 zip(datastoreConfig[containerKey], fullDatastoreConfig[containerKey], strict=True) 

170 ): 

171 childConfig = DatastoreConfig(child, mergeDefaults=False) 

172 fullChildConfig = DatastoreConfig(fullChild, mergeDefaults=False) 

173 datastoreClass = doImportType(fullChildConfig["cls"]) 

174 if not issubclass(datastoreClass, Datastore): 

175 raise TypeError(f"Imported child class {fullChildConfig['cls']} is not a Datastore") 

176 newroot = f"{root}/{datastoreClass.__qualname__}_{idx}" 

177 datastoreClass.setConfigRoot(newroot, childConfig, fullChildConfig, overwrite=overwrite) 

178 

179 # Reattach to parent 

180 datastoreConfig[containerKey, idx] = childConfig 

181 

182 # Reattach modified datastore config to parent 

183 # If this has a datastore key we attach there, otherwise we assume 

184 # this information goes at the top of the config hierarchy. 

185 if DatastoreConfig.component in config: 

186 config[DatastoreConfig.component] = datastoreConfig 

187 else: 

188 config.update(datastoreConfig) 

189 

190 return 

191 

192 def __init__( 

193 self, 

194 config: DatastoreConfig, 

195 bridgeManager: DatastoreRegistryBridgeManager, 

196 datastores: list[Datastore], 

197 ): 

198 super().__init__(config, bridgeManager) 

199 

200 self.datastores = list(datastores) 

201 

202 # Name ourself based on our children 

203 if self.datastores: 

204 # We must set the names explicitly 

205 self._names = [d.name for d in self.datastores] 

206 childNames = ",".join(self.names) 

207 else: 

208 childNames = f"(empty@{time.time()})" 

209 self._names = [childNames] 

210 self.name = f"{type(self).__qualname__}[{childNames}]" 

211 

212 # We declare we are ephemeral if all our child datastores declare 

213 # they are ephemeral 

214 self.isEphemeral = all(d.isEphemeral for d in self.datastores) 

215 

216 # per-datastore override constraints 

217 if "datastore_constraints" in self.config: 

218 overrides = self.config["datastore_constraints"] 

219 

220 if len(overrides) != len(self.datastores): 

221 raise DatastoreValidationError( 

222 f"Number of registered datastores ({len(self.datastores)})" 

223 " differs from number of constraints overrides" 

224 f" {len(overrides)}" 

225 ) 

226 

227 self.datastoreConstraints = [ 

228 Constraints(c.get("constraints"), universe=bridgeManager.universe) for c in overrides 

229 ] 

230 

231 else: 

232 self.datastoreConstraints = (None,) * len(self.datastores) 

233 

234 self.universe = bridgeManager.universe 

235 

236 log.debug("Created %s (%s)", self.name, ("ephemeral" if self.isEphemeral else "permanent")) 

237 

238 @classmethod 

239 def _create_from_config( 

240 cls, 

241 config: DatastoreConfig, 

242 bridgeManager: DatastoreRegistryBridgeManager, 

243 butlerRoot: ResourcePathExpression | None, 

244 ) -> ChainedDatastore: 

245 # Scan for child datastores and instantiate them with the same registry 

246 datastores = [] 

247 for c in config["datastores"]: 

248 c = DatastoreConfig(c) 

249 datastoreType = doImportType(c["cls"]) 

250 if not issubclass(datastoreType, Datastore): 

251 raise TypeError(f"Imported child class {c['cls']} is not a Datastore") 

252 datastore = datastoreType._create_from_config(c, bridgeManager, butlerRoot=butlerRoot) 

253 log.debug("Creating child datastore %s", datastore.name) 

254 datastores.append(datastore) 

255 

256 return ChainedDatastore(config, bridgeManager, datastores) 

257 

258 def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore: 

259 datastores = [ds.clone(bridgeManager) for ds in self.datastores] 

260 return ChainedDatastore(self.config, bridgeManager, datastores) 

261 

262 @property 

263 def names(self) -> tuple[str, ...]: 

264 return tuple(self._names) 

265 

266 @property 

267 def roots(self) -> dict[str, ResourcePath | None]: 

268 # Docstring inherited. 

269 roots = {} 

270 for datastore in self.datastores: 

271 roots.update(datastore.roots) 

272 return roots 

273 

274 def __str__(self) -> str: 

275 chainName = ", ".join(str(ds) for ds in self.datastores) 

276 return chainName 

277 

278 def _set_trust_mode(self, mode: bool) -> None: 

279 for datastore in self.datastores: 

280 datastore._set_trust_mode(mode) 

281 

282 def knows(self, ref: DatasetRef) -> bool: 

283 """Check if the dataset is known to any of the datastores. 

284 

285 Does not check for existence of any artifact. 

286 

287 Parameters 

288 ---------- 

289 ref : `DatasetRef` 

290 Reference to the required dataset. 

291 

292 Returns 

293 ------- 

294 exists : `bool` 

295 `True` if the dataset is known to the datastore. 

296 """ 

297 for datastore in self.datastores: 

298 if datastore.knows(ref): 

299 log.debug("%s known to datastore %s", ref, datastore.name) 

300 return True 

301 return False 

302 

303 def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]: 

304 # Docstring inherited from the base class. 

305 refs_known: dict[DatasetRef, bool] = {} 

306 for datastore in self.datastores: 

307 refs_known.update(datastore.knows_these(refs)) 

308 

309 # No need to check in next datastore for refs that are known. 

310 # We only update entries that were initially False. 

311 refs = [ref for ref, known in refs_known.items() if not known] 

312 

313 return refs_known 

314 

315 def mexists( 

316 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None 

317 ) -> dict[DatasetRef, bool]: 

318 """Check the existence of multiple datasets at once. 

319 

320 Parameters 

321 ---------- 

322 refs : `~collections.abc.Iterable` of `DatasetRef` 

323 The datasets to be checked. 

324 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`] 

325 Optional mapping of datastore artifact to existence. Updated by 

326 this method with details of all artifacts tested. Can be `None` 

327 if the caller is not interested. 

328 

329 Returns 

330 ------- 

331 existence : `dict` of [`DatasetRef`, `bool`] 

332 Mapping from dataset to boolean indicating existence in any 

333 of the child datastores. 

334 """ 

335 dataset_existence: dict[DatasetRef, bool] = {} 

336 for datastore in self.datastores: 

337 dataset_existence.update(datastore.mexists(refs, artifact_existence=artifact_existence)) 

338 

339 # For next datastore no point asking about ones we know 

340 # exist already. No special exemption for ephemeral datastores. 

341 refs = [ref for ref, exists in dataset_existence.items() if not exists] 

342 

343 return dataset_existence 

344 

345 def exists(self, ref: DatasetRef) -> bool: 

346 """Check if the dataset exists in one of the datastores. 

347 

348 Parameters 

349 ---------- 

350 ref : `DatasetRef` 

351 Reference to the required dataset. 

352 

353 Returns 

354 ------- 

355 exists : `bool` 

356 `True` if the entity exists in one of the child datastores. 

357 """ 

358 for datastore in self.datastores: 

359 if datastore.exists(ref): 

360 log.debug("Found %s in datastore %s", ref, datastore.name) 

361 return True 

362 return False 

363 

364 def get( 

365 self, 

366 ref: DatasetRef, 

367 parameters: Mapping[str, Any] | None = None, 

368 storageClass: StorageClass | str | None = None, 

369 ) -> Any: 

370 """Load an InMemoryDataset from the store. 

371 

372 The dataset is returned from the first datastore that has 

373 the dataset. 

374 

375 Parameters 

376 ---------- 

377 ref : `DatasetRef` 

378 Reference to the required Dataset. 

379 parameters : `dict` 

380 `StorageClass`-specific parameters that specify, for example, 

381 a slice of the dataset to be loaded. 

382 storageClass : `StorageClass` or `str`, optional 

383 The storage class to be used to override the Python type 

384 returned by this method. By default the returned type matches 

385 the dataset type definition for this dataset. Specifying a 

386 read `StorageClass` can force a different type to be returned. 

387 This type must be compatible with the original type. 

388 

389 Returns 

390 ------- 

391 inMemoryDataset : `object` 

392 Requested dataset or slice thereof as an InMemoryDataset. 

393 

394 Raises 

395 ------ 

396 FileNotFoundError 

397 Requested dataset can not be retrieved. 

398 TypeError 

399 Return value from formatter has unexpected type. 

400 ValueError 

401 Formatter failed to process the dataset. 

402 """ 

403 for datastore in self.datastores: 

404 try: 

405 inMemoryObject = datastore.get(ref, parameters, storageClass=storageClass) 

406 log.debug("Found dataset %s in datastore %s", ref, datastore.name) 

407 return inMemoryObject 

408 except FileNotFoundError: 

409 pass 

410 

411 raise FileNotFoundError(f"Dataset {ref} could not be found in any of the datastores") 

412 

413 def prepare_get_for_external_client(self, ref: DatasetRef) -> list[DatasetLocationInformation] | None: 

414 datastore = self._get_matching_datastore(ref) 

415 if datastore is None: 

416 return None 

417 

418 return datastore.prepare_get_for_external_client(ref) 

419 

420 def _get_matching_datastore(self, ref: DatasetRef) -> Datastore | None: 

421 """Return the first child datastore that owns the specified dataset.""" 

422 for datastore in self.datastores: 

423 if datastore.knows(ref): 

424 return datastore 

425 

426 return None 

427 

428 def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None: 

429 """Write a InMemoryDataset with a given `DatasetRef` to each 

430 datastore. 

431 

432 The put() to child datastores can fail with 

433 `DatasetTypeNotSupportedError`. The put() for this datastore will be 

434 deemed to have succeeded so long as at least one child datastore 

435 accepted the inMemoryDataset. 

436 

437 Parameters 

438 ---------- 

439 inMemoryDataset : `object` 

440 The dataset to store. 

441 ref : `DatasetRef` 

442 Reference to the associated Dataset. 

443 provenance : `DatasetProvenance` or `None`, optional 

444 Any provenance that should be attached to the serialized dataset. 

445 Not supported by all serialization mechanisms. 

446 

447 Raises 

448 ------ 

449 TypeError 

450 Supplied object and storage class are inconsistent. 

451 DatasetTypeNotSupportedError 

452 All datastores reported `DatasetTypeNotSupportedError`. 

453 """ 

454 log.debug("Put %s", ref) 

455 

456 # Confirm that we can accept this dataset 

457 if not self.constraints.isAcceptable(ref): 

458 # Raise rather than use boolean return value. 

459 raise DatasetTypeNotSupportedError( 

460 f"Dataset {ref} has been rejected by this datastore via configuration." 

461 ) 

462 

463 isPermanent = False 

464 nsuccess = 0 

465 npermanent = 0 

466 nephemeral = 0 

467 for datastore, constraints in zip(self.datastores, self.datastoreConstraints, strict=True): 

468 if ( 

469 constraints is not None and not constraints.isAcceptable(ref) 

470 ) or not datastore.constraints.isAcceptable(ref): 

471 log.debug("Datastore %s skipping put via configuration for ref %s", datastore.name, ref) 

472 continue 

473 

474 if datastore.isEphemeral: 

475 nephemeral += 1 

476 else: 

477 npermanent += 1 

478 try: 

479 datastore.put(inMemoryDataset, ref, provenance=provenance) 

480 nsuccess += 1 

481 if not datastore.isEphemeral: 

482 isPermanent = True 

483 except DatasetTypeNotSupportedError: 

484 pass 

485 

486 if nsuccess == 0: 

487 raise DatasetTypeNotSupportedError(f"None of the chained datastores supported ref {ref}") 

488 

489 if not isPermanent and npermanent > 0: 

490 warnings.warn(f"Put of {ref} only succeeded in ephemeral datastores", stacklevel=2) 

491 

492 if self._transaction is not None: 

493 self._transaction.registerUndo("put", self.remove, ref) 

494 

495 def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]: 

496 # Docstring inherited from base class. 

497 log.debug("Put %s", ref) 

498 

499 # Confirm that we can accept this dataset 

500 if not self.constraints.isAcceptable(ref): 

501 # Raise rather than use boolean return value. 

502 raise DatasetTypeNotSupportedError( 

503 f"Dataset {ref} has been rejected by this datastore via configuration." 

504 ) 

505 

506 isPermanent = False 

507 nsuccess = 0 

508 npermanent = 0 

509 nephemeral = 0 

510 stored_refs: dict[str, DatasetRef] = {} 

511 for datastore, constraints in zip(self.datastores, self.datastoreConstraints, strict=True): 

512 if ( 

513 constraints is not None and not constraints.isAcceptable(ref) 

514 ) or not datastore.constraints.isAcceptable(ref): 

515 log.debug("Datastore %s skipping put via configuration for ref %s", datastore.name, ref) 

516 continue 

517 

518 if datastore.isEphemeral: 

519 nephemeral += 1 

520 else: 

521 npermanent += 1 

522 try: 

523 stored_ref_map = datastore.put_new(in_memory_dataset, ref) 

524 stored_refs.update(stored_ref_map) 

525 nsuccess += 1 

526 if not datastore.isEphemeral: 

527 isPermanent = True 

528 except DatasetTypeNotSupportedError: 

529 pass 

530 

531 if nsuccess == 0: 

532 raise DatasetTypeNotSupportedError(f"None of the chained datastores supported ref {ref}") 

533 

534 if not isPermanent and npermanent > 0: 

535 warnings.warn(f"Put of {ref} only succeeded in ephemeral datastores", stacklevel=2) 

536 

537 if self._transaction is not None: 

538 self._transaction.registerUndo("put", self.remove, ref) 

539 

540 return stored_refs 

541 

542 def _overrideTransferMode(self, *datasets: Any, transfer: str | None = None) -> str | None: 

543 # Docstring inherited from base class. 

544 if transfer != "auto": 

545 return transfer 

546 # Ask each datastore what they think auto means 

547 transfers = {d._overrideTransferMode(*datasets, transfer=transfer) for d in self.datastores} 

548 

549 # Remove any untranslated "auto" values 

550 transfers.discard(transfer) 

551 

552 if len(transfers) == 1: 

553 return transfers.pop() 

554 if not transfers: 

555 # Everything reported "auto" 

556 return transfer 

557 

558 raise RuntimeError( 

559 "Chained datastore does not yet support different transfer modes" 

560 f" from 'auto' in each child datastore (wanted {transfers})" 

561 ) 

562 

563 def _prepIngest(self, *datasets: FileDataset, transfer: str | None = None) -> _IngestPrepData: 

564 # Docstring inherited from Datastore._prepIngest. 

565 def isDatasetAcceptable(dataset: FileDataset, *, name: str, constraints: Constraints) -> bool: 

566 acceptable = [ref for ref in dataset.refs if constraints.isAcceptable(ref)] 

567 if not acceptable: 

568 log.debug( 

569 "Datastore %s skipping ingest via configuration for refs %s", 

570 name, 

571 ", ".join(str(ref) for ref in dataset.refs), 

572 ) 

573 return False 

574 else: 

575 return True 

576 

577 # Filter down to just datasets the chained datastore's own 

578 # configuration accepts. 

579 okForParent: list[FileDataset] = [ 

580 dataset 

581 for dataset in datasets 

582 if isDatasetAcceptable(dataset, name=self.name, constraints=self.constraints) 

583 ] 

584 

585 # Iterate over nested datastores and call _prepIngest on each. 

586 # Save the results to a list: 

587 children: list[tuple[Datastore, Datastore.IngestPrepData, set[ResourcePath]]] = [] 

588 # ...and remember whether all of the failures are due to 

589 # NotImplementedError being raised. 

590 allFailuresAreNotImplementedError = True 

591 for datastore, constraints in zip(self.datastores, self.datastoreConstraints, strict=True): 

592 okForChild: list[FileDataset] 

593 if constraints is not None: 

594 okForChild = [ 

595 dataset 

596 for dataset in okForParent 

597 if isDatasetAcceptable(dataset, name=datastore.name, constraints=constraints) 

598 ] 

599 else: 

600 okForChild = okForParent 

601 try: 

602 prepDataForChild = datastore._prepIngest(*okForChild, transfer=transfer) 

603 except NotImplementedError: 

604 log.debug( 

605 "Skipping ingest for datastore %s because transfer mode %s is not supported.", 

606 datastore.name, 

607 transfer, 

608 ) 

609 continue 

610 allFailuresAreNotImplementedError = False 

611 if okForChild: 

612 # Do not store for later if a datastore has rejected 

613 # everything. 

614 # Include the source paths if this is a "move". It's clearer 

615 # to find the paths now rather than try to infer how 

616 # each datastore has stored them in the internal prep class. 

617 paths = ( 

618 {ResourcePath(dataset.path, forceDirectory=False) for dataset in okForChild} 

619 if transfer == "move" 

620 else set() 

621 ) 

622 children.append((datastore, prepDataForChild, paths)) 

623 if allFailuresAreNotImplementedError: 

624 raise NotImplementedError(f"No child datastore supports transfer mode {transfer}.") 

625 return _IngestPrepData(children=children) 

626 

627 def _finishIngest( 

628 self, 

629 prepData: _IngestPrepData, 

630 *, 

631 transfer: str | None = None, 

632 record_validation_info: bool = True, 

633 ) -> None: 

634 # Docstring inherited from Datastore._finishIngest. 

635 # For "move" we must use "copy" and then delete the input 

636 # data at the end. This has no rollback option if the ingest 

637 # subsequently fails. If there is only one active datastore 

638 # accepting any files we can leave it as "move" 

639 actual_transfer: str | None 

640 if transfer == "move" and len(prepData.children) > 1: 

641 actual_transfer = "copy" 

642 else: 

643 actual_transfer = transfer 

644 to_be_deleted: set[ResourcePath] = set() 

645 for datastore, prepDataForChild, paths in prepData.children: 

646 datastore._finishIngest( 

647 prepDataForChild, transfer=actual_transfer, record_validation_info=record_validation_info 

648 ) 

649 to_be_deleted.update(paths) 

650 if actual_transfer != transfer: 

651 # These datasets were copied but now need to be deleted. 

652 # This can not be rolled back. 

653 for uri in to_be_deleted: 

654 uri.remove() 

655 

656 def getManyURIs( 

657 self, 

658 refs: Iterable[DatasetRef], 

659 predict: bool = False, 

660 allow_missing: bool = False, 

661 ) -> dict[DatasetRef, DatasetRefURIs]: 

662 # Docstring inherited 

663 

664 uris: dict[DatasetRef, DatasetRefURIs] = {} 

665 ephemeral_uris: dict[DatasetRef, DatasetRefURIs] = {} 

666 

667 missing_refs = set(refs) 

668 

669 # If predict is True we don't want to predict a dataset in the first 

670 # datastore if it actually exists in a later datastore, so in that 

671 # case check all datastores with predict=False first, and then try 

672 # again with predict=True. 

673 for p in (False, True) if predict else (False,): 

674 if not missing_refs: 

675 break 

676 for datastore in self.datastores: 

677 try: 

678 got_uris = datastore.getManyURIs(missing_refs, p, allow_missing=True) 

679 except NotImplementedError: 

680 # some datastores may not implement generating URIs 

681 continue 

682 if datastore.isEphemeral: 

683 # Only use these as last resort so do not constrain 

684 # subsequent queries. 

685 ephemeral_uris.update(got_uris) 

686 continue 

687 

688 missing_refs -= got_uris.keys() 

689 uris.update(got_uris) 

690 if not missing_refs: 

691 break 

692 

693 if missing_refs and ephemeral_uris: 

694 ephemeral_refs = missing_refs.intersection(ephemeral_uris.keys()) 

695 for ref in ephemeral_refs: 

696 uris[ref] = ephemeral_uris[ref] 

697 missing_refs.remove(ref) 

698 

699 if missing_refs and not allow_missing: 

700 raise FileNotFoundError(f"Dataset(s) {missing_refs} not in this datastore.") 

701 

702 return uris 

703 

704 def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs: 

705 """Return URIs associated with dataset. 

706 

707 Parameters 

708 ---------- 

709 ref : `DatasetRef` 

710 Reference to the required dataset. 

711 predict : `bool`, optional 

712 If the datastore does not know about the dataset, controls whether 

713 it should return a predicted URI or not. 

714 

715 Returns 

716 ------- 

717 uris : `DatasetRefURIs` 

718 The URI to the primary artifact associated with this dataset (if 

719 the dataset was disassembled within the datastore this may be 

720 `None`), and the URIs to any components associated with the dataset 

721 artifact (can be empty if there are no components). 

722 

723 Notes 

724 ----- 

725 The returned URI is from the first datastore in the list that has 

726 the dataset with preference given to the first dataset coming from 

727 a permanent datastore. If no datastores have the dataset and prediction 

728 is allowed, the predicted URI for the first datastore in the list will 

729 be returned. 

730 """ 

731 log.debug("Requesting URIs for %s", ref) 

732 predictedUri: DatasetRefURIs | None = None 

733 predictedEphemeralUri: DatasetRefURIs | None = None 

734 firstEphemeralUri: DatasetRefURIs | None = None 

735 for datastore in self.datastores: 

736 if datastore.exists(ref): 

737 if not datastore.isEphemeral: 

738 uri = datastore.getURIs(ref) 

739 log.debug("Retrieved non-ephemeral URI: %s", uri) 

740 return uri 

741 elif not firstEphemeralUri: 

742 firstEphemeralUri = datastore.getURIs(ref) 

743 elif predict: 

744 if not predictedUri and not datastore.isEphemeral: 

745 predictedUri = datastore.getURIs(ref, predict) 

746 elif not predictedEphemeralUri and datastore.isEphemeral: 

747 predictedEphemeralUri = datastore.getURIs(ref, predict) 

748 

749 if firstEphemeralUri: 

750 log.debug("Retrieved ephemeral URI: %s", firstEphemeralUri) 

751 return firstEphemeralUri 

752 

753 if predictedUri: 

754 log.debug("Retrieved predicted URI: %s", predictedUri) 

755 return predictedUri 

756 

757 if predictedEphemeralUri: 

758 log.debug("Retrieved predicted ephemeral URI: %s", predictedEphemeralUri) 

759 return predictedEphemeralUri 

760 

761 raise FileNotFoundError(f"Dataset {ref} not in any datastore") 

762 

763 def getURI(self, ref: DatasetRef, predict: bool = False) -> ResourcePath: 

764 """URI to the Dataset. 

765 

766 The returned URI is from the first datastore in the list that has 

767 the dataset with preference given to the first dataset coming from 

768 a permanent datastore. If no datastores have the dataset and prediction 

769 is allowed, the predicted URI for the first datastore in the list will 

770 be returned. 

771 

772 Parameters 

773 ---------- 

774 ref : `DatasetRef` 

775 Reference to the required Dataset. 

776 predict : `bool` 

777 If `True`, allow URIs to be returned of datasets that have not 

778 been written. 

779 

780 Returns 

781 ------- 

782 uri : `lsst.resources.ResourcePath` 

783 URI pointing to the dataset within the datastore. If the 

784 dataset does not exist in the datastore, and if ``predict`` is 

785 `True`, the URI will be a prediction and will include a URI 

786 fragment "#predicted". 

787 

788 Notes 

789 ----- 

790 If the datastore does not have entities that relate well 

791 to the concept of a URI the returned URI string will be 

792 descriptive. The returned URI is not guaranteed to be obtainable. 

793 

794 Raises 

795 ------ 

796 FileNotFoundError 

797 A URI has been requested for a dataset that does not exist and 

798 guessing is not allowed. 

799 RuntimeError 

800 Raised if a request is made for a single URI but multiple URIs 

801 are associated with this dataset. 

802 """ 

803 log.debug("Requesting URI for %s", ref) 

804 primary, components = self.getURIs(ref, predict) 

805 if primary is None or components: 

806 raise RuntimeError( 

807 f"Dataset ({ref}) includes distinct URIs for components. Use Datastore.getURIs() instead." 

808 ) 

809 return primary 

810 

811 def retrieveArtifacts( 

812 self, 

813 refs: Iterable[DatasetRef], 

814 destination: ResourcePath, 

815 transfer: str = "auto", 

816 preserve_path: bool = True, 

817 overwrite: bool = False, 

818 write_index: bool = True, 

819 add_prefix: bool = False, 

820 ) -> dict[ResourcePath, ArtifactIndexInfo]: 

821 """Retrieve the file artifacts associated with the supplied refs. 

822 

823 Parameters 

824 ---------- 

825 refs : `~collections.abc.Iterable` of `DatasetRef` 

826 The datasets for which file artifacts are to be retrieved. 

827 A single ref can result in multiple files. The refs must 

828 be resolved. 

829 destination : `lsst.resources.ResourcePath` 

830 Location to write the file artifacts. 

831 transfer : `str`, optional 

832 Method to use to transfer the artifacts. Must be one of the options 

833 supported by `lsst.resources.ResourcePath.transfer_from`. 

834 "move" is not allowed. 

835 preserve_path : `bool`, optional 

836 If `True` the full path of the file artifact within the datastore 

837 is preserved. If `False` the final file component of the path 

838 is used. 

839 overwrite : `bool`, optional 

840 If `True` allow transfers to overwrite existing files at the 

841 destination. 

842 write_index : `bool`, optional 

843 If `True` write a file at the top level containing a serialization 

844 of a `ZipIndex` for the downloaded datasets. 

845 add_prefix : `bool`, optional 

846 Add a prefix based on the DatasetId. Only used if ``preserve_path`` 

847 is `False`. 

848 

849 Returns 

850 ------- 

851 artifact_map : `dict` [ `lsst.resources.ResourcePath`, \ 

852 `ArtifactIndexInfo` ] 

853 Mapping of retrieved file to associated index information. 

854 """ 

855 if not destination.isdir(): 

856 raise ValueError(f"Destination location must refer to a directory. Given {destination}") 

857 

858 # Using getURIs is not feasible since it becomes difficult to 

859 # determine the path within the datastore later on. For now 

860 # follow getURIs implementation approach. 

861 

862 pending = set(refs) 

863 

864 # There is a question as to whether an exception should be raised 

865 # early if some of the refs are missing, or whether files should be 

866 # transferred until a problem is hit. Prefer to complain up front. 

867 # Use the datastore integer as primary key. 

868 grouped_by_datastore: dict[int, set[DatasetRef]] = {} 

869 

870 for number, datastore in enumerate(self.datastores): 

871 if datastore.isEphemeral: 

872 # In the future we will want to distinguish in-memory from 

873 # caching datastore since using an on-disk local 

874 # cache is exactly what we should be doing. 

875 continue 

876 try: 

877 # Checking file existence is expensive. Have the option 

878 # of checking whether the datastore knows of these datasets 

879 # instead, which is fast but can potentially lead to 

880 # retrieveArtifacts failing. 

881 knows = datastore.knows_these(pending) 

882 datastore_refs = {ref for ref, exists in knows.items() if exists} 

883 except NotImplementedError: 

884 # Some datastores may not support retrieving artifacts 

885 continue 

886 

887 if datastore_refs: 

888 grouped_by_datastore[number] = datastore_refs 

889 

890 # Remove these from the pending list so that we do not bother 

891 # looking for them any more. 

892 pending = pending - datastore_refs 

893 

894 if pending: 

895 raise RuntimeError(f"Some datasets were not found in any datastores: {pending}") 

896 

897 # Now do the transfer. 

898 merged_artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {} 

899 for number, datastore_refs in grouped_by_datastore.items(): 

900 artifact_map = self.datastores[number].retrieveArtifacts( 

901 datastore_refs, 

902 destination, 

903 transfer=transfer, 

904 preserve_path=preserve_path, 

905 overwrite=overwrite, 

906 write_index=False, # Disable index writing regardless. 

907 add_prefix=add_prefix, 

908 ) 

909 merged_artifact_map.update(artifact_map) 

910 

911 if write_index: 

912 index = ZipIndex.from_artifact_map(refs, merged_artifact_map, destination) 

913 index.write_index(destination) 

914 

915 return merged_artifact_map 

916 

917 def ingest_zip(self, zip_path: ResourcePath, transfer: str | None, *, dry_run: bool = False) -> None: 

918 """Ingest an indexed Zip file and contents. 

919 

920 The Zip file must have an index file as created by `retrieveArtifacts`. 

921 

922 Parameters 

923 ---------- 

924 zip_path : `lsst.resources.ResourcePath` 

925 Path to the Zip file. 

926 transfer : `str` 

927 Method to use for transferring the Zip file into the datastore. 

928 dry_run : `bool`, optional 

929 If `True` the ingest will be processed without any modifications 

930 made to the target datastore and as if the target datastore did not 

931 have any of the datasets. 

932 

933 Notes 

934 ----- 

935 Datastore constraints are bypassed with Zip ingest. A zip file can 

936 contain multiple dataset types. Should the entire Zip be rejected 

937 if one dataset type is in the constraints list? If configured to 

938 reject everything, ingest should not be attempted. 

939 

940 The Zip file is given to each datastore in turn, ignoring datastores 

941 where it is not supported. Ingest is deemed successful if any of the 

942 datastores accepts the file. 

943 """ 

944 index = ZipIndex.from_zip_file(zip_path) 

945 refs = index.refs.to_refs(self.universe) 

946 

947 # For now raise if any refs are not supported. 

948 # Being selective will require that we return the ingested refs 

949 # to the caller so that registry can be modified to remove the 

950 # entries. 

951 if any(not self.constraints.isAcceptable(ref) for ref in refs): 

952 raise DatasetTypeNotSupportedError( 

953 "Some of the refs in the given Zip file are not acceptable to this datastore." 

954 ) 

955 

956 n_success = 0 

957 final_exception: Exception | None = None 

958 for number, (datastore, constraints) in enumerate( 

959 zip(self.datastores, self.datastoreConstraints, strict=True) 

960 ): 

961 if datastore.isEphemeral: 

962 continue 

963 

964 # There can be constraints for the datastore in the configuration 

965 # of the chaining, or constraints in the configuration of the 

966 # datastore itself. 

967 if any( 

968 (constraints is not None and not constraints.isAcceptable(ref)) 

969 or not datastore.constraints.isAcceptable(ref) 

970 for ref in refs 

971 ): 

972 log.debug("Datastore %s skipping zip ingest due to constraints", datastore.name) 

973 continue 

974 try: 

975 datastore.ingest_zip(zip_path, transfer=transfer, dry_run=dry_run) 

976 except NotImplementedError: 

977 continue 

978 except Exception as e: 

979 final_exception = e 

980 else: 

981 n_success += 1 

982 

983 if n_success: 

984 return 

985 if final_exception: 

986 raise final_exception 

987 raise RuntimeError("Ingest was not successful in any datastores.") 

988 

989 def remove(self, ref: DatasetRef) -> None: 

990 """Indicate to the datastore that a dataset can be removed. 

991 

992 The dataset will be removed from each datastore. The dataset is 

993 not required to exist in every child datastore. 

994 

995 Parameters 

996 ---------- 

997 ref : `DatasetRef` 

998 Reference to the required dataset. 

999 

1000 Raises 

1001 ------ 

1002 FileNotFoundError 

1003 Attempt to remove a dataset that does not exist. Raised if none 

1004 of the child datastores removed the dataset. 

1005 """ 

1006 log.debug("Removing %s", ref) 

1007 self.trash(ref, ignore_errors=False) 

1008 self.emptyTrash(ignore_errors=False, refs=[ref]) 

1009 

1010 def forget(self, refs: Iterable[DatasetRef]) -> None: 

1011 for datastore in tuple(self.datastores): 

1012 datastore.forget(refs) 

1013 

1014 def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None: 

1015 if isinstance(ref, DatasetRef): 

1016 ref_label = str(ref) 

1017 else: 

1018 ref_label = "bulk datasets" 

1019 

1020 log.debug("Trashing %s", ref_label) 

1021 

1022 counter = 0 

1023 for datastore in self.datastores: 

1024 try: 

1025 datastore.trash(ref, ignore_errors=ignore_errors) 

1026 counter += 1 

1027 except FileNotFoundError: 

1028 pass 

1029 

1030 if counter == 0: 

1031 err_msg = f"Could not mark for removal from any child datastore: {ref_label}" 

1032 if ignore_errors: 

1033 log.warning(err_msg) 

1034 else: 

1035 raise FileNotFoundError(err_msg) 

1036 

1037 def emptyTrash( 

1038 self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False 

1039 ) -> set[ResourcePath]: 

1040 removed = set() 

1041 for datastore in self.datastores: 

1042 removed.update(datastore.emptyTrash(ignore_errors=ignore_errors, refs=refs, dry_run=dry_run)) 

1043 return removed 

1044 

1045 def transfer(self, inputDatastore: Datastore, ref: DatasetRef) -> None: 

1046 """Retrieve a dataset from an input `Datastore`, 

1047 and store the result in this `Datastore`. 

1048 

1049 Parameters 

1050 ---------- 

1051 inputDatastore : `Datastore` 

1052 The external `Datastore` from which to retrieve the Dataset. 

1053 ref : `DatasetRef` 

1054 Reference to the required dataset in the input data store. 


1061 """ 

1062 assert inputDatastore is not self # unless we want it for renames? 

1063 inMemoryDataset = inputDatastore.get(ref) 

1064 self.put(inMemoryDataset, ref) 

1065 

1066 def validateConfiguration( 

1067 self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False 

1068 ) -> None: 

1069 """Validate some of the configuration for this datastore. 

1070 

1071 Parameters 

1072 ---------- 

1073 entities : `~collections.abc.Iterable` [`DatasetRef` | `DatasetType` \ 

1074 | `StorageClass`] 

1075 Entities to test against this configuration. Can be differing 

1076 types. 

1077 logFailures : `bool`, optional 

1078 If `True`, output a log message for every validation error 

1079 detected. 

1080 

1081 Returns 

1082 ------- 

1083 None 

1084 

1085 Raises 

1086 ------ 

1087 DatastoreValidationError 

1088 Raised if there is a validation problem with a configuration. 

1089 All the problems are reported in a single exception. 

1090 

1091 Notes 

1092 ----- 

1093 This method checks each datastore in turn. 

1094 """ 

1095 # Need to catch each of the datastore outputs and ensure that 

1096 # all are tested. 

1097 failures = [] 

1098 for datastore in self.datastores: 

1099 try: 

1100 datastore.validateConfiguration(entities, logFailures=logFailures) 

1101 except DatastoreValidationError as e: 

1102 if logFailures: 

1103 log.critical("Datastore %s failed validation", datastore.name) 

1104 failures.append(f"Datastore {self.name}: {e}") 

1105 

1106 if failures: 

1107 msg = ";\n".join(failures) 

1108 raise DatastoreValidationError(msg) 

1109 

1110 def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None: 

1111 # Docstring is inherited from base class 

1112 failures = [] 

1113 for datastore in self.datastores: 

1114 try: 

1115 datastore.validateKey(lookupKey, entity) 

1116 except DatastoreValidationError as e: 

1117 failures.append(f"Datastore {self.name}: {e}") 

1118 

1119 if failures: 

1120 msg = ";\n".join(failures) 

1121 raise DatastoreValidationError(msg) 

1122 

1123 def getLookupKeys(self) -> set[LookupKey]: 

1124 # Docstring is inherited from base class 

1125 keys = set() 

1126 for datastore in self.datastores: 

1127 keys.update(datastore.getLookupKeys()) 

1128 

1129 keys.update(self.constraints.getLookupKeys()) 

1130 for p in self.datastoreConstraints: 

1131 if p is not None: 

1132 keys.update(p.getLookupKeys()) 

1133 

1134 return keys 

1135 

1136 def needs_expanded_data_ids( 

1137 self, 

1138 transfer: str | None, 

1139 entity: DatasetRef | DatasetType | StorageClass | None = None, 

1140 ) -> bool: 

1141 # Docstring inherited. 

1142 # We can't safely use `self.datastoreConstraints` with `entity` to 

1143 # check whether a child datastore would even want to ingest this 

1144 # dataset, because we don't want to filter out datastores that might 

1145 # need an expanded data ID based on incomplete information (e.g. we 

1146 # pass a StorageClass, but the constraint dispatches on DatasetType). 

1147 # So we pessimistically check if any datastore would need an expanded 

1148 # data ID for this transfer mode. 

1149 return any(datastore.needs_expanded_data_ids(transfer, entity) for datastore in self.datastores) 

1150 

1151 def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None: 

1152 # Docstring inherited from the base class. 

1153 

1154 for datastore in self.datastores: 

1155 datastore.import_records(data) 

1156 

1157 def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]: 

1158 # Docstring inherited from the base class. 

1159 

1160 all_records: dict[str, DatastoreRecordData] = {} 

1161 

1162 # Merge all sub-datastore records into one structure 

1163 for datastore in self.datastores: 

1164 sub_records = datastore.export_records(refs) 

1165 for name, record_data in sub_records.items(): 

1166 # All datastore names must be unique in a chain. 

1167 if name in all_records: 

1168 raise ValueError(f"Non-unique datastore name found in datastore {datastore}") 

1169 all_records[name] = record_data 

1170 

1171 return all_records 

1172 

1173 def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]: 

1174 # Docstring inherited from the base class. 

1175 

1176 all_records: dict[str, DatastoreRecordData] = {} 

1177 

1178 # Filter out datasets that this datastore is not allowed to contain. 

1179 refs = [ref for ref in refs if self.constraints.isAcceptable(ref)] 

1180 

1181 # Merge all sub-datastore records into one structure 

1182 for datastore in self.datastores: 

1183 sub_records = datastore.export_predicted_records(refs) 

1184 for name, record_data in sub_records.items(): 

1185 # All datastore names must be unique in a chain. 

1186 if name in all_records: 

1187 raise ValueError(f"Non-unique datastore name found in datastore {datastore}") 

1188 all_records[name] = record_data 

1189 

1190 return all_records 

1191 

1192 def export( 

1193 self, 

1194 refs: Iterable[DatasetRef], 

1195 *, 

1196 directory: ResourcePathExpression | None = None, 

1197 transfer: str | None = "auto", 

1198 ) -> Iterable[FileDataset]: 

1199 # Docstring inherited from Datastore.export. 

1200 if transfer == "auto" and directory is None: 

1201 transfer = None 

1202 

1203 if transfer is not None and directory is None: 

1204 raise TypeError(f"Cannot export using transfer mode {transfer} with no export directory given") 

1205 

1206 if transfer == "move": 

1207 raise TypeError("Can not export by moving files out of datastore.") 

1208 

1209 # Exporting from a chain has the potential for a dataset to be 

1210 # in one or more of the datastores in the chain. We only need one 

1211 # of them since we assume the datasets are the same in all (but 

1212 # the file format could be different of course since that is a 

1213 # per-datastore configuration). 

1214 # We also do not know whether any of the datastores in the chain 

1215 # support file export. 

1216 

1217 # Ensure we have an ordered sequence that is not an iterator or set. 

1218 if not isinstance(refs, Sequence): 

1219 refs = list(refs) 

1220 

1221 # If any of the datasets are missing entirely we need to raise early 

1222 # before we try to run the export. This can be a little messy but is 

1223 # better than exporting files from the first datastore and then finding 

1224 # that one is missing and is not in the second datastore either. 

1225 known = [datastore.knows_these(refs) for datastore in self.datastores] 

1226 refs_known: set[DatasetRef] = set() 

1227 for known_to_this in known: 

1228 refs_known.update({ref for ref, knows_this in known_to_this.items() if knows_this}) 

1229 missing_count = len(refs) - len(refs_known) 

1230 if missing_count: 

1231 raise FileNotFoundError(f"Not all datasets known to this datastore. Missing {missing_count}") 

1232 

1233 # To allow us to slot each result into the right place after 

1234 # asking each datastore, create a dict with the index. 

1235 ref_positions = {ref: i for i, ref in enumerate(refs)} 

1236 

1237 # Presize the final export list. 

1238 exported: list[FileDataset | None] = [None] * len(refs) 

1239 

1240 # The order of the returned dataset has to match the order of the 

1241 # given refs, even if they are all from different datastores. 

1242 for i, datastore in enumerate(self.datastores): 

1243 known_to_this = known[i] 

1244 filtered = [ref for ref, knows in known_to_this.items() if knows and ref in ref_positions] 

1245 

1246 try: 

1247 this_export = datastore.export(filtered, directory=directory, transfer=transfer) 

1248 except NotImplementedError: 

1249 # Try the next datastore. 

1250 continue 

1251 

1252 for ref, export in zip(filtered, this_export, strict=True): 

1253 # Get the position and also delete it from the list. 

1254 exported[ref_positions.pop(ref)] = export 

1255 

1256 # Every dataset should be accounted for because of the earlier checks 

1257 # but make sure that we did fill all the slots to appease mypy. 

1258 for i, dataset in enumerate(exported): 

1259 if dataset is None: 

1260 raise FileNotFoundError(f"Failed to export dataset {refs[i]}.") 

1261 yield dataset 

1262 

1263 def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap: 

1264 unassigned_ids = set(dataset_ids) 

1265 output: FileTransferMap = {} 

1266 found_acceptable_datastore = False 

1267 for datastore in self.datastores: 

1268 try: 

1269 found = datastore.get_file_info_for_transfer(unassigned_ids) 

1270 found_acceptable_datastore = True 

1271 output.update(found) 

1272 unassigned_ids -= found.keys() 

1273 except NotImplementedError: 

1274 pass 

1275 

1276 if not found_acceptable_datastore: 

1277 types = {get_full_type_name(d) for d in self.datastores} 

1278 raise TypeError( 

1279 "ChainedDatastore had no datastores able to provide file transfer information." 

1280 f" Had {','.join(types)}" 

1281 ) 

1282 

1283 return output 

1284 

1285 def locate_missing_files_for_transfer( 

1286 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] 

1287 ) -> FileTransferMap: 

1288 missing_refs = {ref.id: ref for ref in refs} 

1289 output: FileTransferMap = {} 

1290 for datastore in self.datastores: 

1291 # Have to check each datastore in turn. If we do not do 

1292 # this, warnings will be issued further down for datasets 

1293 # that are in one and not the other. The existence cache 

1294 # will prevent repeat checks. 

1295 

1296 found = datastore.locate_missing_files_for_transfer(missing_refs.values(), artifact_existence) 

1297 output.update(found) 

1298 for id in found.keys(): 

1299 missing_refs.pop(id) 

1300 log.debug("Adding %d missing refs to list for transfer from %s", len(found), datastore.name) 

1301 

1302 return output 

1303 

1304 def transfer_from( 

1305 self, 

1306 source_records: FileTransferMap, 

1307 refs: Collection[DatasetRef], 

1308 transfer: str = "auto", 

1309 artifact_existence: dict[ResourcePath, bool] | None = None, 

1310 dry_run: bool = False, 

1311 ) -> tuple[set[DatasetRef], set[DatasetRef]]: 

1312 # Docstring inherited 

1313 if not refs: 

1314 return set(), set() 

1315 

1316 # Assume that each child datastore knows how to look inside a chained 

1317 # datastore for compatible datastores (and so there is no need to 

1318 # unpack the source datastores here). 

1319 # Need to decide if a ref accepted by one datastore should be sent to 

1320 # later datastores (as is done in put()). More efficient to filter out 

1321 # accepted datasets. 

1322 if artifact_existence is None: 

1323 artifact_existence = {} 

1324 available_refs = set(refs) 

1325 accepted: set[DatasetRef] = set() 

1326 rejected: set[DatasetRef] = set() 

1327 nsuccess = 0 

1328 

1329 log.debug("Initiating transfer to chained datastore %s", self.name) 

1330 for datastore in self.datastores: 

1331 # Rejections from this datastore might be acceptances in the next. 

1332 # We add them all up but then recalculate at the end. 

1333 if not available_refs: 

1334 break 

1335 log.verbose("Transferring %d datasets to %s from chain", len(available_refs), datastore.name) 

1336 

1337 try: 

1338 current_accepted, current_rejected = datastore.transfer_from( 

1339 source_records, 

1340 available_refs, 

1341 transfer=transfer, 

1342 artifact_existence=artifact_existence, 

1343 dry_run=dry_run, 

1344 ) 

1345 except (TypeError, NotImplementedError): 

1346 # The datastores were incompatible. 

1347 continue 

1348 else: 

1349 nsuccess += 1 

1350 

1351 # Do not send accepted refs to later datastores. 

1352 available_refs -= current_accepted 

1353 

1354 accepted.update(current_accepted) 

1355 rejected.update(current_rejected) 

1356 

1357 if nsuccess == 0: 

1358 raise TypeError("None of the child datastores could accept file transfers") 

1359 

1360 # It's not rejected if some other datastore accepted it. 

1361 rejected -= accepted 

1362 log.verbose( 

1363 "Finished transfer_from to %s with %d accepted, %d rejected from %d requested.", 

1364 self.name, 

1365 len(accepted), 

1366 len(rejected), 

1367 len(refs), 

1368 ) 

1369 

1370 return accepted, rejected 

1371 

1372 def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: 

1373 # Docstring inherited from the base class. 

1374 tables: dict[str, DatastoreOpaqueTable] = {} 

1375 for datastore in self.datastores: 

1376 tables.update(datastore.get_opaque_table_definitions()) 

1377 return tables 

1378 

1379 def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None: 

1380 # Docstring inherited from the base class. 

1381 for datastore in self.datastores: 

1382 datastore.set_retrieve_dataset_type_method(method)