Coverage for python / lsst / daf / butler / datastores / fileDatastore.py: 8%

1063 statements  

coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Generic file-based datastore code.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ("FileDatastore",) 

33 

34import contextlib 

35import hashlib 

36import logging 

37import math 

38from collections import defaultdict 

39from collections.abc import Callable, Collection, Iterable, Mapping, Sequence 

40from typing import TYPE_CHECKING, Any, ClassVar, cast 

41 

42from sqlalchemy import BigInteger, String 

43 

44from lsst.daf.butler import ( 

45 Config, 

46 DatasetDatastoreRecords, 

47 DatasetId, 

48 DatasetRef, 

49 DatasetType, 

50 DatasetTypeNotSupportedError, 

51 FileDataset, 

52 FileDescriptor, 

53 Formatter, 

54 FormatterFactory, 

55 FormatterV1inV2, 

56 FormatterV2, 

57 Location, 

58 LocationFactory, 

59 Progress, 

60 StorageClass, 

61 ddl, 

62) 

63from lsst.daf.butler.datastore import ( 

64 DatasetRefURIs, 

65 Datastore, 

66 DatastoreConfig, 

67 DatastoreOpaqueTable, 

68 DatastoreValidationError, 

69) 

70from lsst.daf.butler.datastore.cache_manager import ( 

71 AbstractDatastoreCacheManager, 

72 DatastoreCacheManager, 

73 DatastoreDisabledCacheManager, 

74) 

75from lsst.daf.butler.datastore.composites import CompositesMap 

76from lsst.daf.butler.datastore.file_templates import FileTemplates, FileTemplateValidationError 

77from lsst.daf.butler.datastore.generic_base import GenericBaseDatastore 

78from lsst.daf.butler.datastore.record_data import DatastoreRecordData 

79from lsst.daf.butler.datastore.stored_file_info import StoredDatastoreItemInfo, StoredFileInfo 

80from lsst.daf.butler.datastores.file_datastore.get import ( 

81 DatasetLocationInformation, 

82 DatastoreFileGetInformation, 

83 generate_datastore_get_information, 

84 get_dataset_as_python_object_from_get_info, 

85) 

86from lsst.daf.butler.datastores.file_datastore.retrieve_artifacts import ( 

87 ArtifactIndexInfo, 

88 ZipIndex, 

89 determine_destination_for_retrieved_artifact, 

90 unpack_zips, 

91) 

92from lsst.daf.butler.registry.interfaces import ( 

93 DatabaseInsertMode, 

94 DatastoreRegistryBridge, 

95 FakeDatasetRef, 

96 ReadOnlyDatabaseError, 

97) 

98from lsst.daf.butler.repo_relocation import replaceRoot 

99from lsst.daf.butler.utils import transactional 

100from lsst.resources import ResourcePath, ResourcePathExpression 

101from lsst.utils.introspection import get_class_of, get_full_type_name 

102from lsst.utils.iteration import chunk_iterable 

103 

104# For VERBOSE logging usage. 

105from lsst.utils.logging import VERBOSE, getLogger 

106from lsst.utils.timer import time_this 

107 

108from ..datastore import FileTransferMap, FileTransferRecord 

109 

110if TYPE_CHECKING: 

111 from lsst.daf.butler import DatasetProvenance, LookupKey 

112 from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager 

113 

114log = getLogger(__name__) 

115 

116 

117class _IngestPrepData(Datastore.IngestPrepData): 

118 """Helper class for FileDatastore ingest implementation. 

119 

120 Parameters 

121 ---------- 

122 datasets : `~collections.abc.Iterable` of `FileDataset` 

123 Files to be ingested by this datastore. 

124 """ 

125 

126 def __init__(self, datasets: Iterable[FileDataset]): 

127 super().__init__(ref for dataset in datasets for ref in dataset.refs) 

128 self.datasets = datasets 

129 

130 

131class FileDatastore(GenericBaseDatastore[StoredFileInfo]): 

132 """Generic Datastore for file-based implementations. 

133 

134 Should always be sub-classed since key abstract methods are missing. 

135 

136 Parameters 

137 ---------- 

138 config : `DatastoreConfig` or `str` 

139 Configuration as either a `Config` object or URI to file. 

140 bridgeManager : `DatastoreRegistryBridgeManager` 

141 Object that manages the interface between `Registry` and datastores. 

142 root : `lsst.resources.ResourcePath` 

143 Root directory URI of this `Datastore`. 

144 formatterFactory : `FormatterFactory` 

145 Factory for creating instances of formatters. 

146 templates : `FileTemplates` 

147 File templates that can be used by this `Datastore`. 

148 composites : `CompositesMap` 

149 Determines whether a dataset should be disassembled on put. 

150 trustGetRequest : `bool` 

151 Determine whether we can fall back to configuration if a requested 

152 dataset is not known to registry. 

153 

154 Raises 

155 ------ 

156 ValueError 

157 If root location does not exist and ``create`` is `False` in the 

158 configuration. 

159 """ 

160 

161 defaultConfigFile: ClassVar[str | None] = None 

162 """Path to configuration defaults. Accessed within the ``config`` resource 

163 or relative to a search path. Can be None if no defaults specified. 

164 """ 

165 

166 root: ResourcePath 

167 """Root directory URI of this `Datastore`.""" 

168 

169 locationFactory: LocationFactory 

170 """Factory for creating locations relative to the datastore root.""" 

171 

172 formatterFactory: FormatterFactory 

173 """Factory for creating instances of formatters.""" 

174 

175 templates: FileTemplates 

176 """File templates that can be used by this `Datastore`.""" 

177 

178 composites: CompositesMap 

179 """Determines whether a dataset should be disassembled on put.""" 

180 

181 defaultConfigFile = "datastores/fileDatastore.yaml" 

182 """Path to configuration defaults. Accessed within the ``config`` resource 

183 or relative to a search path. Can be None if no defaults specified. 

184 """ 

185 

186 _retrieve_dataset_method: Callable[[str], DatasetType | None] | None = None 

187 """Callable that is used in trusted mode to retrieve registry definition 

188 of a named dataset type. 

189 """ 

190 

191 @classmethod 

192 def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None: 

193 """Set any filesystem-dependent config options for this Datastore to 

194 be appropriate for a new empty repository with the given root. 

195 

196 Parameters 

197 ---------- 

198 root : `str` 

199 URI to the root of the data repository. 

200 config : `Config` 

201 A `Config` to update. Only the subset understood by 

202 this component will be updated. Will not expand 

203 defaults. 

204 full : `Config` 

205 A complete config with all defaults expanded that can be 

206 converted to a `DatastoreConfig`. Read-only and will not be 

207 modified by this method. 

208 Repository-specific options that should not be obtained 

209 from defaults when Butler instances are constructed 

210 should be copied from ``full`` to ``config``. 

211 overwrite : `bool`, optional 

212 If `False`, do not modify a value in ``config`` if the value 

213 already exists. Default is always to overwrite with the provided 

214 ``root``. 

215 

216 Notes 

217 ----- 

218 If a keyword is explicitly defined in the supplied ``config`` it 

219 will not be overridden by this method if ``overwrite`` is `False`. 

220 This allows explicit values set in external configs to be retained. 

221 """ 

222 Config.updateParameters( 

223 DatastoreConfig, 

224 config, 

225 full, 

226 toUpdate={"root": root}, 

227 toCopy=("cls", ("records", "table")), 

228 overwrite=overwrite, 

229 ) 

230 

231 @classmethod 

232 def makeTableSpec(cls) -> ddl.TableSpec: 

233 return ddl.TableSpec( 

234 fields=[ 

235 ddl.FieldSpec(name="dataset_id", dtype=ddl.GUID, primaryKey=True), 

236 ddl.FieldSpec(name="path", dtype=String, length=256, nullable=False), 

237 ddl.FieldSpec(name="formatter", dtype=String, length=128, nullable=False), 

238 ddl.FieldSpec(name="storage_class", dtype=String, length=64, nullable=False), 

239 # Use empty string to indicate no component 

240 ddl.FieldSpec(name="component", dtype=String, length=32, primaryKey=True), 

241 # TODO: should checksum be Base64Bytes instead? 

242 ddl.FieldSpec(name="checksum", dtype=String, length=128, nullable=True), 

243 ddl.FieldSpec(name="file_size", dtype=BigInteger, nullable=True), 

244 ], 

245 unique=frozenset(), 

246 indexes=[ddl.IndexSpec("path")], 

247 ) 

248 

249 def __init__( 

250 self, 

251 config: DatastoreConfig, 

252 bridgeManager: DatastoreRegistryBridgeManager, 

253 root: ResourcePath, 

254 formatterFactory: FormatterFactory, 

255 templates: FileTemplates, 

256 composites: CompositesMap, 

257 trustGetRequest: bool, 

258 ): 

259 super().__init__(config, bridgeManager) 

260 self.root = ResourcePath(root) 

261 self.formatterFactory = formatterFactory 

262 self.templates = templates 

263 self.composites = composites 

264 self.trustGetRequest = trustGetRequest 

265 

266 # Name ourselves either using an explicit name or a name 

267 # derived from the (unexpanded) root 

268 if "name" in self.config: 

269 self.name = self.config["name"] 

270 else: 

271 # We use the unexpanded root in the name to indicate that this 

272 # datastore can be moved without having to update registry. 

273 self.name = "{}@{}".format(type(self).__name__, self.config["root"]) 

274 

275 self.locationFactory = LocationFactory(self.root) 

276 

277 self._opaque_table_name = self.config["records", "table"] 

278 try: 

279 # Storage of paths and formatters, keyed by dataset_id 

280 self._table = bridgeManager.opaque.register(self._opaque_table_name, self.makeTableSpec()) 

281 # Interface to Registry. 

282 self._bridge = bridgeManager.register(self.name) 

283 except ReadOnlyDatabaseError: 

284 # If the database is read only and we just tried and failed to 

285 # create a table, it means someone is trying to create a read-only 

286 # butler client for an empty repo. That should be okay, as long 

287 # as they then try to get any datasets before some other client 

288 # creates the table. Chances are they're just validating 

289 # configuration. 

290 pass 

291 

292 # Determine whether checksums should be used - default to False 

293 self.useChecksum = self.config.get("checksum", False) 

294 

295 # Create a cache manager 

296 self.cacheManager: AbstractDatastoreCacheManager 

297 if "cached" in self.config: 

298 self.cacheManager = DatastoreCacheManager(self.config["cached"], universe=bridgeManager.universe) 

299 else: 

300 self.cacheManager = DatastoreDisabledCacheManager("", universe=bridgeManager.universe) 

301 

302 self.universe = bridgeManager.universe 

303 

304 @classmethod 

305 def _create_from_config( 

306 cls, 

307 config: DatastoreConfig, 

308 bridgeManager: DatastoreRegistryBridgeManager, 

309 butlerRoot: ResourcePathExpression | None, 

310 ) -> FileDatastore: 

311 if "root" not in config: 

312 raise ValueError("No root directory specified in configuration") 

313 

314 # Support repository relocation in config 

315 # Existence of self.root is checked in subclass 

316 root = ResourcePath(replaceRoot(config["root"], butlerRoot), forceDirectory=True, forceAbsolute=True) 

317 

318 # Now associate formatters with storage classes 

319 formatterFactory = FormatterFactory() 

320 formatterFactory.registerFormatters(config["formatters"], universe=bridgeManager.universe) 

321 

322 # Read the file naming templates 

323 templates = FileTemplates(config["templates"], universe=bridgeManager.universe) 

324 

325 # See if composites should be disassembled 

326 composites = CompositesMap(config["composites"], universe=bridgeManager.universe) 

327 

328 # Determine whether we can fall back to configuration if a 

329 # requested dataset is not known to registry 

330 trustGetRequest = config.get("trust_get_request", False) 

331 

332 self = FileDatastore( 

333 config, bridgeManager, root, formatterFactory, templates, composites, trustGetRequest 

334 ) 

335 

336 # Check existence and create directory structure if necessary. 

337 # 

338 # The concept of a 'root directory' is problematic for some resource 

339 # path types that don't necessarily support the concept of a directory 

340 # (http, s3, gs... basically anything that isn't a local filesystem or 

341 # WebDAV.) 

342 # On these resource paths an object representing the 

343 # "root" directory may not exist even though files under the root do, 

344 # and in a read-only repository we will be unable to create it. 

345 # So we only immediately verify the root for local filesystems, 

346 # the only case where this check will definitely not give a false 

347 # negative. 

348 if self.root.isLocal and not self.root.exists(): 

349 if "create" not in self.config or not self.config["create"]: 

350 raise ValueError(f"No valid root and not allowed to create one at: {self.root}") 

351 try: 

352 self.root.mkdir() 

353 except Exception as e: 

354 raise ValueError( 

355 f"Can not create datastore root '{self.root}', check permissions. Got error: {e}" 

356 ) from e 

357 

358 return self 

359 
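 # Minimal sketch of the configuration keys consumed above (YAML-style;
 # the values shown are illustrative only):
 #   root: <butlerRoot>/datastore
 #   create: true
 #   records:
 #     table: file_datastore_records
 #   trust_get_request: false
 #   formatters: ...
 #   templates: ...
 #   composites: ...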

360 def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore: 

361 return FileDatastore( 

362 self.config, 

363 bridgeManager, 

364 self.root, 

365 self.formatterFactory, 

366 self.templates, 

367 self.composites, 

368 self.trustGetRequest, 

369 ) 

370 

371 def __str__(self) -> str: 

372 return str(self.root) 

373 

374 @property 

375 def bridge(self) -> DatastoreRegistryBridge: 

376 return self._bridge 

377 

378 @property 

379 def roots(self) -> dict[str, ResourcePath | None]: 

380 # Docstring inherited. 

381 return {self.name: self.root} 

382 

383 def _set_trust_mode(self, mode: bool) -> None: 

384 self.trustGetRequest = mode 

385 

386 def _artifact_exists(self, location: Location) -> bool: 

387 """Check that an artifact exists in this datastore at the specified 

388 location. 

389 

390 Parameters 

391 ---------- 

392 location : `Location` 

393 Expected location of the artifact associated with this datastore. 

394 

395 Returns 

396 ------- 

397 exists : `bool` 

398 `True` if the location can be found, `False` otherwise.

399 """ 

400 log.debug("Checking if resource exists: %s", location.uri) 

401 return location.uri.exists() 

402 

403 def addStoredItemInfo( 

404 self, 

405 refs: Iterable[DatasetRef], 

406 infos: Iterable[StoredFileInfo], 

407 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, 

408 ) -> None: 

409 """Record internal storage information associated with one or more 

410 datasets. 

411 

412 Parameters 

413 ---------- 

414 refs : sequence of `DatasetRef` 

415 The datasets that have been stored. 

416 infos : sequence of `StoredDatastoreItemInfo` 

417 Metadata associated with the stored datasets. 

418 insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode` 

419 Mode to use to insert the new records into the table. The 

420 options are ``INSERT`` (error if pre-existing), ``REPLACE`` 

421 (replace content with new values), and ``ENSURE`` (skip if the row 

422 already exists). 

423 """ 

424 records = [ 

425 info.rebase(ref).to_record(dataset_id=ref.id) for ref, info in zip(refs, infos, strict=True) 

426 ] 

427 match insert_mode: 

428 case DatabaseInsertMode.INSERT: 

429 self._table.insert(*records, transaction=self._transaction) 

430 case DatabaseInsertMode.ENSURE: 

431 self._table.ensure(*records, transaction=self._transaction) 

432 case DatabaseInsertMode.REPLACE: 

433 self._table.replace(*records, transaction=self._transaction) 

434 case _: 

435 raise ValueError(f"Unknown insert mode of '{insert_mode}'") 

436 

437 def getStoredItemsInfo( 

438 self, ref: DatasetIdRef, ignore_datastore_records: bool = False 

439 ) -> list[StoredFileInfo]: 

440 """Retrieve information associated with files stored in this 

441 `Datastore` for this dataset ref.

442 

443 Parameters 

444 ---------- 

445 ref : `DatasetRef` 

446 The dataset that is to be queried. 

447 ignore_datastore_records : `bool` 

448 If `True` then do not use datastore records stored in refs. 

449 

450 Returns 

451 ------- 

452 items : `~collections.abc.Iterable` [`StoredDatastoreItemInfo`] 

453 Stored information about the files and associated formatters 

454 associated with this dataset. Only one file will be returned 

455 if the dataset has not been disassembled. Can return an empty 

456 list if no matching datasets can be found. 

457 """ 

458 # Try to get them from the ref first. 

459 if ref._datastore_records is not None and not ignore_datastore_records: 

460 ref_records = ref._datastore_records.get(self._table.name, []) 

461 # Need to make sure they have correct type. 

462 for record in ref_records: 

463 if not isinstance(record, StoredFileInfo): 

464 raise TypeError(f"Datastore record has unexpected type {record.__class__.__name__}") 

465 return cast(list[StoredFileInfo], ref_records) 

466 

467 # Look for the dataset_id -- there might be multiple matches 

468 # if we have disassembled the dataset. 

469 records = self._table.fetch(dataset_id=ref.id) 

470 return [StoredFileInfo.from_record(record) for record in records] 

471 

472 def _register_datasets( 

473 self, 

474 refsAndInfos: Iterable[tuple[DatasetRef, StoredFileInfo]], 

475 insert_mode: DatabaseInsertMode = DatabaseInsertMode.INSERT, 

476 ) -> None: 

477 """Update registry to indicate that one or more datasets have been 

478 stored. 

479 

480 Parameters 

481 ---------- 

482 refsAndInfos : sequence `tuple` [`DatasetRef`, 

483 `StoredDatastoreItemInfo`] 

484 Datasets to register and the internal datastore metadata associated 

485 with them. 

486 insert_mode : `~lsst.daf.butler.registry.interfaces.DatabaseInsertMode`, optional

487 Indicate whether the new records should be new ("insert", default),

488 allowed to exist already ("ensure"), or replaced if already present

489 ("replace").

490 """ 

491 expandedRefs: list[DatasetRef] = [] 

492 expandedItemInfos: list[StoredFileInfo] = [] 

493 

494 for ref, itemInfo in refsAndInfos: 

495 expandedRefs.append(ref) 

496 expandedItemInfos.append(itemInfo) 

497 

498 # Dataset location only cares about registry ID so if we have 

499 # disassembled in datastore we have to deduplicate. Since they 

500 # will have different datasetTypes we can't use a set 

501 registryRefs = {r.id: r for r in expandedRefs} 

502 if insert_mode == DatabaseInsertMode.INSERT: 

503 self.bridge.insert(registryRefs.values()) 

504 else: 

505 # There are only two columns and all that matters is the 

506 # dataset ID. 

507 self.bridge.ensure(registryRefs.values()) 

508 self.addStoredItemInfo(expandedRefs, expandedItemInfos, insert_mode=insert_mode) 

509 

510 def _get_stored_records_associated_with_refs( 

511 self, refs: Iterable[DatasetIdRef], ignore_datastore_records: bool = False 

512 ) -> dict[DatasetId, list[StoredFileInfo]]: 

513 """Retrieve all records associated with the provided refs. 

514 

515 Parameters 

516 ---------- 

517 refs : `~collections.abc.Iterable` of `DatasetIdRef` 

518 The refs for which records are to be retrieved. 

519 ignore_datastore_records : `bool` 

520 If `True` then do not use datastore records stored in refs. 

521 

522 Returns 

523 ------- 

524 records : `dict` of [`DatasetId`, `list` of `StoredFileInfo`] 

525 The matching records indexed by the ref ID. The number of entries 

526 in the dict can be smaller than the number of requested refs. 

527 """ 

528 # Check datastore records in refs first. 

529 records_by_ref: defaultdict[DatasetId, list[StoredFileInfo]] = defaultdict(list) 

530 refs_with_no_records = [] 

531 for ref in refs: 

532 if ignore_datastore_records or ref._datastore_records is None: 

533 refs_with_no_records.append(ref) 

534 else: 

535 if (ref_records := ref._datastore_records.get(self._table.name)) is not None: 

536 # Need to make sure they have correct type. 

537 for ref_record in ref_records: 

538 if not isinstance(ref_record, StoredFileInfo): 

539 raise TypeError( 

540 f"Datastore record has unexpected type {ref_record.__class__.__name__}" 

541 ) 

542 records_by_ref[ref.id].append(ref_record) 

543 

544 # If there were any refs without datastore records, check opaque table. 

545 records = self._table.fetch(dataset_id=[ref.id for ref in refs_with_no_records]) 

546 

547 # Uniqueness is dataset_id + component so can have multiple records 

548 # per ref. 

549 for record in records: 

550 records_by_ref[record["dataset_id"]].append(StoredFileInfo.from_record(record)) 

551 return records_by_ref 

552 

553 def _refs_associated_with_artifacts( 

554 self, paths: Iterable[str | ResourcePath] 

555 ) -> dict[str, set[DatasetId]]: 

556 """Return paths and associated dataset refs. 

557 

558 Parameters 

559 ---------- 

560 paths : `list` of `str` or `lsst.resources.ResourcePath` 

561 All the paths to include in search. These are exact matches 

562 to the entries in the records table and can include fragments. 

563 

564 Returns 

565 ------- 

566 mapping : `dict` of [`str`, `set` [`DatasetId`]] 

567 Mapping of each path to a set of associated database IDs. 

568 These are artifacts and so any fragments are stripped from the 

569 keys. 

570 """ 

571 # Group paths by those that have fragments and those that do not. 

572 with_fragment = set() 

573 without_fragment = set() 

574 for rpath in paths: 

575 spath = str(rpath) # Typing says can be ResourcePath so must force to string. 

576 if "#" in spath: 

577 spath, fragment = spath.rsplit("#", 1) 

578 with_fragment.add(spath) 

579 else: 

580 without_fragment.add(spath) 

581 

582 result: dict[str, set[DatasetId]] = defaultdict(set) 

583 if without_fragment: 

584 records = self._table.fetch(path=without_fragment) 

585 for row in records: 

586 path = row["path"] 

587 result[path].add(row["dataset_id"]) 

588 if with_fragment: 

589 # Do a query per prefix. 

590 for path in with_fragment: 

591 records = self._table.fetch(path=f"{path}#%") 

592 for row in records: 

593 # Need to strip fragments before adding to dict. 

594 row_path = row["path"] 

595 artifact_path = row_path[: row_path.rfind("#")] 

596 result[artifact_path].add(row["dataset_id"]) 

597 return result 

598 
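 # Illustrative example of the fragment handling above: records with
 # paths "a/b.zip#c1" and "a/b.zip#c2" both contribute their dataset IDs
 # to the single key "a/b.zip" in the returned mapping.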

599 def _registered_refs_per_artifact(self, pathInStore: ResourcePath) -> set[DatasetId]: 

600 """Return all dataset refs associated with the supplied path. 

601 

602 Parameters 

603 ---------- 

604 pathInStore : `lsst.resources.ResourcePath` 

605 Path of interest in the data store. 

606 

607 Returns 

608 ------- 

609 ids : `set` of `DatasetId`

610 All `DatasetRef` IDs associated with this path. 

611 """ 

612 records = list(self._table.fetch(path=str(pathInStore))) 

613 ids = {r["dataset_id"] for r in records} 

614 return ids 

615 

616 def removeStoredItemInfo(self, ref: DatasetIdRef) -> None: 

617 """Remove information about the file associated with this dataset. 

618 

619 Parameters 

620 ---------- 

621 ref : `DatasetRef` 

622 The dataset that has been removed. 

623 """ 

624 # Note that this method is actually not used by this implementation, 

625 # we depend on bridge to delete opaque records. But there are some 

626 # tests that check that this method works, so we keep it for now. 

627 self._table.delete(["dataset_id"], {"dataset_id": ref.id}) 

628 

629 def _get_dataset_locations_info( 

630 self, ref: DatasetIdRef, ignore_datastore_records: bool = False 

631 ) -> list[DatasetLocationInformation]: 

632 r"""Find all the `Location`\ s of the requested dataset in the 

633 `Datastore` and the associated stored file information. 

634 

635 Parameters 

636 ---------- 

637 ref : `DatasetRef` 

638 Reference to the required `Dataset`. 

639 ignore_datastore_records : `bool` 

640 If `True` then do not use datastore records stored in refs. 

641 

642 Returns 

643 ------- 

644 results : `list` [`tuple` [`Location`, `StoredFileInfo` ]] 

645 Location of the dataset within the datastore and 

646 stored information about each file and its formatter. 

647 """ 

648 # Get the file information (this will fail if no file) 

649 records = self.getStoredItemsInfo(ref, ignore_datastore_records) 

650 

651 # Use the path to determine the location -- we need to take 

652 # into account absolute URIs in the datastore record 

653 return [(r.file_location(self.locationFactory), r) for r in records] 

654 

655 def _can_remove_dataset_artifact(self, ref: DatasetIdRef, location: Location) -> bool: 

656 """Check that there is only one dataset associated with the 

657 specified artifact. 

658 

659 Parameters 

660 ---------- 

661 ref : `DatasetRef` or `FakeDatasetRef` 

662 Dataset to be removed. 

663 location : `Location` 

664 The location of the artifact to be removed. 

665 

666 Returns 

667 ------- 

668 can_remove : `bool`

669 `True` if the artifact can be safely removed.

670 """ 

671 # Can't ever delete absolute URIs. 

672 if location.pathInStore.isabs(): 

673 return False 

674 

675 # Get all entries associated with this path 

676 allRefs = self._registered_refs_per_artifact(location.pathInStore) 

677 if not allRefs: 

678 raise RuntimeError(f"Datastore inconsistency error. {location.pathInStore} not in registry") 

679 

680 # Remove this ref from all the refs and if there is nothing left

681 # then we can delete 

682 remainingRefs = allRefs - {ref.id} 

683 

684 if remainingRefs: 

685 return False 

686 return True 

687 

688 def _get_expected_dataset_locations_info(self, ref: DatasetRef) -> list[tuple[Location, StoredFileInfo]]: 

689 """Predict the location and related file information of the requested 

690 dataset in this datastore. 

691 

692 Parameters 

693 ---------- 

694 ref : `DatasetRef` 

695 Reference to the required `Dataset`. 

696 

697 Returns 

698 ------- 

699 results : `list` [`tuple` [`Location`, `StoredFileInfo` ]] 

700 Expected Location of the dataset within the datastore and 

701 placeholder information about each file and its formatter. 

702 

703 Notes 

704 ----- 

705 Uses the current configuration to determine how we would expect the 

706 datastore files to have been written if we couldn't ask registry. 

707 This is safe so long as there has been no change to datastore 

708 configuration between writing the dataset and wanting to read it. 

709 Will not work for files that have been ingested without using the 

710 standard file template or default formatter. 

711 """ 

712 # If we have a component ref we always need to ask the questions 

713 # of the composite. If the composite is disassembled this routine 

714 # should return all components. If the composite was not 

715 # disassembled the composite is what is stored regardless of 

716 # component request. Note that if the caller has disassembled 

717 # a composite there is no way for this guess to know that 

718 # without trying both the composite and component ref and seeing 

719 # if there is something at the component Location even without 

720 # disassembly being enabled. 

721 if ref.datasetType.isComponent(): 

722 ref = ref.makeCompositeRef() 

723 

724 # See if the ref is a composite that should be disassembled 

725 doDisassembly = self.composites.shouldBeDisassembled(ref) 

726 

727 all_info: list[tuple[Location, Formatter | FormatterV2, StorageClass, str | None]] = [] 

728 

729 if doDisassembly: 

730 for component, componentStorage in ref.datasetType.storageClass.components.items(): 

731 compRef = ref.makeComponentRef(component) 

732 location, formatter = self._determine_put_formatter_location(compRef) 

733 all_info.append((location, formatter, componentStorage, component)) 

734 

735 else: 

736 # Always use the composite ref if no disassembly 

737 location, formatter = self._determine_put_formatter_location(ref) 

738 all_info.append((location, formatter, ref.datasetType.storageClass, None)) 

739 

740 # Convert the list of tuples to have StoredFileInfo as second element 

741 return [ 

742 ( 

743 location, 

744 StoredFileInfo( 

745 formatter=formatter, 

746 path=location.pathInStore.path, 

747 storageClass=storageClass, 

748 component=component, 

749 checksum=None, 

750 file_size=-1, 

751 ), 

752 ) 

753 for location, formatter, storageClass, component in all_info 

754 ] 

755 

756 def _prepare_for_direct_get( 

757 self, ref: DatasetRef, parameters: Mapping[str, Any] | None = None 

758 ) -> list[DatastoreFileGetInformation]: 

759 """Check parameters for ``get`` and obtain formatter and 

760 location. 

761 

762 Parameters 

763 ---------- 

764 ref : `DatasetRef` 

765 Reference to the required Dataset. 

766 parameters : `dict` 

767 `StorageClass`-specific parameters that specify, for example, 

768 a slice of the dataset to be loaded. 

769 

770 Returns 

771 ------- 

772 getInfo : `list` [`DatastoreFileGetInformation`] 

773 Parameters needed to retrieve each file. 

774 """ 

775 log.debug("Retrieve %s from %s with parameters %s", ref, self.name, parameters) 

776 

777 # The storage class we want to use eventually 

778 refStorageClass = ref.datasetType.storageClass 

779 

780 # For trusted mode need to reset storage class. 

781 ref = self._cast_storage_class(ref) 

782 

783 # Get file metadata and internal metadata 

784 fileLocations = self._get_dataset_locations_info(ref) 

785 if not fileLocations: 

786 if not self.trustGetRequest: 

787 raise FileNotFoundError(f"Could not retrieve dataset {ref}.") 

788 # Assume the dataset is where we think it should be 

789 fileLocations = self._get_expected_dataset_locations_info(ref) 

790 

791 if len(fileLocations) > 1: 

792 # If trust is involved it is possible that there will be 

793 # components listed here that do not exist in the datastore. 

794 # Explicitly check for file artifact existence and filter out any 

795 # that are missing. 

796 if self.trustGetRequest: 

797 fileLocations = [loc for loc in fileLocations if loc[0].uri.exists()] 

798 

799 # For now complain only if we have no components at all. One 

800 # component is probably a problem but we can punt that to the 

801 # assembler. 

802 if not fileLocations: 

803 raise FileNotFoundError(f"None of the component files for dataset {ref} exist.") 

804 

805 return generate_datastore_get_information( 

806 fileLocations, 

807 readStorageClass=refStorageClass, 

808 ref=ref, 

809 parameters=parameters, 

810 ) 

811 

812 def _determine_put_formatter_location( 

813 self, ref: DatasetRef, provenance: DatasetProvenance | None = None 

814 ) -> tuple[Location, Formatter | FormatterV2]: 

815 """Calculate the formatter and output location to use for put. 

816 

817 Parameters 

818 ---------- 

819 ref : `DatasetRef` 

820 Reference to the associated Dataset. 

821 provenance : `DatasetProvenance` 

822 Any provenance that should be attached to the serialized dataset. 

823 

824 Returns 

825 ------- 

826 location : `Location` 

827 The location to write the dataset. 

828 formatter : `Formatter` 

829 The `Formatter` to use to write the dataset. 

830 """ 

831 # Work out output file name 

832 try: 

833 template = self.templates.getTemplate(ref) 

834 except KeyError as e: 

835 raise DatasetTypeNotSupportedError(f"Unable to find template for {ref}") from e 

836 

837 # Validate the template to protect against filenames from different 

838 # dataIds returning the same and causing overwrite confusion. 

839 template.validateTemplate(ref) 

840 

841 location = self.locationFactory.fromPath(template.format(ref), trusted_path=True) 

842 

843 # Get the formatter based on the storage class 

844 storageClass = ref.datasetType.storageClass 

845 try: 

846 formatter = self.formatterFactory.getFormatter( 

847 ref, 

848 FileDescriptor(location, storageClass=storageClass, component=ref.datasetType.component()), 

849 dataId=ref.dataId, 

850 ref=ref, 

851 provenance=provenance, 

852 ) 

853 except KeyError as e: 

854 raise DatasetTypeNotSupportedError( 

855 f"Unable to find formatter for {ref} in datastore {self.name}" 

856 ) from e 

857 

858 # Now that we know the formatter, update the location 

859 location = formatter.make_updated_location(location) 

860 

861 return location, formatter 

862 

863 def _overrideTransferMode(self, *datasets: FileDataset, transfer: str | None = None) -> str | None: 

864 # Docstring inherited from base class 

865 if transfer != "auto": 

866 return transfer 

867 

868 # See if the paths are within the datastore or not 

869 inside = [self._pathInStore(d.path) is not None for d in datasets] 

870 

871 if all(inside): 

872 transfer = None 

873 elif not any(inside): 

874 # Allow ResourcePath to use its own knowledge 

875 transfer = "auto" 

876 else: 

877 # This can happen when importing from a datastore that

878 # has had some datasets ingested using "direct" mode and

879 # therefore refers to some files outside its own root via

880 # absolute URIs. Allow ResourcePath to sort out the transfer

881 # but warn about it.

882 log.warning( 

883 "Some datasets are inside the datastore and some are outside. Using 'split' " 

884 "transfer mode. This assumes that the files outside the datastore are " 

885 "still accessible to the new butler since they will not be copied into " 

886 "the target datastore." 

887 ) 

888 transfer = "split" 

889 

890 return transfer 

891 
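 # Summary of the "auto" resolution implemented above:
 #   every path already inside the datastore root -> transfer = None
 #   no path inside the datastore root            -> transfer = "auto"
 #   a mixture of inside and outside              -> transfer = "split"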

892 def _pathInStore(self, path: ResourcePathExpression) -> str | None: 

893 """Return path relative to datastore root. 

894 

895 Parameters 

896 ---------- 

897 path : `lsst.resources.ResourcePathExpression` 

898 Path to dataset. Can be an absolute URI. If relative, it is

899 assumed to be relative to the datastore root. The path within

900 the datastore is returned, or `None` if the path is outside.

901 

902 Returns 

903 ------- 

904 inStore : `str` 

905 Path relative to datastore root. Returns `None` if the file is 

906 outside the root. 

907 """ 

908 # Relative path will always be relative to datastore 

909 pathUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False) 

910 return pathUri.relative_to(self.root) 

911 
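 # Hypothetical example of the relative_to() behaviour relied on here:
 #   root = file:///repo/, path = file:///repo/a/b.fits -> "a/b.fits"
 #   root = file:///repo/, path = file:///other/b.fits  -> None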

912 def _standardizeIngestPath( 

913 self, 

914 path: str | ResourcePath, 

915 *, 

916 transfer: str | None = None, 

917 check_existence: bool = False, 

918 ) -> str | ResourcePath: 

919 """Standardize the path of a to-be-ingested file. 

920 

921 Parameters 

922 ---------- 

923 path : `str` or `lsst.resources.ResourcePath` 

924 Path of a file to be ingested. This parameter is not expected 

925 to be all the types that can be used to construct a 

926 `~lsst.resources.ResourcePath`. 

927 transfer : `str`, optional 

928 How (and whether) the dataset should be added to the datastore. 

929 See `ingest` for details of transfer modes. 

930 This implementation is provided only so 

931 `NotImplementedError` can be raised if the mode is not supported; 

932 actual transfers are deferred to `_extractIngestInfo`. 

933 check_existence : `bool`, optional 

934 If `True` the existence of the file will be checked, otherwise 

935 no check will be made. 

936 

937 Returns 

938 ------- 

939 path : `str` or `lsst.resources.ResourcePath` 

940 New path in what the datastore considers standard form. If an 

941 absolute URI was given that will be returned unchanged. 

942 

943 Notes 

944 ----- 

945 Subclasses of `FileDatastore` can implement this method instead 

946 of `_prepIngest`. It should not modify the data repository or given 

947 file in any way. 

948 

949 Raises 

950 ------ 

951 NotImplementedError 

952 Raised if the datastore does not support the given transfer mode 

953 (including the case where ingest is not supported at all). 

954 """ 

955 if transfer not in (None, "direct", "split") + self.root.transferModes: 

956 raise NotImplementedError(f"Transfer mode {transfer} not supported.") 

957 

958 # A relative URI indicates relative to datastore root 

959 srcUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False) 

960 if not srcUri.isabs(): 

961 srcUri = self.root.join(path) 

962 

963 if check_existence and not srcUri.exists(): 

964 raise FileNotFoundError( 

965 f"Resource at {srcUri} does not exist; note that paths to ingest " 

966 f"are assumed to be relative to {self.root} unless they are absolute." 

967 ) 

968 

969 if transfer is None: 

970 relpath = srcUri.relative_to(self.root) 

971 if not relpath: 

972 raise RuntimeError( 

973 f"Transfer is none but source file ({srcUri}) is not within datastore ({self.root})" 

974 ) 

975 

976 # Return the relative path within the datastore for internal 

977 # transfer 

978 path = relpath 

979 

980 return path 

981 

982 def _extractIngestInfo( 

983 self, 

984 path: ResourcePathExpression, 

985 ref: DatasetRef, 

986 *, 

987 formatter: Formatter | FormatterV2 | type[Formatter | FormatterV2], 

988 transfer: str | None = None, 

989 record_validation_info: bool = True, 

990 ) -> StoredFileInfo: 

991 """Relocate (if necessary) and extract `StoredFileInfo` from a 

992 to-be-ingested file. 

993 

994 Parameters 

995 ---------- 

996 path : `lsst.resources.ResourcePathExpression` 

997 URI or path of a file to be ingested. 

998 ref : `DatasetRef` 

999 Reference for the dataset being ingested. Guaranteed to have 

1000 ``dataset_id`` not `None`.

1001 formatter : `type` or `Formatter` 

1002 `Formatter` subclass to use for this dataset or an instance. 

1003 transfer : `str`, optional 

1004 How (and whether) the dataset should be added to the datastore. 

1005 See `ingest` for details of transfer modes. 

1006 record_validation_info : `bool`, optional 

1007 If `True`, the default, the datastore can record validation 

1008 information associated with the file. If `False` the datastore 

1009 will not attempt to track any information such as checksums 

1010 or file sizes. This can be useful if such information is tracked 

1011 in an external system or if the file is to be compressed in place. 

1012 It is up to the datastore whether this parameter is relevant. 

1013 

1014 Returns 

1015 ------- 

1016 info : `StoredFileInfo` 

1017 Internal datastore record for this file. This will be inserted by 

1018 the caller; the `_extractIngestInfo` is only responsible for 

1019 creating and populating the struct. 

1020 

1021 Raises 

1022 ------ 

1023 FileNotFoundError 

1024 Raised if one of the given files does not exist. 

1025 FileExistsError 

1026 Raised if transfer is not `None` but the (internal) location the 

1027 file would be moved to is already occupied. 

1028 """ 

1029 if self._transaction is None: 

1030 raise RuntimeError("Ingest called without transaction enabled") 

1031 

1032 # Create URI of the source path, do not need to force a relative 

1033 # path to absolute. 

1034 srcUri = ResourcePath(path, forceAbsolute=False, forceDirectory=False) 

1035 

1036 # Track whether we have read the size of the source yet 

1037 have_sized = False 

1038 

1039 tgtLocation: Location | None 

1040 if transfer is None or transfer == "split": 

1041 # A relative path is assumed to be relative to the datastore 

1042 # in this context 

1043 if not srcUri.isabs(): 

1044 tgtLocation = self.locationFactory.fromPath(srcUri.ospath, trusted_path=False) 

1045 else: 

1046 # Work out the path in the datastore from an absolute URI 

1047 # This is required to be within the datastore. 

1048 pathInStore = srcUri.relative_to(self.root) 

1049 if pathInStore is None and transfer is None: 

1050 raise RuntimeError( 

1051 f"Unexpectedly learned that {srcUri} is not within datastore {self.root}" 

1052 ) 

1053 if pathInStore: 

1054 tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True) 

1055 elif transfer == "split": 

1056 # Outside the datastore but treat that as a direct ingest 

1057 # instead. 

1058 tgtLocation = None 

1059 else: 

1060 raise RuntimeError(f"Unexpected transfer mode encountered: {transfer} for URI {srcUri}") 

1061 elif transfer == "direct": 

1062 # Want to store the full URI to the resource directly in 

1063 # datastore. This is useful for referring to permanent archive 

1064 # storage for raw data. 

1065 # Trust that people know what they are doing. 

1066 tgtLocation = None 

1067 else: 

1068 # Work out the name we want this ingested file to have 

1069 # inside the datastore 

1070 tgtLocation = self._calculate_ingested_datastore_name(srcUri, ref, formatter) 

1071 

1072 # if we are transferring from a local file to a remote location 

1073 # it may be more efficient to get the size and checksum of the 

1074 # local file rather than the transferred one 

1075 if record_validation_info and srcUri.isLocal: 

1076 size = srcUri.size() 

1077 checksum = self.computeChecksum(srcUri) if self.useChecksum else None 

1078 have_sized = True 

1079 

1080 # Transfer the resource to the destination. 

1081 # Allow overwrite of an existing file. This matches the behavior 

1082 # of datastore.put() in that it trusts that registry would not 

1083 # be asking to overwrite unless registry thought that the 

1084 # overwrite was allowed. 

1085 tgtLocation.uri.transfer_from( 

1086 srcUri, transfer=transfer, transaction=self._transaction, overwrite=True 

1087 ) 

1088 

1089 if tgtLocation is None: 

1090 # This means we are using direct mode 

1091 targetUri = srcUri 

1092 targetPath = str(srcUri) 

1093 else: 

1094 targetUri = tgtLocation.uri 

1095 targetPath = tgtLocation.pathInStore.path 

1096 

1097 # the file should exist in the datastore now 

1098 if record_validation_info: 

1099 if not have_sized: 

1100 size = targetUri.size() 

1101 checksum = self.computeChecksum(targetUri) if self.useChecksum else None 

1102 else: 

1103 # Not recording any file information. 

1104 size = -1 

1105 checksum = None 

1106 

1107 return StoredFileInfo( 

1108 formatter=formatter, 

1109 path=targetPath, 

1110 storageClass=ref.datasetType.storageClass, 

1111 component=ref.datasetType.component(), 

1112 file_size=size, 

1113 checksum=checksum, 

1114 ) 

1115 
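 # Summary of how the target location is chosen above:
 #   transfer None/"split", path inside the root -> store the
 #       datastore-relative path
 #   transfer "split", path outside the root     -> treat as direct and
 #       store the absolute URI
 #   transfer "direct"                           -> store the absolute URI
 #       unchanged
 #   any copy/move/link transfer                 -> new name derived from
 #       the file template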

1116 def _prepIngest(self, *datasets: FileDataset, transfer: str | None = None) -> _IngestPrepData: 

1117 # Docstring inherited from Datastore._prepIngest. 

1118 filtered = [] 

1119 

1120 # Ingest could be given tens of thousands of files. It is not efficient 

1121 # to check for the existence of every single file (especially if they 

1122 # are remote URIs) but in some transfer modes the files will be checked 

1123 # anyhow when they are relocated. For direct or None transfer modes 

1124 # it is possible to not know if the file is accessible at all. 

1125 # Therefore limit number of files that will be checked (but always 

1126 # include the first one). 

1127 max_checks = 200 

1128 n_datasets = len(datasets) 

1129 if n_datasets <= max_checks: 

1130 check_every_n = 1 

1131 elif transfer in ("direct", None): 

1132 check_every_n = int(n_datasets / max_checks + 1) # +1 so that if n < max_checks the answer is 1. 

1133 else: 

1134 check_every_n = 0 

1135 
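 # Worked example of the sampling above (numbers are illustrative): with
 # 1_000_000 datasets and transfer="direct",
 # check_every_n = int(1_000_000 / 200 + 1) = 5001, so roughly one file
 # in every five thousand is checked for existence.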

1136 for count, dataset in enumerate(datasets): 

1137 acceptable = [ref for ref in dataset.refs if self.constraints.isAcceptable(ref)] 

1138 if not acceptable: 

1139 continue 

1140 else: 

1141 dataset.refs = acceptable 

1142 if dataset.formatter is None: 

1143 dataset.formatter = self.formatterFactory.getFormatterClass(dataset.refs[0]) 

1144 else: 

1145 assert isinstance(dataset.formatter, type | str) 

1146 formatter_class = get_class_of(dataset.formatter) 

1147 if not issubclass(formatter_class, Formatter | FormatterV2): 

1148 raise TypeError(f"Requested formatter {dataset.formatter} is not a Formatter class.") 

1149 dataset.formatter = formatter_class 

1150 

1151 # Decide whether the file should be checked. 

1152 check_existence = False 

1153 if check_every_n != 0: 

1154 # First time through count is 0 so we guarantee to check 

1155 # the first file but not necessarily the final one. 

1156 check_existence = count % check_every_n == 0 

1157 

1158 if check_existence: 

1159 log.debug( 

1160 "Checking file existence: %s (%d/%d) [%s]", 

1161 check_existence, 

1162 count + 1, 

1163 n_datasets, 

1164 transfer, 

1165 ) 

1166 

1167 dataset.path = self._standardizeIngestPath( 

1168 dataset.path, transfer=transfer, check_existence=check_existence 

1169 ) 

1170 filtered.append(dataset) 

1171 return _IngestPrepData(filtered) 

1172 

1173 @transactional 

1174 def _finishIngest( 

1175 self, 

1176 prepData: Datastore.IngestPrepData, 

1177 *, 

1178 transfer: str | None = None, 

1179 record_validation_info: bool = True, 

1180 ) -> None: 

1181 # Docstring inherited from Datastore._finishIngest. 

1182 refsAndInfos = [] 

1183 progress = Progress("lsst.daf.butler.datastores.FileDatastore.ingest", level=logging.DEBUG) 

1184 for dataset in progress.wrap(prepData.datasets, desc="Ingesting dataset files"): 

1185 # Do ingest as if the first dataset ref is associated with the file 

1186 info = self._extractIngestInfo( 

1187 dataset.path, 

1188 dataset.refs[0], 

1189 formatter=dataset.formatter, 

1190 transfer=transfer, 

1191 record_validation_info=record_validation_info, 

1192 ) 

1193 refsAndInfos.extend([(ref, info) for ref in dataset.refs]) 

1194 

1195 # In direct mode we can allow repeated ingests of the same thing 

1196 # if we are sure that the external dataset is immutable. We use 

1197 # UUIDv5 to indicate this. If there is a mix of v4 and v5 they are 

1198 # separated. 

1199 refs_and_infos_replace = [] 

1200 refs_and_infos_insert = [] 

1201 if transfer == "direct": 

1202 for entry in refsAndInfos: 

1203 if entry[0].id.version == 5: 

1204 refs_and_infos_replace.append(entry) 

1205 else: 

1206 refs_and_infos_insert.append(entry) 

1207 else: 

1208 refs_and_infos_insert = refsAndInfos 

1209 

1210 if refs_and_infos_insert: 

1211 self._register_datasets(refs_and_infos_insert, insert_mode=DatabaseInsertMode.INSERT) 

1212 if refs_and_infos_replace: 

1213 self._register_datasets(refs_and_infos_replace, insert_mode=DatabaseInsertMode.REPLACE) 

1214 

1215 def _calculate_ingested_datastore_name( 

1216 self, 

1217 srcUri: ResourcePath, 

1218 ref: DatasetRef, 

1219 formatter: Formatter | FormatterV2 | type[Formatter | FormatterV2] | None = None, 

1220 ) -> Location: 

1221 """Given a source URI and a DatasetRef, determine the name the 

1222 dataset will have inside datastore. 

1223 

1224 Parameters 

1225 ---------- 

1226 srcUri : `lsst.resources.ResourcePath` 

1227 URI to the source dataset file. 

1228 ref : `DatasetRef` 

1229 Ref associated with the newly-ingested dataset artifact. This 

1230 is used to determine the name within the datastore. 

1231 formatter : `Formatter` or Formatter class. 

1232 Formatter to use for validation. Can be a class or an instance. 

1233 No validation of the file extension is performed if the 

1234 ``formatter`` is `None`. This can be used if the caller knows 

1235 that the source URI and target URI will use the same formatter. 

1236 

1237 Returns 

1238 ------- 

1239 location : `Location` 

1240 Target location for the newly-ingested dataset. 

1241 """ 

1242 # Ingesting a file from outside the datastore. 

1243 # This involves a new name. 

1244 template = self.templates.getTemplate(ref) 

1245 location = self.locationFactory.fromPath(template.format(ref), trusted_path=True) 

1246 

1247 # Get the extension 

1248 ext = srcUri.getExtension() 

1249 

1250 # Update the destination to include that extension 

1251 location.updateExtension(ext) 

1252 

1253 # Ask the formatter to validate this extension 

1254 if formatter is not None: 

1255 formatter.validate_extension(location) 

1256 

1257 return location 

1258 
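 # Illustrative example (names are hypothetical): if the file template
 # renders the ref to "run/raw/r_v12" and the source URI is
 # "file:///staging/exposure.fits", the returned location points at
 # "run/raw/r_v12.fits" inside the datastore after updateExtension().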

1259 def _write_in_memory_to_artifact( 

1260 self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None 

1261 ) -> StoredFileInfo: 

1262 """Write out in memory dataset to datastore. 

1263 

1264 Parameters 

1265 ---------- 

1266 inMemoryDataset : `object` 

1267 Dataset to write to datastore. 

1268 ref : `DatasetRef` 

1269 Registry information associated with this dataset. 

1270 provenance : `DatasetProvenance` or `None`, optional 

1271 Any provenance that should be attached to the serialized dataset. 

1272 Not supported by all formatters. 

1273 

1274 Returns 

1275 ------- 

1276 info : `StoredFileInfo` 

1277 Information describing the artifact written to the datastore. 

1278 """ 

1279 # May need to coerce the in memory dataset to the correct 

1280 # python type, but first we need to make sure the storage class 

1281 # reflects the one defined in the data repository. 

1282 ref = self._cast_storage_class(ref) 

1283 

1284 # Confirm that we can accept this dataset 

1285 if not self.constraints.isAcceptable(ref): 

1286 # Raise rather than use boolean return value. 

1287 raise DatasetTypeNotSupportedError( 

1288 f"Dataset {ref} has been rejected by this datastore via configuration." 

1289 ) 

1290 

1291 location, formatter = self._determine_put_formatter_location(ref) 

1292 

1293 # The external storage class can differ from the registry storage 

1294 # class AND the given in-memory dataset might not match any of the 

1295 # storage class definitions. 

1296 if formatter.can_accept(inMemoryDataset): 

1297 # Do not need to coerce. Must assume that the formatter can handle 

1298 # it without further checking of types. 

1299 pass 

1300 else: 

1301 # Coerce to a type that it can accept. 

1302 inMemoryDataset = ref.datasetType.storageClass.coerce_type(inMemoryDataset) 

1303 required_pytype = ref.datasetType.storageClass.pytype 

1304 

1305 if not isinstance(inMemoryDataset, required_pytype): 

1306 raise TypeError( 

1307 f"Inconsistency between supplied object ({type(inMemoryDataset)}) " 

1308 f"and storage class type ({required_pytype})" 

1309 ) 

1310 

1311 if self._transaction is None: 

1312 raise RuntimeError("Attempting to write artifact without transaction enabled") 

1313 

1314 def _removeFileExists(uri: ResourcePath) -> None: 

1315 """Remove a file and do not complain if it is not there. 

1316 

1317 This is important since a formatter might fail before the file 

1318 is written and we should not confuse people by writing spurious 

1319 error messages to the log. 

1320 """ 

1321 with contextlib.suppress(FileNotFoundError): 

1322 uri.remove() 

1323 

1324 # Register a callback to try to delete the uploaded data if 

1325 # something fails below 

1326 uri = location.uri 

1327 self._transaction.registerUndo("artifactWrite", _removeFileExists, uri) 

1328 

1329 # Need to record the specified formatter but if this is a V1 formatter 

1330 # we need to convert it to a V2 compatible shim to do the write. 

1331 if not isinstance(formatter, Formatter): 

1332 formatter_compat = formatter 

1333 else: 

1334 formatter_compat = FormatterV1inV2( 

1335 formatter.file_descriptor, 

1336 ref=ref, 

1337 formatter=formatter, 

1338 write_parameters=formatter.write_parameters, 

1339 write_recipes=formatter.write_recipes, 

1340 ) 

1341 

1342 assert isinstance(formatter_compat, FormatterV2) 

1343 

1344 with time_this(log, msg="Writing dataset %s with formatter %s", args=(ref, formatter.name())): 

1345 try: 

1346 formatter_compat.write( 

1347 inMemoryDataset, cache_manager=self.cacheManager, provenance=provenance 

1348 ) 

1349 except Exception as e: 

1350 raise RuntimeError( 

1351 f"Failed to serialize dataset {ref} of type {get_full_type_name(inMemoryDataset)} " 

1352 f"using formatter {formatter.name()}." 

1353 ) from e 

1354 

1355 # URI is needed to resolve which ingest case we are dealing with.

1356 return self._extractIngestInfo(uri, ref, formatter=formatter) 

1357 

1358 def knows(self, ref: DatasetRef) -> bool: 

1359 """Check if the dataset is known to the datastore. 

1360 

1361 Does not check for existence of any artifact. 

1362 

1363 Parameters 

1364 ---------- 

1365 ref : `DatasetRef` 

1366 Reference to the required dataset. 

1367 

1368 Returns 

1369 ------- 

1370 exists : `bool` 

1371 `True` if the dataset is known to the datastore. 

1372 """ 

1373 fileLocations = self._get_dataset_locations_info(ref) 

1374 if fileLocations: 

1375 return True 

1376 return False 

1377 

1378 def knows_these(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]: 

1379 # Docstring inherited from the base class. 

1380 refs = list(refs) 

1381 

1382 # The records themselves. Could be missing some entries. 

1383 records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) 

1384 

1385 return {ref: ref.id in records for ref in refs} 

1386 

1387 def _process_mexists_records( 

1388 self, 

1389 id_to_ref: dict[DatasetId, DatasetRef], 

1390 records: dict[DatasetId, list[StoredFileInfo]], 

1391 all_required: bool, 

1392 artifact_existence: dict[ResourcePath, bool] | None = None, 

1393 ) -> dict[DatasetRef, bool]: 

1394 """Check given records for existence. 

1395 

1396 Helper function for `mexists()`. 

1397 

1398 Parameters 

1399 ---------- 

1400 id_to_ref : `dict` of [`DatasetId`, `DatasetRef`] 

1401 Mapping of the dataset ID to the dataset ref itself. 

1402 records : `dict` of [`DatasetId`, `list` of `StoredFileInfo`] 

1403 Records as generally returned by 

1404 ``_get_stored_records_associated_with_refs``. 

1405 all_required : `bool` 

1406 Flag to indicate whether all artifacts associated with a dataset

1407 ID must exist for the dataset to be considered to exist.

1408 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`] 

1409 Optional mapping of datastore artifact to existence. Updated by 

1410 this method with details of all artifacts tested. Can be `None` 

1411 if the caller is not interested. 

1412 

1413 Returns 

1414 ------- 

1415 existence : `dict` of [`DatasetRef`, `bool`] 

1416 Mapping from dataset to boolean indicating existence. 

1417 """ 

1418 # The URIs to be checked and a mapping of those URIs to 

1419 # the dataset ID. 

1420 uris_to_check: list[ResourcePath] = [] 

1421 location_map: dict[ResourcePath, DatasetId] = {} 

1422 

1423 location_factory = self.locationFactory 

1424 

1425 uri_existence: dict[ResourcePath, bool] = {} 

1426 for ref_id, infos in records.items(): 

1427 # Key is the dataset ID, value is a list of StoredFileInfo.

1428 uris = [info.file_location(location_factory).uri for info in infos] 

1429 location_map.update({uri: ref_id for uri in uris}) 

1430 

1431 # Check the local cache directly for a dataset corresponding 

1432 # to the remote URI. 

1433 if self.cacheManager.file_count > 0: 

1434 ref = id_to_ref[ref_id] 

1435 for uri, storedFileInfo in zip(uris, infos, strict=True): 

1436 check_ref = ref 

1437 if not ref.datasetType.isComponent() and (component := storedFileInfo.component): 

1438 check_ref = ref.makeComponentRef(component) 

1439 if self.cacheManager.known_to_cache(check_ref, uri.getExtension()): 

1440 # Proxy for URI existence. 

1441 uri_existence[uri] = True 

1442 else: 

1443 uris_to_check.append(uri) 

1444 else: 

1445 # Check all of them. 

1446 uris_to_check.extend(uris) 

1447 

1448 if artifact_existence is not None: 

1449 # If a URI has already been checked remove it from the list 

1450 # and immediately add the status to the output dict. 

1451 filtered_uris_to_check = [] 

1452 for uri in uris_to_check: 

1453 if uri in artifact_existence: 

1454 uri_existence[uri] = artifact_existence[uri] 

1455 else: 

1456 filtered_uris_to_check.append(uri) 

1457 uris_to_check = filtered_uris_to_check 

1458 

1459 # Results. 

1460 dataset_existence: dict[DatasetRef, bool] = {} 

1461 

1462 uri_existence.update(ResourcePath.mexists(uris_to_check)) 

1463 for uri, exists in uri_existence.items(): 

1464 dataset_id = location_map[uri] 

1465 ref = id_to_ref[dataset_id] 

1466 

1467 # Disassembled composite needs to check all locations. 

1468 # all_required indicates whether all need to exist or not. 

1469 if ref in dataset_existence: 

1470 if all_required: 

1471 exists = dataset_existence[ref] and exists 

1472 else: 

1473 exists = dataset_existence[ref] or exists 

1474 dataset_existence[ref] = exists 

1475 

1476 if artifact_existence is not None: 

1477 artifact_existence.update(uri_existence) 

1478 

1479 return dataset_existence 

1480 

1481 def mexists( 

1482 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None 

1483 ) -> dict[DatasetRef, bool]: 

1484 """Check the existence of multiple datasets at once. 

1485 

1486 Parameters 

1487 ---------- 

1488 refs : `~collections.abc.Iterable` of `DatasetRef` 

1489 The datasets to be checked. 

1490 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`] 

1491 Optional mapping of datastore artifact to existence. Updated by 

1492 this method with details of all artifacts tested. Can be `None` 

1493 if the caller is not interested. 

1494 

1495 Returns 

1496 ------- 

1497 existence : `dict` of [`DatasetRef`, `bool`] 

1498 Mapping from dataset to boolean indicating existence. 

1499 

1500 Notes 

1501 ----- 

1502 To minimize potentially costly remote existence checks, the local 

1503 cache is checked as a proxy for existence. If a file for this 

1504 `DatasetRef` is found in the cache, no check is done for the actual URI. This 

1505 could result in possibly unexpected behavior if the dataset itself 

1506 has been removed from the datastore by another process whilst it is 

1507 still in the cache. 

1508 """ 

1509 chunk_size = 50_000 

1510 dataset_existence: dict[DatasetRef, bool] = {} 

1511 log.debug("Checking for the existence of multiple artifacts in datastore in chunks of %d", chunk_size) 

1512 n_found_total = 0 

1513 n_checked = 0 

1514 n_chunks = 0 

1515 for chunk in chunk_iterable(refs, chunk_size=chunk_size): 

1516 chunk_result = self._mexists(chunk, artifact_existence) 

1517 

1518 # The log message level and content depend on how many 

1519 # datasets we are processing. 

1520 n_results = len(chunk_result) 

1521 

1522 # Use verbose logging to ensure that messages can be seen 

1523 # easily if many refs are being checked. 

1524 log_threshold = VERBOSE 

1525 n_checked += n_results 

1526 

1527 # This sum can take some time so only do it if we know the 

1528 # result is going to be used. 

1529 n_found = 0 

1530 if log.isEnabledFor(log_threshold): 

1531 # Can treat the booleans as 0, 1 integers and sum them. 

1532 n_found = sum(chunk_result.values()) 

1533 n_found_total += n_found 

1534 

1535 # We are deliberately not trying to count the number of refs 

1536 # provided in case it's in the millions. This means there is a 

1537 # situation where the number of refs exactly matches the chunk 

1538 # size and we will switch to the multi-chunk path even though 

1539 # we only have a single chunk. 

1540 if n_results < chunk_size and n_chunks == 0: 

1541 # Single chunk will be processed so we can provide more detail. 

1542 if n_results == 1: 

1543 ref = list(chunk_result)[0] 

1544 # Use debug logging to be consistent with `exists()`. 

1545 log.debug( 

1546 "Calling mexists() with single ref that does%s exist (%s).", 

1547 "" if chunk_result[ref] else " not", 

1548 ref, 

1549 ) 

1550 else: 

1551 # Single chunk but multiple files. Summarize. 

1552 log.log( 

1553 log_threshold, 

1554 "Number of datasets found in datastore %s: %d out of %d datasets checked.", 

1555 self.name, 

1556 n_found, 

1557 n_checked, 

1558 ) 

1559 

1560 else: 

1561 # Use incremental verbose logging when we have multiple chunks. 

1562 log.log( 

1563 log_threshold, 

1564 "Number of datasets found in datastore for chunk %d: %d out of %d checked " 

1565 "(running total from all chunks so far: %d found out of %d checked)", 

1566 n_chunks, 

1567 n_found, 

1568 n_results, 

1569 n_found_total, 

1570 n_checked, 

1571 ) 

1572 dataset_existence.update(chunk_result) 

1573 n_chunks += 1 

1574 

1575 return dataset_existence 

1576 

1577 def _mexists( 

1578 self, refs: Sequence[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None 

1579 ) -> dict[DatasetRef, bool]: 

1580 """Check the existence of multiple datasets at once. 

1581 

1582 Parameters 

1583 ---------- 

1584 refs : `~collections.abc.Sequence` of `DatasetRef` 

1585 The datasets to be checked. 

1586 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`] 

1587 Optional mapping of datastore artifact to existence. Updated by 

1588 this method with details of all artifacts tested. Can be `None` 

1589 if the caller is not interested. 

1590 

1591 Returns 

1592 ------- 

1593 existence : `dict` of [`DatasetRef`, `bool`] 

1594 Mapping from dataset to boolean indicating existence. 

1595 """ 

1596 # Make a mapping from refs with the internal storage class to the given 

1597 # refs that may have a different one. We'll use the internal refs 

1598 # throughout this method and convert back at the very end. 

1599 internal_ref_to_input_ref = {self._cast_storage_class(ref): ref for ref in refs} 

1600 

1601 # Need a mapping of dataset_id to (internal) dataset ref since some 

1602 # internal APIs work with dataset_id. 

1603 id_to_ref = {ref.id: ref for ref in internal_ref_to_input_ref} 

1604 

1605 # Set of all IDs we are checking for. 

1606 requested_ids = set(id_to_ref.keys()) 

1607 

1608 # The records themselves. Could be missing some entries. 

1609 records = self._get_stored_records_associated_with_refs( 

1610 id_to_ref.values(), ignore_datastore_records=True 

1611 ) 

1612 

1613 dataset_existence = self._process_mexists_records( 

1614 id_to_ref, records, True, artifact_existence=artifact_existence 

1615 ) 

1616 

1617 # Set of IDs that have been handled. 

1618 handled_ids = {ref.id for ref in dataset_existence} 

1619 

1620 missing_ids = requested_ids - handled_ids 

1621 if missing_ids: 

1622 dataset_existence.update( 

1623 self._mexists_check_expected( 

1624 [id_to_ref[missing] for missing in missing_ids], artifact_existence 

1625 ) 

1626 ) 

1627 

1628 return { 

1629 internal_ref_to_input_ref[internal_ref]: existence 

1630 for internal_ref, existence in dataset_existence.items() 

1631 } 

1632 

1633 def _mexists_check_expected( 

1634 self, refs: Sequence[DatasetRef], artifact_existence: dict[ResourcePath, bool] | None = None 

1635 ) -> dict[DatasetRef, bool]: 

1636 """Check existence of refs that are not known to datastore. 

1637 

1638 Parameters 

1639 ---------- 

1640 refs : `~collections.abc.Sequence` of `DatasetRef` 

1641 The datasets to be checked. These are assumed not to be known 

1642 to datastore. 

1643 artifact_existence : `dict` [`lsst.resources.ResourcePath`, `bool`] 

1644 Optional mapping of datastore artifact to existence. Updated by 

1645 this method with details of all artifacts tested. Can be `None` 

1646 if the caller is not interested. 

1647 

1648 Returns 

1649 ------- 

1650 existence : `dict` of [`DatasetRef`, `bool`] 

1651 Mapping from dataset to boolean indicating existence. 

1652 """ 

1653 dataset_existence: dict[DatasetRef, bool] = {} 

1654 if not self.trustGetRequest: 

1655 # Must assume these do not exist 

1656 for ref in refs: 

1657 dataset_existence[ref] = False 

1658 else: 

1659 log.debug( 

1660 "%d datasets were not known to datastore during initial existence check.", 

1661 len(refs), 

1662 ) 

1663 

1664 # Construct data structure identical to that returned 

1665 # by _get_stored_records_associated_with_refs() but using 

1666 # guessed names. 

1667 records = {} 

1668 id_to_ref = {} 

1669 for missing_ref in refs: 

1670 expected = self._get_expected_dataset_locations_info(missing_ref) 

1671 dataset_id = missing_ref.id 

1672 records[dataset_id] = [info for _, info in expected] 

1673 id_to_ref[dataset_id] = missing_ref 

1674 

1675 dataset_existence.update( 

1676 self._process_mexists_records( 

1677 id_to_ref, 

1678 records, 

1679 False, 

1680 artifact_existence=artifact_existence, 

1681 ) 

1682 ) 

1683 

1684 return dataset_existence 

1685 

1686 def exists(self, ref: DatasetRef) -> bool: 

1687 """Check if the dataset exists in the datastore. 

1688 

1689 Parameters 

1690 ---------- 

1691 ref : `DatasetRef` 

1692 Reference to the required dataset. 

1693 

1694 Returns 

1695 ------- 

1696 exists : `bool` 

1697 `True` if the entity exists in the `Datastore`. 

1698 

1699 Notes 

1700 ----- 

1701 The local cache is checked as a proxy for existence in the remote 

1702 object store. It is possible that another process on a different 

1703 compute node could remove the file from the object store even 

1704 though it is present in the local cache. 

1705 """ 

1706 ref = self._cast_storage_class(ref) 

1707 # We cannot trust datastore records from ref, as many unit tests delete 

1708 # datasets and check their existence. 

1709 fileLocations = self._get_dataset_locations_info(ref, ignore_datastore_records=True) 

1710 

1711 # If we are being asked to trust requests even when the registry might 

1712 # not be correct, we ask for the expected locations and check them explicitly. 

1713 if not fileLocations: 

1714 if not self.trustGetRequest: 

1715 return False 

1716 

1717 # First check the cache. If it is not found we must check 

1718 # the datastore itself. Assume that any component in the cache 

1719 # means that the dataset does exist somewhere. 

1720 if self.cacheManager.known_to_cache(ref): 

1721 return True 

1722 

1723 # When we are guessing a dataset location we can not check 

1724 # for the existence of every component since we can not 

1725 # know if every component was written. Instead we check 

1726 # for the existence of any of the expected locations. 

1727 for location, _ in self._get_expected_dataset_locations_info(ref): 

1728 if self._artifact_exists(location): 

1729 return True 

1730 return False 

1731 

1732 # All listed artifacts must exist. 

1733 for location, storedFileInfo in fileLocations: 

1734 # Checking in cache needs the component ref. 

1735 check_ref = ref 

1736 if not ref.datasetType.isComponent() and (component := storedFileInfo.component): 

1737 check_ref = ref.makeComponentRef(component) 

1738 if self.cacheManager.known_to_cache(check_ref, location.getExtension()): 

1739 continue 

1740 

1741 if not self._artifact_exists(location): 

1742 return False 

1743 

1744 return True 

1745 

1746 def getURIs(self, ref: DatasetRef, predict: bool = False) -> DatasetRefURIs: 

1747 """Return URIs associated with dataset. 

1748 

1749 Parameters 

1750 ---------- 

1751 ref : `DatasetRef` 

1752 Reference to the required dataset. 

1753 predict : `bool`, optional 

1754 If the datastore does not know about the dataset, controls whether 

1755 it should return a predicted URI or not. 

1756 

1757 Returns 

1758 ------- 

1759 uris : `DatasetRefURIs` 

1760 The URI to the primary artifact associated with this dataset (if 

1761 the dataset was disassembled within the datastore this may be 

1762 `None`), and the URIs to any components associated with the dataset 

1763 artifact. (can be empty if there are no components). 

1764 """ 

1765 many = self.getManyURIs([ref], predict=predict, allow_missing=False) 

1766 return many[ref] 

1767 

1768 def getURI(self, ref: DatasetRef, predict: bool = False) -> ResourcePath: 

1769 """URI to the Dataset. 

1770 

1771 Parameters 

1772 ---------- 

1773 ref : `DatasetRef` 

1774 Reference to the required Dataset. 

1775 predict : `bool` 

1776 If `True`, allow URIs to be returned of datasets that have not 

1777 been written. 

1778 

1779 Returns 

1780 ------- 

1781 uri : `lsst.resources.ResourcePath` 

1782 URI pointing to the dataset within the datastore. If the 

1783 dataset does not exist in the datastore, and if ``predict`` is 

1784 `True`, the URI will be a prediction and will include a URI 

1785 fragment "#predicted". 

1786 If the datastore does not have entities that relate well 

1787 to the concept of a URI, the returned URI will be 

1788 descriptive. The returned URI is not guaranteed to be obtainable. 

1789 

1790 Raises 

1791 ------ 

1792 FileNotFoundError 

1793 Raised if a URI has been requested for a dataset that does not 

1794 exist and guessing is not allowed. 

1795 RuntimeError 

1796 Raised if a request is made for a single URI but multiple URIs 

1797 are associated with this dataset. 

1798 

1799 Notes 

1800 ----- 

1801 When a predicted URI is requested an attempt will be made to form 

1802 a reasonable URI based on file templates and the expected formatter. 

1803 """ 

1804 primary, components = self.getURIs(ref, predict) 

1805 if primary is None or components: 

1806 raise RuntimeError( 

1807 f"Dataset ({ref}) includes distinct URIs for components. Use Datastore.getURIs() instead." 

1808 ) 

1809 return primary 

1810 

1811 def _predict_URIs( 

1812 self, 

1813 ref: DatasetRef, 

1814 ) -> DatasetRefURIs: 

1815 """Predict the URIs of a dataset ref. 

1816 

1817 Parameters 

1818 ---------- 

1819 ref : `DatasetRef` 

1820 Reference to the required Dataset. 

1821 

1822 Returns 

1823 ------- 

1824 uris : `DatasetRefURIs` 

1825 Primary and component URIs. URIs will contain a URI fragment 

1826 "#predicted". 

1827 """ 

1828 uris = DatasetRefURIs() 

1829 

1830 if self.composites.shouldBeDisassembled(ref): 

1831 for component, _ in ref.datasetType.storageClass.components.items(): 

1832 comp_ref = ref.makeComponentRef(component) 

1833 comp_location, _ = self._determine_put_formatter_location(comp_ref) 

1834 

1835 # Add the "#predicted" URI fragment to indicate this is a 

1836 # guess 

1837 uris.componentURIs[component] = ResourcePath( 

1838 comp_location.uri.geturl() + "#predicted", forceDirectory=comp_location.uri.dirLike 

1839 ) 

1840 

1841 else: 

1842 location, _ = self._determine_put_formatter_location(ref) 

1843 

1844 # Add the "#predicted" URI fragment to indicate this is a guess 

1845 uris.primaryURI = ResourcePath( 

1846 location.uri.geturl() + "#predicted", forceDirectory=location.uri.dirLike 

1847 ) 

1848 

1849 return uris 

1850 

1851 def getManyURIs( 

1852 self, 

1853 refs: Iterable[DatasetRef], 

1854 predict: bool = False, 

1855 allow_missing: bool = False, 

1856 ) -> dict[DatasetRef, DatasetRefURIs]: 

1857 # Docstring inherited 

1858 

1859 uris: dict[DatasetRef, DatasetRefURIs] = {} 

1860 

1861 records = self._get_stored_records_associated_with_refs(refs) 

1862 records_keys = records.keys() 

1863 

1864 existing_refs = tuple(ref for ref in refs if ref.id in records_keys) 

1865 missing_refs = tuple(ref for ref in refs if ref.id not in records_keys) 

1866 

1867 # Have to handle trustGetRequest mode by checking for the existence 

1868 # of the missing refs on disk. 

1869 if missing_refs and not predict: 

1870 dataset_existence = self._mexists_check_expected(missing_refs, None) 

1871 really_missing = set() 

1872 not_missing = set() 

1873 for ref, exists in dataset_existence.items(): 

1874 if exists: 

1875 not_missing.add(ref) 

1876 else: 

1877 really_missing.add(ref) 

1878 

1879 if not_missing: 

1880 # Need to recalculate the missing/existing split. 

1881 existing_refs = existing_refs + tuple(not_missing) 

1882 missing_refs = tuple(really_missing) 

1883 

1884 for ref in missing_refs: 

1885 # if this has never been written then we have to guess 

1886 if not predict: 

1887 if not allow_missing: 

1888 raise FileNotFoundError(f"Dataset {ref} not in this datastore.") 

1889 else: 

1890 uris[ref] = self._predict_URIs(ref) 

1891 

1892 for ref in existing_refs: 

1893 file_infos = records[ref.id] 

1894 file_locations = [(i.file_location(self.locationFactory), i) for i in file_infos] 

1895 uris[ref] = self._locations_to_URI(ref, file_locations) 

1896 

1897 return uris 

1898 

1899 def _locations_to_URI( 

1900 self, 

1901 ref: DatasetRef, 

1902 file_locations: Sequence[tuple[Location, StoredFileInfo]], 

1903 ) -> DatasetRefURIs: 

1904 """Convert one or more file locations associated with a DatasetRef 

1905 to a DatasetRefURIs. 

1906 

1907 Parameters 

1908 ---------- 

1909 ref : `DatasetRef` 

1910 Reference to the dataset. 

1911 file_locations : `~collections.abc.Sequence` [`tuple` [`Location`, `StoredFileInfo`]] 

1912 Each item in the sequence is the location of the dataset within the 

1913 datastore and stored information about the file and its formatter. 

1914 If there is only one item in the sequence then it is treated as the 

1915 primary URI. If there is more than one item then they are treated 

1916 as component URIs. If there are no items then an error is raised 

1917 unless ``self.trustGetRequest`` is `True`. 

1918 

1919 Returns 

1920 ------- 

1921 uris : `DatasetRefURIs` 

1922 Represents the primary URI or component URIs described by the 

1923 inputs. 

1924 

1925 Raises 

1926 ------ 

1927 RuntimeError 

1928 If no file locations are passed in and ``self.trustGetRequest`` is 

1929 `False`. 

1930 FileNotFoundError 

1931 If a passed-in URI does not exist, and ``self.trustGetRequest`` 

1932 is `False`. 

1933 RuntimeError 

1934 If a passed in `StoredFileInfo`'s ``component`` is `None` (this is 

1935 unexpected). 

1936 """ 

1937 guessing = False 

1938 uris = DatasetRefURIs() 

1939 

1940 if not file_locations: 

1941 if not self.trustGetRequest: 

1942 raise RuntimeError(f"Unexpectedly got no artifacts for dataset {ref}") 

1943 file_locations = self._get_expected_dataset_locations_info(ref) 

1944 guessing = True 

1945 

1946 if len(file_locations) == 1: 

1947 # No disassembly so this is the primary URI 

1948 uris.primaryURI = file_locations[0][0].uri 

1949 if guessing and not uris.primaryURI.exists(): 

1950 raise FileNotFoundError(f"Expected URI ({uris.primaryURI}) does not exist") 

1951 else: 

1952 for location, file_info in file_locations: 

1953 if file_info.component is None: 

1954 raise RuntimeError(f"Unexpectedly got no component name for a component at {location}") 

1955 if guessing and not location.uri.exists(): 

1956 # If we are trusting then it is entirely possible for 

1957 # some components to be missing. In that case we skip 

1958 # to the next component. 

1959 if self.trustGetRequest: 

1960 continue 

1961 raise FileNotFoundError(f"Expected URI ({location.uri}) does not exist") 

1962 uris.componentURIs[file_info.component] = location.uri 

1963 

1964 return uris 

1965 

1966 def _find_missing_records( 

1967 self, 

1968 refs: Iterable[DatasetRef], 

1969 missing_ids: set[DatasetId], 

1970 artifact_existence: dict[ResourcePath, bool] | None = None, 

1971 warn_for_missing: bool = True, 

1972 ) -> dict[DatasetId, list[StoredFileInfo]]: 

1973 if not missing_ids: 

1974 return {} 

1975 

1976 if artifact_existence is None: 

1977 artifact_existence = {} 

1978 

1979 found_records: dict[DatasetId, list[StoredFileInfo]] = defaultdict(list) 

1980 id_to_ref = {ref.id: ref for ref in refs if ref.id in missing_ids} 

1981 

1982 # This should be chunked in case we end up having to check 

1983 # the file store since we need some log output to show 

1984 # progress. 

1985 chunk_size = 50_000 

1986 for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=chunk_size): 

1987 records = {} 

1988 for missing in missing_ids_chunk: 

1989 # Ask the source datastore where the missing artifacts 

1990 # should be. An execution butler might not know about the 

1991 # artifacts even if they are there. 

1992 expected = self._get_expected_dataset_locations_info(id_to_ref[missing]) 

1993 records[missing] = [info for _, info in expected] 

1994 

1995 # Call the mexist helper method in case we have not already 

1996 # checked these artifacts such that artifact_existence is 

1997 # empty. This allows us to benefit from parallelism. 

1998 # datastore.mexists() itself does not give us access to the 

1999 # derived datastore record. 

2000 log.verbose("Checking existence of %d datasets unknown to datastore", len(records)) 

2001 ref_exists = self._process_mexists_records( 

2002 id_to_ref, records, False, artifact_existence=artifact_existence 

2003 ) 

2004 

2005 # Now go through the records and propagate the ones that exist. 

2006 location_factory = self.locationFactory 

2007 for missing, record_list in records.items(): 

2008 # Skip completely if the ref does not exist. 

2009 ref = id_to_ref[missing] 

2010 if not ref_exists[ref]: 

2011 if warn_for_missing: 

2012 log.warning("Asked to transfer dataset %s but no file artifacts exist for it.", ref) 

2013 continue 

2014 # Check for file artifact to decide which parts of a 

2015 # disassembled composite do exist. If there is only a 

2016 # single record we don't even need to look because it can't 

2017 # be a composite and must exist. 

2018 if len(record_list) == 1: 

2019 dataset_records = record_list 

2020 else: 

2021 dataset_records = [ 

2022 record 

2023 for record in record_list 

2024 if artifact_existence[record.file_location(location_factory).uri] 

2025 ] 

2026 assert len(dataset_records) > 0, "Disassembled composite should have had some files." 

2027 

2028 # Rely on found_records being a defaultdict. 

2029 found_records[missing].extend(dataset_records) 

2030 log.verbose("Completed scan for missing data files") 

2031 return found_records 

2032 

2033 def retrieveArtifacts( 

2034 self, 

2035 refs: Iterable[DatasetRef], 

2036 destination: ResourcePath, 

2037 transfer: str = "auto", 

2038 preserve_path: bool = True, 

2039 overwrite: bool = False, 

2040 write_index: bool = True, 

2041 add_prefix: bool = False, 

2042 ) -> dict[ResourcePath, ArtifactIndexInfo]: 

2043 """Retrieve the file artifacts associated with the supplied refs. 

2044 

2045 Parameters 

2046 ---------- 

2047 refs : `~collections.abc.Iterable` of `DatasetRef` 

2048 The datasets for which file artifacts are to be retrieved. 

2049 A single ref can result in multiple files. The refs must 

2050 be resolved. 

2051 destination : `lsst.resources.ResourcePath` 

2052 Location to write the file artifacts. 

2053 transfer : `str`, optional 

2054 Method to use to transfer the artifacts. Must be one of the options 

2055 supported by `lsst.resources.ResourcePath.transfer_from`. 

2056 "move" is not allowed. 

2057 preserve_path : `bool`, optional 

2058 If `True` the full path of the file artifact within the datastore 

2059 is preserved. If `False` the final file component of the path 

2060 is used. 

2061 overwrite : `bool`, optional 

2062 If `True` allow transfers to overwrite existing files at the 

2063 destination. 

2064 write_index : `bool`, optional 

2065 If `True` write a file at the top level containing a serialization 

2066 of a `ZipIndex` for the downloaded datasets. 

2067 add_prefix : `bool`, optional 

2068 If `True` and if ``preserve_path`` is `False`, apply a prefix to 

2069 the filenames corresponding to some part of the dataset ref ID. 

2070 This can be used to guarantee uniqueness. 

2071 

2072 Returns 

2073 ------- 

2074 artifact_map : `dict` [ `lsst.resources.ResourcePath`, \ 

2075 `ArtifactIndexInfo` ] 

2076 Mapping of retrieved file to associated index information. 

2077 """ 

2078 if not destination.isdir(): 

2079 raise ValueError(f"Destination location must refer to a directory. Given {destination}") 

2080 

2081 if transfer == "move": 

2082 raise ValueError("Can not move artifacts out of datastore. Use copy instead.") 

2083 

2084 # Source -> Destination 

2085 # This also helps filter out duplicate DatasetRef in the request 

2086 # that will map to the same underlying file transfer. 

2087 to_transfer: dict[ResourcePath, ResourcePath] = {} 

2088 zips_to_transfer: set[ResourcePath] = set() 

2089 

2090 # Retrieve all the records in bulk indexed by ref.id. 

2091 records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) 

2092 

2093 # Check for missing records. 

2094 known_ids = set(records) 

2095 log.debug("Number of datastore records found in database: %d", len(known_ids)) 

2096 requested_ids = {ref.id for ref in refs} 

2097 missing_ids = requested_ids - known_ids 

2098 

2099 if missing_ids and not self.trustGetRequest: 

2100 raise ValueError(f"Number of datasets missing from this datastore: {len(missing_ids)}") 

2101 

2102 missing_records = self._find_missing_records(refs, missing_ids) 

2103 records.update(missing_records) 

2104 

2105 # One artifact can be used by multiple DatasetRef. 

2106 # e.g. DECam. 

2107 artifact_map: dict[ResourcePath, ArtifactIndexInfo] = {} 

2108 # Sort to ensure that in many refs to one file situation the same 

2109 # ref is used for any prefix that might be added. 

2110 for ref in sorted(refs): 

2111 prefix = str(ref.id)[:8] + "-" if add_prefix else "" 

2112 for info in records[ref.id]: 

2113 location = info.file_location(self.locationFactory) 

2114 source_uri = location.uri 

2115 # For DECam/zip we only want to copy once. 

2116 # For zip files we need to unpack so that they can be 

2117 # zipped up again if needed. 

2118 is_zip = source_uri.getExtension() == ".zip" and "zip-path" in source_uri.fragment 

2119 # We need to remove fragments for consistency. 

2120 cleaned_source_uri = source_uri.replace(fragment="", query="", params="") 

2121 if is_zip: 

2122 # Assume the DatasetRef definitions are within the Zip 

2123 # file itself and so can be dropped from the loop. 

2124 zips_to_transfer.add(cleaned_source_uri) 

2125 elif cleaned_source_uri not in to_transfer: 

2126 target_uri = determine_destination_for_retrieved_artifact( 

2127 destination, location.pathInStore, preserve_path, prefix 

2128 ) 

2129 to_transfer[cleaned_source_uri] = target_uri 

2130 artifact_map[target_uri] = ArtifactIndexInfo.from_single(info.to_simple(), ref.id) 

2131 else: 

2132 target_uri = to_transfer[cleaned_source_uri] 

2133 artifact_map[target_uri].append(ref.id) 

2134 

2135 # Parallelize the transfer. Re-raise as a single exception if 

2136 # a FileExistsError is encountered anywhere. 

2137 log.debug("Number of artifacts to transfer to %s: %d", str(destination), len(to_transfer)) 

2138 try: 

2139 ResourcePath.mtransfer(transfer, tuple(to_transfer.items()), overwrite=overwrite) 

2140 except* FileExistsError as egroup: 

2141 raise FileExistsError( 

2142 "Some files already exist in destination directory and overwrite is False" 

2143 ) from egroup 

2144 

2145 # Transfer the Zip files and unpack them. 

2146 zipped_artifacts = unpack_zips(zips_to_transfer, requested_ids, destination, preserve_path, overwrite) 

2147 artifact_map.update(zipped_artifacts) 

2148 

2149 if write_index: 

2150 index = ZipIndex.from_artifact_map(refs, artifact_map, destination) 

2151 index.write_index(destination) 

2152 

2153 return artifact_map 

2154 

2155 def ingest_zip( 

2156 self, 

2157 zip_path: ResourcePath, 

2158 transfer: str | None, 

2159 *, 

2160 dry_run: bool = False, 

2161 ) -> None: 

2162 """Ingest an indexed Zip file and contents. 

2163 

2164 The Zip file must have an index file as created by `retrieveArtifacts`. 

2165 

2166 Parameters 

2167 ---------- 

2168 zip_path : `lsst.resources.ResourcePath` 

2169 Path to the Zip file. 

2170 transfer : `str` 

2171 Method to use for transferring the Zip file into the datastore. 

2172 dry_run : `bool`, optional 

2173 If `True` the ingest will be processed without any modifications 

2174 made to the target datastore and as if the target datastore did not 

2175 have any of the datasets. 

2176 

2177 Notes 

2178 ----- 

2179 Datastore constraints are bypassed with Zip ingest. A zip file can 

2180 contain multiple dataset types. Should the entire Zip be rejected 

2181 if one dataset type is in the constraints list? 

2182 

2183 If any dataset is already present in the datastore the entire ingest 

2184 will fail. 

2185 """ 

2186 index = ZipIndex.from_zip_file(zip_path) 

2187 

2188 # Refs indexed by UUID. 

2189 refs = index.refs.to_refs(universe=self.universe) 

2190 id_to_ref = {ref.id: ref for ref in refs} 

2191 

2192 # Any failing constraints trigger entire failure. 

2193 if any(not self.constraints.isAcceptable(ref) for ref in refs): 

2194 raise DatasetTypeNotSupportedError( 

2195 "Some refs in the Zip file are not supported by this datastore" 

2196 ) 

2197 

2198 # Transfer the Zip file into the datastore file system. 

2199 # There is no RUN as such to use for naming. 

2200 # Potentially could use the RUN from the first ref in the index 

2201 # There is no requirement that the contents of the Zip files share 

2202 # the same RUN. 

2203 # Could use the Zip UUID from the index + special "zips/" prefix. 

2204 if transfer is None: 

2205 # This indicates that the zip file is already in the right place. 

2206 if not zip_path.isabs(): 

2207 tgtLocation = self.locationFactory.fromPath(zip_path.ospath, trusted_path=False) 

2208 else: 

2209 pathInStore = zip_path.relative_to(self.root) 

2210 if pathInStore is None: 

2211 raise RuntimeError( 

2212 f"Unexpectedly learned that {zip_path} is not within datastore {self.root}" 

2213 ) 

2214 tgtLocation = self.locationFactory.fromPath(pathInStore, trusted_path=True) 

2215 elif transfer == "direct": 

2216 # Reference in original location. 

2217 tgtLocation = None 

2218 else: 

2219 # Name the zip file based on index contents. 

2220 tgtLocation = self.locationFactory.fromPath(index.calculate_zip_file_path_in_store()) 

2221 

2222 # Transfer the Zip file into the datastore. 

2223 if not dry_run: 

2224 tgtLocation.uri.transfer_from( 

2225 zip_path, transfer=transfer, transaction=self._transaction, overwrite=True 

2226 ) 

2227 else: 

2228 log.info("Would be copying Zip from %s to %s", zip_path, tgtLocation) 

2229 

2230 if tgtLocation is None: 

2231 path_in_store = str(zip_path) 

2232 else: 

2233 path_in_store = tgtLocation.pathInStore.path 

2234 

2235 # Associate each file with a (DatasetRef, StoredFileInfo) tuple. 

2236 artifacts: list[tuple[DatasetRef, StoredFileInfo]] = [] 

2237 for path_in_zip, index_info in index.artifact_map.items(): 

2238 # Need to modify the info to include the path to the Zip file 

2239 # that was previously written to the datastore. 

2240 index_info.info.path = f"{path_in_store}#zip-path={path_in_zip}" 

2241 

2242 info = StoredFileInfo.from_simple(index_info.info) 

2243 for id_ in index_info.ids: 

2244 artifacts.append((id_to_ref[id_], info)) 

2245 

2246 if not dry_run: 

2247 self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT) 

2248 else: 

2249 log.info("Would be registering %d artifacts from Zip into datastore", len(artifacts)) 

2250 

2251 def get( 

2252 self, 

2253 ref: DatasetRef, 

2254 parameters: Mapping[str, Any] | None = None, 

2255 storageClass: StorageClass | str | None = None, 

2256 ) -> Any: 

2257 """Load an InMemoryDataset from the store. 

2258 

2259 Parameters 

2260 ---------- 

2261 ref : `DatasetRef` 

2262 Reference to the required Dataset. 

2263 parameters : `dict` 

2264 `StorageClass`-specific parameters that specify, for example, 

2265 a slice of the dataset to be loaded. 

2266 storageClass : `StorageClass` or `str`, optional 

2267 The storage class to be used to override the Python type 

2268 returned by this method. By default the returned type matches 

2269 the dataset type definition for this dataset. Specifying a 

2270 read `StorageClass` can force a different type to be returned. 

2271 This type must be compatible with the original type. 

2272 

2273 Returns 

2274 ------- 

2275 inMemoryDataset : `object` 

2276 Requested dataset or slice thereof as an InMemoryDataset. 

2277 

2278 Raises 

2279 ------ 

2280 FileNotFoundError 

2281 Requested dataset can not be retrieved. 

2282 TypeError 

2283 Return value from formatter has unexpected type. 

2284 ValueError 

2285 Formatter failed to process the dataset. 

2286 """ 

2287 # Supplied storage class for the component being read is either 

2288 # from the ref itself or an override if we want to force 

2289 # type conversion. 

2290 if storageClass is not None: 

2291 ref = ref.overrideStorageClass(storageClass) 

2292 

2293 allGetInfo = self._prepare_for_direct_get(ref, parameters) 

2294 return get_dataset_as_python_object_from_get_info( 

2295 allGetInfo, ref=ref, parameters=parameters, cache_manager=self.cacheManager 

2296 ) 

2297 

2298 def prepare_get_for_external_client(self, ref: DatasetRef) -> list[DatasetLocationInformation] | None: 

2299 # Docstring inherited 

2300 

2301 locations = self._get_dataset_locations_info(ref) 

2302 if len(locations) == 0: 

2303 return None 

2304 

2305 return locations 

2306 

2307 @transactional 

2308 def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None: 

2309 """Write a InMemoryDataset with a given `DatasetRef` to the store. 

2310 

2311 Parameters 

2312 ---------- 

2313 inMemoryDataset : `object` 

2314 The dataset to store. 

2315 ref : `DatasetRef` 

2316 Reference to the associated Dataset. 

2317 provenance : `DatasetProvenance` or `None`, optional 

2318 Any provenance that should be attached to the serialized dataset. 

2319 Can be ignored by a formatter or delegate. 

2320 

2321 Raises 

2322 ------ 

2323 TypeError 

2324 Supplied object and storage class are inconsistent. 

2325 DatasetTypeNotSupportedError 

2326 The associated `DatasetType` is not handled by this datastore. 

2327 

2328 Notes 

2329 ----- 

2330 If the datastore is configured to reject certain dataset types it 

2331 is possible that the put will fail and raise a 

2332 `DatasetTypeNotSupportedError`. The main use case for this is to 

2333 allow `ChainedDatastore` to put to multiple datastores without 

2334 requiring that every datastore accepts the dataset. 

2335 """ 

2336 doDisassembly = self.composites.shouldBeDisassembled(ref) 

2337 # doDisassembly = True 

2338 

2339 artifacts = [] 

2340 if doDisassembly: 

2341 inMemoryDataset = ref.datasetType.storageClass.delegate().add_provenance( 

2342 inMemoryDataset, ref, provenance=provenance 

2343 ) 

2344 components = ref.datasetType.storageClass.delegate().disassemble(inMemoryDataset) 

2345 if components is None: 

2346 raise RuntimeError( 

2347 f"Inconsistent configuration: dataset type {ref.datasetType.name} " 

2348 f"with storage class {ref.datasetType.storageClass.name} " 

2349 "is configured to be disassembled, but cannot be." 

2350 ) 

2351 for component, componentInfo in components.items(): 

2352 # Don't recurse because we want to take advantage of 

2353 # bulk insert -- need a new DatasetRef that refers to the 

2354 # same dataset_id but has the component DatasetType 

2355 # DatasetType does not refer to the types of components 

2356 # So we construct one ourselves. 

2357 compRef = ref.makeComponentRef(component) 

2358 # Provenance has already been attached above. 

2359 storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef) 

2360 artifacts.append((compRef, storedInfo)) 

2361 else: 

2362 # Write the entire thing out 

2363 storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref, provenance=provenance) 

2364 artifacts.append((ref, storedInfo)) 

2365 

2366 self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT) 

2367 

2368 @transactional 

2369 def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]: 

2370 doDisassembly = self.composites.shouldBeDisassembled(ref) 

2371 # doDisassembly = True 

2372 

2373 artifacts = [] 

2374 if doDisassembly: 

2375 components = ref.datasetType.storageClass.delegate().disassemble(in_memory_dataset) 

2376 if components is None: 

2377 raise RuntimeError( 

2378 f"Inconsistent configuration: dataset type {ref.datasetType.name} " 

2379 f"with storage class {ref.datasetType.storageClass.name} " 

2380 "is configured to be disassembled, but cannot be." 

2381 ) 

2382 for component, componentInfo in components.items(): 

2383 # Don't recurse because we want to take advantage of 

2384 # bulk insert -- need a new DatasetRef that refers to the 

2385 # same dataset_id but has the component DatasetType 

2386 # DatasetType does not refer to the types of components 

2387 # So we construct one ourselves. 

2388 compRef = ref.makeComponentRef(component) 

2389 storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef) 

2390 artifacts.append((compRef, storedInfo)) 

2391 else: 

2392 # Write the entire thing out 

2393 storedInfo = self._write_in_memory_to_artifact(in_memory_dataset, ref) 

2394 artifacts.append((ref, storedInfo)) 

2395 

2396 ref_records: DatasetDatastoreRecords = {self._opaque_table_name: [info for _, info in artifacts]} 

2397 ref = ref.replace(datastore_records=ref_records) 

2398 return {self.name: ref} 

2399 

2400 @transactional 

2401 def trash(self, ref: DatasetRef | Iterable[DatasetRef], ignore_errors: bool = True) -> None: 

2402 # At this point can safely remove these datasets from the cache 

2403 # to avoid confusion later on. If they are not trashed later 

2404 # the cache will simply be refilled. 

2405 self.cacheManager.remove_from_cache(ref) 

2406 

2407 # If we are in trust mode there will be nothing to move to 

2408 # the trash table and we will have to try to delete the file 

2409 # immediately. 

2410 if self.trustGetRequest: 

2411 # Try to keep the logic below for a single file trash. 

2412 if isinstance(ref, DatasetRef): 

2413 refs = {ref} 

2414 else: 

2415 # Will recreate ref at the end of this branch. 

2416 refs = set(ref) 

2417 

2418 # Determine which datasets are known to datastore directly. 

2419 id_to_ref = {ref.id: ref for ref in refs} 

2420 existing_ids = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) 

2421 existing_refs = {id_to_ref[ref_id] for ref_id in existing_ids} 

2422 

2423 missing = refs - existing_refs 

2424 if missing: 

2425 # Do an explicit existence check on these refs. 

2426 # We only care about the artifacts at this point and not 

2427 # the dataset existence. 

2428 artifact_existence: dict[ResourcePath, bool] = {} 

2429 _ = self.mexists(missing, artifact_existence) 

2430 uris = [uri for uri, exists in artifact_existence.items() if exists] 

2431 

2432 # FUTURE UPGRADE: Implement a parallelized bulk remove. 

2433 log.debug("Removing %d artifacts from datastore that are unknown to its records", len(uris)) 

2434 for uri in uris: 

2435 try: 

2436 uri.remove() 

2437 except Exception as e: 

2438 if ignore_errors: 

2439 log.debug("Artifact %s could not be removed: %s", uri, e) 

2440 continue 

2441 raise 

2442 

2443 # There is no point asking the code below to remove refs we 

2444 # know are missing so update it with the list of existing 

2445 # records. Try to retain one vs many logic. 

2446 if not existing_refs: 

2447 # Nothing more to do since none of the datasets were 

2448 # known to the datastore record table. 

2449 return 

2450 ref = list(existing_refs) 

2451 if len(ref) == 1: 

2452 ref = ref[0] 

2453 

2454 # Get file metadata and internal metadata 

2455 if not isinstance(ref, DatasetRef): 

2456 log.debug("Doing multi-dataset trash in datastore %s", self.name) 

2457 # Assumed to be an iterable of refs so bulk mode enabled. 

2458 try: 

2459 self.bridge.moveToTrash(ref, transaction=self._transaction) 

2460 except Exception as e: 

2461 if ignore_errors: 

2462 log.warning("Unexpected issue moving multiple datasets to trash: %s", e) 

2463 else: 

2464 raise 

2465 return 

2466 

2467 log.debug("Trashing dataset %s in datastore %s", ref, self.name) 

2468 

2469 fileLocations = self._get_dataset_locations_info(ref) 

2470 

2471 if not fileLocations: 

2472 err_msg = f"Requested dataset to trash ({ref}) is not known to datastore {self.name}" 

2473 if ignore_errors: 

2474 log.warning(err_msg) 

2475 return 

2476 else: 

2477 raise FileNotFoundError(err_msg) 

2478 

2479 for location, _ in fileLocations: 

2480 if not self._artifact_exists(location): 

2481 err_msg = ( 

2482 f"Dataset is known to datastore {self.name} but " 

2483 f"associated artifact ({location.uri}) is missing" 

2484 ) 

2485 if ignore_errors: 

2486 log.warning(err_msg) 

2487 return 

2488 else: 

2489 raise FileNotFoundError(err_msg) 

2490 

2491 # Mark dataset as trashed 

2492 try: 

2493 self.bridge.moveToTrash([ref], transaction=self._transaction) 

2494 except Exception as e: 

2495 if ignore_errors: 

2496 log.warning( 

2497 "Attempted to mark dataset (%s) to be trashed in datastore %s " 

2498 "but encountered an error: %s", 

2499 ref, 

2500 self.name, 

2501 e, 

2502 ) 

2503 pass 

2504 else: 

2505 raise 

2506 

2507 def emptyTrash( 

2508 self, ignore_errors: bool = True, refs: Collection[DatasetRef] | None = None, dry_run: bool = False 

2509 ) -> set[ResourcePath]: 

2510 """Remove all datasets from the trash. 

2511 

2512 Parameters 

2513 ---------- 

2514 ignore_errors : `bool` 

2515 If `True` return without error even if something went wrong. 

2516 Problems could occur if another process is simultaneously trying 

2517 to delete. 

2518 refs : `collections.abc.Collection` [ `DatasetRef` ] or `None` 

2519 Explicit list of datasets that can be removed from trash. If listed 

2520 datasets are not already stored in the trash table they will be 

2521 ignored. If `None` every entry in the trash table will be 

2522 processed. 

2523 dry_run : `bool`, optional 

2524 If `True`, the trash table will be queried and results reported 

2525 but no artifacts will be removed. 

2526 

2527 Returns 

2528 ------- 

2529 removed : `set` [ `lsst.resources.ResourcePath` ] 

2530 List of artifacts that were removed. 

2531 

2532 Notes 

2533 ----- 

2534 Will empty the records from the trash tables only if this call finishes 

2535 without raising. 

2536 """ 

2537 removed = set() 

2538 if refs: 

2539 selected_ids = {ref.id for ref in refs} 

2540 chunk_size = 50_000 

2541 n_chunks = math.ceil(len(selected_ids) / chunk_size) 

2542 chunk_num = 0 

2543 for chunk in chunk_iterable(selected_ids, chunk_size=chunk_size): 

2544 chunk_num += 1 

2545 if n_chunks == 1: 

2546 log.verbose( 

2547 "Emptying datastore trash for %d dataset%s", 

2548 len(chunk), 

2549 "s" if len(chunk) != 1 else "", 

2550 ) 

2551 else: 

2552 log.verbose( 

2553 "Emptying datastore trash for chunk %d out of %d of size %d", 

2554 chunk_num, 

2555 n_chunks, 

2556 len(chunk), 

2557 ) 

2558 removed.update( 

2559 self._empty_trash_subset(ignore_errors=ignore_errors, selected_ids=chunk, dry_run=dry_run) 

2560 ) 

2561 else: 

2562 log.verbose("Emptying all trash in datastore %s", self.name) 

2563 removed = self._empty_trash_subset(ignore_errors=ignore_errors, dry_run=dry_run) 

2564 log.info( 

2565 "%sRemoved %d file artifact%s from datastore %s", 

2566 "Would have " if dry_run else "", 

2567 len(removed), 

2568 "s" if len(removed) != 1 else "", 

2569 self.name, 

2570 ) 

2571 return removed 

2572 

2573 @transactional 

2574 def _empty_trash_subset( 

2575 self, 

2576 *, 

2577 ignore_errors: bool = True, 

2578 selected_ids: Collection[DatasetId] | None = None, 

2579 dry_run: bool = False, 

2580 ) -> set[ResourcePath]: 

2581 """Empty trash table in transaction. 

2582 

2583 Parameters 

2584 ---------- 

2585 ignore_errors : `bool` 

2586 If `True` return without error even if something went wrong. 

2587 Problems could occur if another process is simultaneously trying 

2588 to delete. 

2589 selected_ids : `collections.abc.collection` [`DatasetId`] or `None` 

2590 Explicit list of dataset IDs that can be removed from the trash. 

2591 If listed datasets are not already included in the trash table 

2592 they will be ignored. If `None` every entry in the trash table 

2593 will be processed. 

2594 dry_run : `bool`, optional 

2595 If `True`, the trash table will be queried and results reported 

2596 but no artifacts will be removed. 

2597 

2598 Returns 

2599 ------- 

2600 removed : `set` [ `lsst.resources.ResourcePath` ] 

2601 Artifacts successfully removed. 

2602 

2603 Notes 

2604 ----- 

2605 Will empty the records from the trash tables only if this call finishes 

2606 without raising. 

2607 """ 

2608 # Context manager will empty trash iff we finish it without raising. 

2609 # It will also automatically delete the relevant rows from the 

2610 # trash table and the records table. 

2611 with self.bridge.emptyTrash( 

2612 self._table, 

2613 record_class=StoredFileInfo, 

2614 record_column="path", 

2615 selected_ids=selected_ids, 

2616 dry_run=dry_run, 

2617 ) as trash_data: 

2618 # Removing the artifacts themselves requires that the files are 

2619 # not also associated with refs that are not to be trashed. 

2620 # Therefore need to do a query with the file paths themselves 

2621 # and return all the refs associated with them. Can only delete 

2622 # a file if the refs to be trashed are the only refs associated 

2623 # with the file. 

2624 # This requires multiple copies of the trashed items 

2625 trashed, artifacts_to_keep = trash_data 

2626 

2627 # Assume that # in path means there are fragments involved. The 

2628 # fragments can not be handled by the emptyTrash bridge call 

2629 # so need to be processed independently. 

2630 # The generator has to be converted to a list for multiple 

2631 # iterations. Clean up the typing so that multiple isinstance 

2632 # tests aren't needed later. 

2633 trashed_list = [(ref, ninfo) for ref, ninfo in trashed if isinstance(ninfo, StoredFileInfo)] 

2634 

2635 if artifacts_to_keep is None or any("#" in info[1].path for info in trashed_list): 

2636 # The bridge is not helping us so have to work it out 

2637 # ourselves. This is not going to be as efficient. 

2638 # This mapping does not include the fragments. 

2639 if artifacts_to_keep is not None: 

2640 # This means we have already checked for non-fragment 

2641 # examples so can filter. 

2642 paths_to_check = {info.path for _, info in trashed_list if "#" in info.path} 

2643 else: 

2644 paths_to_check = {info.path for _, info in trashed_list} 

2645 

2646 path_map = self._refs_associated_with_artifacts(paths_to_check) 

2647 

2648 for ref, info in trashed_list: 

2649 path = info.artifact_path 

2650 # For disassembled composites in a Zip it is possible 

2651 # for the same path to correspond to the same dataset ref 

2652 # multiple times so trap for that. 

2653 if ref.id in path_map[path]: 

2654 path_map[path].remove(ref.id) 

2655 if not path_map[path]: 

2656 del path_map[path] 

2657 

2658 slow_artifacts_to_keep = set(path_map) 

2659 if artifacts_to_keep is not None: 

2660 artifacts_to_keep.update(slow_artifacts_to_keep) 

2661 else: 

2662 artifacts_to_keep = slow_artifacts_to_keep 

2663 

2664 n_direct = 0 

2665 artifacts_to_delete: set[ResourcePath] = set() 

2666 for ref, info in trashed_list: 

2667 # Should not happen for this implementation but need 

2668 # to keep mypy happy. 

2669 assert info is not None, f"Internal logic error in emptyTrash with ref {ref}." 

2670 

2671 if info.artifact_path in artifacts_to_keep: 

2672 # This is a multi-dataset artifact and we are not 

2673 # removing all associated refs. 

2674 continue 

2675 

2676 # Only trashed refs still known to datastore will be returned. 

2677 location = info.file_location(self.locationFactory) 

2678 

2679 if location.pathInStore.isabs(): 

2680 n_direct += 1 

2681 continue 

2682 

2683 # Strip fragment before storing since it is the artifact 

2684 # we are deleting and we do not want repeats for every member 

2685 # in a zip. 

2686 artifacts_to_delete.add(location.uri.replace(fragment="")) 

2687 

2688 if n_direct > 0: 

2689 s = "s" if n_direct != 1 else "" 

2690 log.verbose("Not deleting %d artifact%s using absolute URI%s", n_direct, s, s) 

2691 

2692 if artifacts_to_keep: 

2693 log.verbose( 

2694 "%d artifact%s %s not deleted because of association with other datasets", 

2695 len(artifacts_to_keep), 

2696 "s" if len(artifacts_to_keep) != 1 else "", 

2697 "were" if len(artifacts_to_keep) != 1 else "was", 

2698 ) 

2699 

2700 if not artifacts_to_delete: 

2701 return set() 

2702 

2703 # Now do the deleting. Special case the log message for a single 

2704 # artifact. 

2705 if len(artifacts_to_delete) == 1: 

2706 log.verbose( 

2707 "%s removing file artifact %s from datastore %s", 

2708 "Would be" if dry_run else "Now", 

2709 list(artifacts_to_delete)[0], 

2710 self.name, 

2711 ) 

2712 else: 

2713 log.verbose( 

2714 "%s removing %d file artifacts from datastore %s", 

2715 "Would be" if dry_run else "Now", 

2716 len(artifacts_to_delete), 

2717 self.name, 

2718 ) 

2719 

2720 # For dry-run mode do not attempt to search the file store for 

2721 # the artifacts to determine whether they exist or not. Simply 

2722 # report that an attempt would be made to delete them. Never 

2723 # report direct imports. 

2724 if dry_run: 

2725 return artifacts_to_delete 

2726 

2727 # Now remove the actual file artifacts. 

2728 remove_result = ResourcePath.mremove(artifacts_to_delete, do_raise=False) 

2729 

2730 removed: set[ResourcePath] = set() 

2731 exceptions: list[Exception] = [] 

2732 for uri, result in remove_result.items(): 

2733 if result.exception is None or isinstance(result.exception, FileNotFoundError): 

2734 # File not existing is not an error since some other 

2735 # process might have been trying to clean it and we do not 

2736 # want to raise an error for a situation where the file 

2737 # is not there and we do not want it to be there. 

2738 removed.add(uri) 

2739 else: 

2740 exceptions.append(result.exception) 

2741 

2742 if exceptions: 

2743 s_err = "s" if len(exceptions) != 1 else "" 

2744 e = ExceptionGroup(f"Error{s_err} removing {len(exceptions)} artifact{s_err}", exceptions) 

2745 if ignore_errors: 

2746 # Use a debug message here even though it's not 

2747 # a good situation. In some cases this can be 

2748 # caused by a race between user A and user B 

2749 # and neither of them has permissions for the 

2750 # other's files. Butler does not know about users 

2751 # and trash has no idea what collections these 

2752 # files were in (without guessing from a path). 

2753 log.debug( 

2754 "Encountered %d error%s removing %d artifact%s from datastore %s: %s", 

2755 len(exceptions), 

2756 s_err, 

2757 len(artifacts_to_delete), 

2758 "s" if len(artifacts_to_delete) != 1 else "", 

2759 self.name, 

2760 e, 

2761 ) 

2762 else: 

2763 raise e 

2764 return removed 

2765 

2766 @transactional 

2767 def transfer_from( 

2768 self, 

2769 source_records: FileTransferMap, 

2770 refs: Collection[DatasetRef], 

2771 transfer: str = "auto", 

2772 artifact_existence: dict[ResourcePath, bool] | None = None, 

2773 dry_run: bool = False, 

2774 ) -> tuple[set[DatasetRef], set[DatasetRef]]: 

2775 log.verbose("Transferring %d datasets to %s", len(refs), self.name) 

2776 

2777 # Stop early if "direct" transfer mode is requested. That would 

2778 # require that the URI inside the source datastore should be stored 

2779 # directly in the target datastore, which seems unlikely to be useful 

2780 # since at any moment the source datastore could delete the file. 

2781 if transfer in ("direct", "split"): 

2782 raise ValueError( 

2783 f"Can not transfer from a source datastore using {transfer} mode since" 

2784 " those files are controlled by the other datastore." 

2785 ) 

2786 

2787 if not refs: 

2788 return set(), set() 

2789 

2790 # Empty existence lookup if none given. 

2791 if artifact_existence is None: 

2792 artifact_existence = {} 

2793 

2794 # In order to handle disassembled composites the code works 

2795 # at the records level since it can assume that internal APIs 

2796 # can be used. 

2797 # - If the record already exists in the destination this is assumed 

2798 # to be okay. 

2799 # - If there is no record but the source and destination URIs are 

2800 # identical no transfer is done but the record is added. 

2801 # - If the source record refers to an absolute URI currently assume 

2802 # that that URI should remain absolute and will be visible to the 

2803 # destination butler. May need to have a flag to indicate whether 

2804 # the dataset should be transferred. This will only happen if 

2805 # the detached Butler has had a local ingest. 

2806 

2807 # See if we already have these records 

2808 log.verbose("Looking up existing datastore records in target %s for %d refs", self.name, len(refs)) 

2809 target_records = self._get_stored_records_associated_with_refs(refs, ignore_datastore_records=True) 

2810 

2811 # The artifacts to register 

2812 artifacts = [] 

2813 

2814 # Refs that already exist 

2815 already_present = [] 

2816 

2817 # Refs that were rejected by this datastore. 

2818 rejected = set() 

2819 

2820 # Refs that were transferred successfully. 

2821 accepted = set() 

2822 

2823 # Record each time we have done a "direct" transfer. 

2824 direct_transfers = [] 

2825 

2826 # Keep track of all the file transfers that are required. 

2827 from_to: list[tuple[ResourcePath, ResourcePath]] = [] 

2828 

2829 # Now can transfer the artifacts 

2830 log.verbose("Transferring artifacts") 

2831 for ref in refs: 

2832 if not self.constraints.isAcceptable(ref): 

2833 # This datastore should not be accepting this dataset. 

2834 rejected.add(ref) 

2835 continue 

2836 

2837 accepted.add(ref) 

2838 

2839 if ref.id in target_records: 

2840 # Already have an artifact for this. 

2841 already_present.append(ref) 

2842 continue 

2843 

2844 # mypy needs to know these are always resolved refs 

2845 for transfer_info in source_records.get(ref.id, []): 

2846 info = transfer_info.file_info 

2847 source_location = transfer_info.location 

2848 target_location = info.file_location(self.locationFactory) 

2849 if transfer == "unsafe_direct": 

2850 # Use the existing file from the source location in place, 

2851 # by recording the absolute URI in the target DB. This is 

2852 # "unsafe" because the file could be deleted from the 

2853 # source Butler at any time, leaving a dangling reference. 

2854 source_location = source_location.toAbsolute() 

2855 direct_transfers.append(source_location) 

2856 info = info.update(path=str(source_location.uri)) 

2857 elif source_location == target_location and not source_location.pathInStore.isabs(): 

2858 # Artifact is already in the target location. 

2859 # (which is how execution butler currently runs) 

2860 pass 

2861 else: 

2862 if target_location.pathInStore.isabs(): 

2863 # Just because we can see the artifact when running 

2864 # the transfer doesn't mean it will be generally 

2865 # accessible to a user of this butler. Need to decide 

2866 # what to do about an absolute path. 

2867 if transfer == "auto": 

2868 # For "auto" transfers we allow the absolute URI 

2869 # to be recorded in the target datastore. 

2870 direct_transfers.append(source_location) 

2871 else: 

2872 # The user is explicitly requesting a transfer 

2873 # even for an absolute URI. This requires us to 

2874 # calculate the target path. 

2875 template_ref = ref 

2876 if info.component: 

2877 template_ref = ref.makeComponentRef(info.component) 

2878 target_location = self._calculate_ingested_datastore_name( 

2879 source_location.uri, 

2880 template_ref, 

2881 ) 

2882 

2883 info = info.update(path=target_location.pathInStore.path) 

2884 

2885 # Need to transfer it to the new location. 

2886 from_to.append((source_location.uri, target_location.uri)) 

2887 

2888 artifacts.append((ref, info)) 

2889 

2890 # Do the file transfers in bulk. 

2891 # Assume we should always overwrite. If the artifact 

2892 # is there this might indicate that a previous transfer 

2893 # was interrupted but was not able to be rolled back 

2894 # completely (eg pre-emption) so follow Datastore default 

2895 # and overwrite. Do not copy if we are in dry-run mode. 

2896 if dry_run: 

2897 log.info("Would be copying %d file artifacts", len(from_to)) 

2898 else: 

2899 log.verbose("Copying %d file artifacts", len(from_to)) 

2900 with time_this(log, msg="Transferring datasets into datastore", level=VERBOSE): 

2901 ResourcePath.mtransfer( 

2902 transfer, 

2903 from_to, 

2904 overwrite=True, 

2905 transaction=self._transaction, 

2906 ) 

2907 

2908 if direct_transfers: 

2909 log.info( 

2910 "Transfer request for an outside-datastore artifact with absolute URI done %d time%s", 

2911 len(direct_transfers), 

2912 "" if len(direct_transfers) == 1 else "s", 

2913 ) 

2914 

2915 # We may be overwriting datasets that already existed. We 

2916 # therefore need to force the 

2917 # datastore records to agree. Note that this can potentially lead 

2918 # to difficulties if the dataset has previously been ingested 

2919 # disassembled and is somehow now assembled, or vice versa. 

2920 if not dry_run: 

2921 log.verbose("Registering datastore records in database") 

2922 self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.REPLACE) 

2923 

2924 if already_present: 

2925 n_skipped = len(already_present) 

2926 log.info( 

2927 "Skipped transfer of %d dataset%s already present in datastore", 

2928 n_skipped, 

2929 "" if n_skipped == 1 else "s", 

2930 ) 

2931 

2932 log.verbose( 

2933 "Finished transfer_from to %s with %d accepted, %d rejected", 

2934 self.name, 

2935 len(accepted), 

2936 len(rejected), 

2937 ) 

2938 return accepted, rejected 

2939 
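The bulk copy above hands the whole transfer list to lsst.resources in one call. As a minimal sketch outside the datastore (the repository paths are hypothetical), the same ResourcePath.mtransfer call can be driven directly with a transfer mode and a list of (source, destination) pairs, overwriting any partial artifacts left behind by an interrupted run:

    from lsst.resources import ResourcePath

    from_to = [
        (
            ResourcePath("file:///source/repo/raw/dataset_a.fits"),
            ResourcePath("file:///target/repo/raw/dataset_a.fits"),
        ),
        (
            ResourcePath("file:///source/repo/raw/dataset_b.fits"),
            ResourcePath("file:///target/repo/raw/dataset_b.fits"),
        ),
    ]
    # Copy both artifacts in bulk, overwriting as transfer_from does.
    ResourcePath.mtransfer("copy", from_to, overwrite=True)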

2940 def get_file_info_for_transfer(self, dataset_ids: Iterable[DatasetId]) -> FileTransferMap: 

2941 source_records = self._get_stored_records_associated_with_refs( 

2942 [FakeDatasetRef(id) for id in dataset_ids], ignore_datastore_records=True 

2943 ) 

2944 return self._convert_stored_file_info_to_file_transfer_record(source_records) 

2945 

2946 def locate_missing_files_for_transfer( 

2947 self, refs: Iterable[DatasetRef], artifact_existence: dict[ResourcePath, bool] 

2948 ) -> FileTransferMap: 

2949 missing_ids = {ref.id for ref in refs} 

2950 # Missing IDs can be okay if this datastore allows gets based 

2951 # on file existence (trust mode). Should we transfer what we can 

2952 # or complain about it and warn? 

2953 if not self.trustGetRequest: 

2954 return {} 

2955 

2956 found_records = self._find_missing_records( 

2957 refs, missing_ids, artifact_existence, warn_for_missing=False 

2958 ) 

2959 return self._convert_stored_file_info_to_file_transfer_record(found_records) 

2960 

2961 def _convert_stored_file_info_to_file_transfer_record( 

2962 self, info_map: dict[DatasetId, list[StoredFileInfo]] 

2963 ) -> FileTransferMap: 

2964 output: dict[DatasetId, list[FileTransferRecord]] = {} 

2965 for k, file_info_list in info_map.items(): 

2966 output[k] = [ 

2967 FileTransferRecord(file_info=info, location=info.file_location(self.locationFactory)) 

2968 for info in file_info_list 

2969 ] 

2970 return output 

2971 
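As a sketch of how the three lookup helpers above fit together (the source_datastore and refs names are assumptions, and FileTransferMap is treated as a plain dict keyed by dataset ID, as the converter suggests): records known to the source are fetched by dataset ID first, and trust-mode existence checks fill in any datasets that have no stored records.

    # Hypothetical: source_datastore is a FileDatastore, refs are resolved DatasetRefs.
    transfer_map = source_datastore.get_file_info_for_transfer(ref.id for ref in refs)
    missing = [ref for ref in refs if ref.id not in transfer_map]
    if missing:
        # Only returns anything when the source datastore trusts file existence.
        transfer_map.update(
            source_datastore.locate_missing_files_for_transfer(missing, artifact_existence={})
        )
    # Each value is a list of FileTransferRecord(file_info=..., location=...).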

2972 @transactional 

2973 def forget(self, refs: Iterable[DatasetRef]) -> None: 

2974 # Docstring inherited. 

2975 refs = list(refs) 

2976 self.bridge.forget(refs) 

2977 self._table.delete(["dataset_id"], *[{"dataset_id": ref.id} for ref in refs]) 

2978 

2979 def validateConfiguration( 

2980 self, entities: Iterable[DatasetRef | DatasetType | StorageClass], logFailures: bool = False 

2981 ) -> None: 

2982 """Validate some of the configuration for this datastore. 

2983 

2984 Parameters 

2985 ---------- 

2986 entities : `~collections.abc.Iterable` [`DatasetRef` | `DatasetType` \ 

2987 | `StorageClass`] 

2988 Entities to test against this configuration. Can be differing 

2989 types. 

2990 logFailures : `bool`, optional 

2991 If `True`, output a log message for every validation error 

2992 detected. 

2993 

2994 Returns 

2995 ------- 

2996 None 

2997 

2998 Raises 

2999 ------ 

3000 DatastoreValidationError 

3001 Raised if there is a validation problem with a configuration. 

3002 All the problems are reported in a single exception. 

3003 

3004 Notes 

3005 ----- 

3006 This method checks that all the supplied entities have valid file 

3007 templates and also have formatters defined. 

3008 """ 

3009 templateFailed = None 

3010 try: 

3011 self.templates.validateTemplates(entities, logFailures=logFailures) 

3012 except FileTemplateValidationError as e: 

3013 templateFailed = str(e) 

3014 

3015 formatterFailed = [] 

3016 for entity in entities: 

3017 try: 

3018 self.formatterFactory.getFormatterClass(entity) 

3019 except KeyError as e: 

3020 formatterFailed.append(str(e)) 

3021 if logFailures: 

3022 log.critical("Formatter failure: %s", e) 

3023 

3024 if templateFailed or formatterFailed: 

3025 messages = [] 

3026 if templateFailed: 

3027 messages.append(templateFailed) 

3028 if formatterFailed: 

3029 messages.append(",".join(formatterFailed)) 

3030 msg = ";\n".join(messages) 

3031 raise DatastoreValidationError(msg) 

3032 
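A usage sketch for the validation above (the datastore and dataset_types names are assumptions): every template and formatter problem across the supplied entities is collected into a single DatastoreValidationError.

    try:
        datastore.validateConfiguration(dataset_types, logFailures=True)
    except DatastoreValidationError as e:
        # One exception aggregates all template and formatter failures.
        print(f"Datastore configuration problems:\n{e}")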

3033 def getLookupKeys(self) -> set[LookupKey]: 

3034 # Docstring is inherited from base class 

3035 return ( 

3036 self.templates.getLookupKeys() 

3037 | self.formatterFactory.getLookupKeys() 

3038 | self.constraints.getLookupKeys() 

3039 ) 

3040 

3041 def validateKey(self, lookupKey: LookupKey, entity: DatasetRef | DatasetType | StorageClass) -> None: 

3042 # Docstring is inherited from base class 

3043 # The key can be valid in either formatters or templates, so we can 

3044 # only check the template if it exists. 

3045 if lookupKey in self.templates: 

3046 try: 

3047 self.templates[lookupKey].validateTemplate(entity) 

3048 except FileTemplateValidationError as e: 

3049 raise DatastoreValidationError(e) from e 

3050 
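Combining the two methods above, every lookup key known to the templates, formatters, or constraints can be checked against a single entity. This is a sketch; the datastore and dataset_type names are assumptions.

    for key in datastore.getLookupKeys():
        try:
            datastore.validateKey(key, dataset_type)
        except DatastoreValidationError as e:
            print(f"Lookup key {key} failed validation: {e}")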

3051 def export( 

3052 self, 

3053 refs: Iterable[DatasetRef], 

3054 *, 

3055 directory: ResourcePathExpression | None = None, 

3056 transfer: str | None = "auto", 

3057 ) -> Iterable[FileDataset]: 

3058 # Docstring inherited from Datastore.export. 

3059 if transfer == "auto" and directory is None: 

3060 transfer = None 

3061 

3062 if transfer is not None and transfer != "direct" and directory is None: 

3063 raise TypeError(f"Cannot export using transfer mode {transfer} with no export directory given") 

3064 

3065 if transfer == "move": 

3066 raise TypeError("Can not export by moving files out of datastore.") 

3067 

3068 # Force the directory to be a URI object 

3069 directoryUri: ResourcePath | None = None 

3070 if directory is not None: 

3071 directoryUri = ResourcePath(directory, forceDirectory=True) 

3072 

3073 if transfer is not None and directoryUri is not None and not directoryUri.exists(): 

3074 # mypy needs the second test 

3075 raise FileNotFoundError(f"Export location {directory} does not exist") 

3076 

3077 progress = Progress("lsst.daf.butler.datastores.FileDatastore.export", level=logging.DEBUG) 

3078 for ref in progress.wrap(refs, "Exporting dataset files"): 

3079 fileLocations = self._get_dataset_locations_info(ref) 

3080 if not fileLocations: 

3081 raise FileNotFoundError(f"Could not retrieve dataset {ref}.") 

3082 # For now we can not export disassembled datasets 

3083 if len(fileLocations) > 1: 

3084 raise NotImplementedError(f"Can not export disassembled datasets such as {ref}") 

3085 location, storedFileInfo = fileLocations[0] 

3086 

3087 pathInStore = location.pathInStore.path 

3088 if transfer is None: 

3089 # TODO: do we also need to return the readStorageClass somehow? 

3090 # We will use the path in store directly. If this is an 

3091 # absolute URI, preserve it. 

3092 if location.pathInStore.isabs(): 

3093 pathInStore = str(location.uri) 

3094 elif transfer == "direct": 

3095 # Use full URIs to the remote store in the export 

3096 pathInStore = str(location.uri) 

3097 else: 

3098 # mypy needs help 

3099 assert directoryUri is not None, "directoryUri must be defined to get here" 

3100 storeUri = ResourcePath(location.uri, forceDirectory=False) 

3101 

3102 # if the datastore has an absolute URI to a resource, we 

3103 # have two options: 

3104 # 1. Keep the absolute URI in the exported YAML 

3105 # 2. Allocate a new name in the local datastore and transfer 

3106 # it. 

3107 # For now, go with option 2. 

3108 if location.pathInStore.isabs(): 

3109 template = self.templates.getTemplate(ref) 

3110 newURI = ResourcePath(template.format(ref), forceAbsolute=False, forceDirectory=False) 

3111 pathInStore = str(newURI.updatedExtension(location.pathInStore.getExtension())) 

3112 

3113 exportUri = directoryUri.join(pathInStore) 

3114 exportUri.transfer_from(storeUri, transfer=transfer) 

3115 

3116 yield FileDataset(refs=[ref], path=pathInStore, formatter=storedFileInfo.formatter) 

3117 
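Since export is a generator, no files move until it is iterated. A hedged usage sketch (the export directory and refs are hypothetical; the directory must exist before the call, as enforced above):

    from lsst.resources import ResourcePath

    export_dir = ResourcePath("file:///tmp/butler_export/", forceDirectory=True)
    export_dir.mkdir()  # export raises FileNotFoundError if the directory is missing
    for file_dataset in datastore.export(refs, directory=export_dir, transfer="copy"):
        # Each FileDataset records the exported path and the formatter used.
        print(file_dataset.path, file_dataset.formatter)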

3118 @staticmethod 

3119 def computeChecksum(uri: ResourcePath, algorithm: str = "blake2b", block_size: int = 8192) -> str | None: 

3120 """Compute the checksum of the supplied file. 

3121 

3122 Parameters 

3123 ---------- 

3124 uri : `lsst.resources.ResourcePath` 

3125 Name of resource to calculate checksum from. 

3126 algorithm : `str`, optional 

3127 Name of algorithm to use. Must be one of the algorithms supported 

3128 by :py:mod:`hashlib`. 

3129 block_size : `int` 

3130 Number of bytes to read from file at one time. 

3131 

3132 Returns 

3133 ------- 

3134 hexdigest : `str` or `None` 

3135 Hex digest of the file. 

3136 

3137 Notes 

3138 ----- 

3139 Currently returns None if the URI is for a remote resource. 

3140 """ 

3141 if algorithm not in hashlib.algorithms_guaranteed: 

3142 raise NameError(f"The specified algorithm '{algorithm}' is not supported by hashlib") 

3143 

3144 if not uri.isLocal: 

3145 return None 

3146 

3147 hasher = hashlib.new(algorithm) 

3148 

3149 with uri.as_local() as local_uri, open(local_uri.ospath, "rb") as f: 

3150 for chunk in iter(lambda: f.read(block_size), b""): 

3151 hasher.update(chunk) 

3152 

3153 return hasher.hexdigest() 

3154 
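A short sketch of the checksum helper above (the file path is hypothetical); non-local URIs return None instead of a digest.

    from lsst.resources import ResourcePath

    uri = ResourcePath("file:///tmp/example_dataset.fits")
    digest = FileDatastore.computeChecksum(uri, algorithm="blake2b", block_size=8192)
    if digest is None:
        print("Checksum skipped: resource is not local.")
    else:
        print(f"blake2b digest: {digest}")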

3155 def needs_expanded_data_ids( 

3156 self, 

3157 transfer: str | None, 

3158 entity: DatasetRef | DatasetType | StorageClass | None = None, 

3159 ) -> bool: 

3160 # Docstring inherited. 

3161 # This _could_ also use entity to inspect whether the filename template 

3162 # involves placeholders other than the required dimensions for its 

3163 # dataset type, but that's not necessary for correctness; it just 

3164 # enables more optimizations (perhaps only in theory). 

3165 return transfer not in ("direct", None) 

3166 
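The rule above depends only on the transfer mode: "direct" ingest and no-transfer (None) registration reuse existing files, so they are the only modes that do not need expanded data IDs for filename templates. A minimal illustration, assuming a configured datastore instance:

    for mode in ("copy", "move", "link", "direct", None):
        print(mode, datastore.needs_expanded_data_ids(mode))
    # Expected: True for the modes that write new files, False for "direct" and None.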

3167 def import_records(self, data: Mapping[str, DatastoreRecordData]) -> None: 

3168 # Docstring inherited from the base class. 

3169 record_data = data.get(self.name) 

3170 if not record_data: 

3171 return 

3172 

3173 self._bridge.insert(FakeDatasetRef(dataset_id) for dataset_id in record_data.records) 

3174 

3175 # TODO: Verify that there are no unexpected table names in the dict? 

3176 unpacked_records = [] 

3177 for dataset_id, dataset_data in record_data.records.items(): 

3178 records = dataset_data.get(self._table.name) 

3179 if records: 

3180 for info in records: 

3181 assert isinstance(info, StoredFileInfo), "Expecting StoredFileInfo records" 

3182 unpacked_records.append(info.to_record(dataset_id=dataset_id)) 

3183 if unpacked_records: 

3184 self._table.insert(*unpacked_records, transaction=self._transaction) 

3185 

3186 def export_records(self, refs: Iterable[DatasetIdRef]) -> Mapping[str, DatastoreRecordData]: 

3187 # Docstring inherited from the base class. 

3188 

3189 # This call to 'bridge.check' filters out "partially deleted" datasets. 

3190 # Specifically, ones in the unusual edge state that: 

3191 # 1. They have an entry in the registry dataset tables 

3192 # 2. They were "trashed" from the datastore, so they are not 

3193 # present in the "dataset_location" table. 

3194 # 3. But the trash has not been "emptied", so there are still entries 

3195 # in the "opaque" datastore records table. 

3196 # 

3197 # As far as I can tell, this can only occur in the case of a concurrent 

3198 # or aborted call to `Butler.pruneDatasets(unstore=True, purge=False)`. 

3199 # Datasets (with or without files existing on disk) can persist in 

3200 # this zombie state indefinitely, until someone manually empties 

3201 # the trash. 

3202 exported_refs = list(self._bridge.check(refs)) 

3203 ids = {ref.id for ref in exported_refs} 

3204 records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {id: {} for id in ids} 

3205 for row in self._table.fetch(dataset_id=ids): 

3206 info: StoredDatastoreItemInfo = StoredFileInfo.from_record(row) 

3207 dataset_records = records.setdefault(row["dataset_id"], {}) 

3208 dataset_records.setdefault(self._table.name, []).append(info) 

3209 

3210 record_data = DatastoreRecordData(records=records) 

3211 return {self.name: record_data} 

3212 
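A sketch of round-tripping datastore records between two repositories (source_datastore, target_datastore, and refs are assumed names). Because import_records looks up its own datastore name in the mapping, the transfer only takes effect when both datastores share that name.

    record_map = source_datastore.export_records(refs)
    # The mapping is keyed by datastore name; import_records ignores other keys.
    target_datastore.import_records(record_map)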

3213 def export_predicted_records(self, refs: Iterable[DatasetRef]) -> dict[str, DatastoreRecordData]: 

3214 # Docstring inherited from the base class. 

3215 refs = [self._cast_storage_class(ref) for ref in refs] 

3216 records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {} 

3217 for ref in refs: 

3218 if not self.constraints.isAcceptable(ref): 

3219 continue 

3220 fileLocations = self._get_expected_dataset_locations_info(ref) 

3221 if not fileLocations: 

3222 continue 

3223 dataset_records = records.setdefault(ref.id, {}) 

3224 dataset_records.setdefault(self._table.name, []) 

3225 for _, storedFileInfo in fileLocations: 

3226 dataset_records[self._table.name].append(storedFileInfo) 

3227 

3228 record_data = DatastoreRecordData(records=records) 

3229 return {self.name: record_data} 

3230 

3231 def set_retrieve_dataset_type_method(self, method: Callable[[str], DatasetType | None] | None) -> None: 

3232 # Docstring inherited from the base class. 

3233 self._retrieve_dataset_method = method 

3234 

3235 def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef: 

3236 """Update dataset reference to use the storage class from registry.""" 

3237 if self._retrieve_dataset_method is None: 

3238 # We could raise an exception here but unit tests do not define 

3239 # this method. 

3240 return ref 

3241 dataset_type = self._retrieve_dataset_method(ref.datasetType.name) 

3242 if dataset_type is not None: 

3243 ref = ref.overrideStorageClass(dataset_type.storageClass_name) 

3244 return ref 

3245 

3246 def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]: 

3247 # Docstring inherited from the base class. 

3248 return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(), StoredFileInfo)}