Coverage for python/lsst/daf/butler/datastore/cache_manager.py: 22% (429 statements)


# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Cache management for a datastore."""

from __future__ import annotations

__all__ = (
    "AbstractDatastoreCacheManager",
    "DatastoreCacheManager",
    "DatastoreCacheManagerConfig",
    "DatastoreDisabledCacheManager",
)

import atexit
import contextlib
import datetime
import itertools
import logging
import os
import shutil
import tempfile
import uuid
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import ItemsView, Iterable, Iterator, KeysView, ValuesView
from random import Random
from typing import TYPE_CHECKING, Self

from pydantic import BaseModel, PrivateAttr

from lsst.resources import ResourcePath

from .._config import Config, ConfigSubset
from .._config_support import processLookupConfigs
from .._dataset_ref import DatasetId, DatasetRef

if TYPE_CHECKING:
    from .._config_support import LookupKey
    from .._dataset_type import DatasetType
    from .._storage_class import StorageClass
    from ..dimensions import DimensionUniverse

log = logging.getLogger(__name__)


def remove_cache_directory(directory: str) -> None:
    """Remove the specified directory and all its contents.

    Parameters
    ----------
    directory : `str`
        Directory to remove.
    """
    log.debug("Removing temporary cache directory %s", directory)
    shutil.rmtree(directory, ignore_errors=True)


def _construct_cache_path(root: ResourcePath, ref: DatasetRef, extension: str) -> ResourcePath:
    """Construct the full path to use for this dataset in the cache.

    Parameters
    ----------
    root : `lsst.resources.ResourcePath`
        The root of the cache.
    ref : `DatasetRef`
        The dataset to look up in or write to the cache.
    extension : `str`
        File extension to use for this file. Should include the
        leading "``.``".

    Returns
    -------
    uri : `lsst.resources.ResourcePath`
        URI to use for this dataset in the cache.
    """
    # Dataset type component is needed in the name if composite
    # disassembly is happening since the ID is shared for all components.
    component = ref.datasetType.component()
    component = f"_{component}" if component else ""
    return root.join(f"{ref.id}{component}{extension}")
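# For illustration (identifiers invented): given a ref with dataset ID
# 91b45028-1d33-4f5e-9f0b-875d18bd1af2 pointing at the ``wcs`` component of
# a disassembled composite, _construct_cache_path(root, ref, ".fits") yields
# ``<root>/91b45028-1d33-4f5e-9f0b-875d18bd1af2_wcs.fits``; a ref without a
# component drops the ``_wcs``.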

def _parse_cache_name(cached_location: str) -> tuple[uuid.UUID, str | None, str | None]:
    """For a given cache name, return its component parts.

    Changes to ``_construct_cache_path()`` should be reflected here.

    Parameters
    ----------
    cached_location : `str`
        The name of the file within the cache.

    Returns
    -------
    id : `uuid.UUID`
        The dataset ID.
    component : `str` or `None`
        The name of the component, if present.
    extension : `str` or `None`
        The file extension, if present.
    """
    # Assume the first dot starts the extension so that ``.fits.gz`` survives.
    root_ext = cached_location.split(".", maxsplit=1)
    root = root_ext.pop(0)
    ext = "." + root_ext.pop(0) if root_ext else None

    parts = root.split("_")
    try:
        id_ = uuid.UUID(parts.pop(0))
    except ValueError as e:
        raise InvalidCacheFilenameError(f"Invalid or missing ID in cache file name: {cached_location}") from e
    component = parts.pop(0) if parts else None
    return id_, component, ext
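# The two helpers above round-trip; a minimal illustration with an invented
# dataset ID:
#
#     >>> _parse_cache_name("91b45028-1d33-4f5e-9f0b-875d18bd1af2_wcs.fits.gz")
#     (UUID('91b45028-1d33-4f5e-9f0b-875d18bd1af2'), 'wcs', '.fits.gz')
#     >>> _parse_cache_name("91b45028-1d33-4f5e-9f0b-875d18bd1af2.json")
#     (UUID('91b45028-1d33-4f5e-9f0b-875d18bd1af2'), None, '.json')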

class CacheEntry(BaseModel):
    """Represent an entry in the cache."""

    name: str
    """Name of the file."""

    size: int
    """Size of the file in bytes."""

    ctime: datetime.datetime
    """Creation time of the file."""

    ref: DatasetId
    """ID of this dataset."""

    component: str | None = None
    """Component for this disassembled composite (optional)."""

    @classmethod
    def from_file(cls, file: ResourcePath, root: ResourcePath) -> CacheEntry:
        """Construct an object from a file name.

        Parameters
        ----------
        file : `lsst.resources.ResourcePath`
            Path to the file.
        root : `lsst.resources.ResourcePath`
            Cache root directory.
        """
        file_in_cache = file.relative_to(root)
        if file_in_cache is None:
            raise ValueError(f"Supplied file {file} is not inside root {root}")
        id_, component, _ = _parse_cache_name(file_in_cache)

        stat = os.stat(file.ospath)
        return cls.model_construct(
            name=file_in_cache,
            size=stat.st_size,
            ref=id_,
            component=component,
            ctime=datetime.datetime.fromtimestamp(stat.st_ctime, datetime.UTC),
        )
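    # Sketch (paths invented): with root = ResourcePath("/tmp/cache/",
    # forceDirectory=True) and file = root.join(
    # "91b45028-1d33-4f5e-9f0b-875d18bd1af2_wcs.fits"), from_file() parses
    # the ID and component out of the file name and takes the size and UTC
    # ctime from os.stat().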

class _MarkerEntry(CacheEntry):
    pass


class CacheRegistry(BaseModel):
    """Collection of cache entries."""

    _size: int = PrivateAttr(0)
    """Size of the cache."""

    _entries: dict[str, CacheEntry] = PrivateAttr({})
    """Internal collection of cache entries."""

    _ref_map: dict[DatasetId, list[str]] = PrivateAttr({})
    """Mapping of `DatasetId` to corresponding keys in the cache registry."""

    @property
    def cache_size(self) -> int:
        return self._size

    def __getitem__(self, key: str) -> CacheEntry:
        return self._entries[key]

    def __setitem__(self, key: str, entry: CacheEntry) -> None:
        self._size += entry.size
        self._entries[key] = entry

        # Update the mapping from ref to path.
        if entry.ref not in self._ref_map:
            self._ref_map[entry.ref] = []
        self._ref_map[entry.ref].append(key)

    def __delitem__(self, key: str) -> None:
        entry = self._entries.pop(key)
        self._decrement(entry)
        self._ref_map[entry.ref].remove(key)

    def _decrement(self, entry: CacheEntry | None) -> None:
        if entry:
            self._size -= entry.size
            if self._size < 0:
                log.warning("Cache size has gone negative. Inconsistent cache records...")
                self._size = 0

    def __contains__(self, key: str) -> bool:
        return key in self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __iter__(self) -> Iterator[str]:  # type: ignore
        return iter(self._entries)

    def keys(self) -> KeysView[str]:
        return self._entries.keys()

    def values(self) -> ValuesView[CacheEntry]:
        return self._entries.values()

    def items(self) -> ItemsView[str, CacheEntry]:
        return self._entries.items()

    # A private marker to indicate that pop() should raise if no default
    # is given.
    __marker = _MarkerEntry(
        name="marker",
        size=0,
        ref=uuid.UUID("{00000000-0000-0000-0000-000000000000}"),
        ctime=datetime.datetime.fromtimestamp(0, datetime.UTC),
    )

    def pop(self, key: str, default: CacheEntry | None = __marker) -> CacheEntry | None:
        # The marker for dict.pop is not the same as our marker.
        if default is self.__marker:
            entry = self._entries.pop(key)
        else:
            entry = self._entries.pop(key, self.__marker)
            # Should not attempt to correct for this entry being removed
            # if we got the default value.
            if entry is self.__marker:
                return default

        self._decrement(entry)
        # The default entry given to this method may not even be in the cache.
        if entry and entry.ref in self._ref_map:
            keys = self._ref_map[entry.ref]
            if key in keys:
                keys.remove(key)
        return entry

    def get_dataset_keys(self, dataset_id: DatasetId | None) -> list[str] | None:
        """Retrieve all keys associated with the given dataset ID.

        Parameters
        ----------
        dataset_id : `DatasetId` or `None`
            The dataset ID to look up. Returns `None` if the ID is `None`.

        Returns
        -------
        keys : `list` [`str`] or `None`
            Keys associated with this dataset. These keys can be used to
            look up the cache entry information in the `CacheRegistry`.
            Returns `None` if the dataset is not known to the cache.
        """
        if dataset_id not in self._ref_map:
            return None
        keys = self._ref_map[dataset_id]
        if not keys:
            return None
        return keys
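# A minimal sketch of the registry bookkeeping (entry values invented;
# ``dataset_id`` stands in for a real `DatasetId`):
#
#     >>> registry = CacheRegistry()
#     >>> registry["f.fits"] = CacheEntry.model_construct(
#     ...     name="f.fits", size=100, ref=dataset_id,
#     ...     ctime=datetime.datetime.now(datetime.UTC))
#     >>> registry.cache_size
#     100
#     >>> registry.get_dataset_keys(dataset_id)
#     ['f.fits']
#     >>> registry.pop("f.fits").size, registry.cache_size
#     (100, 0)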

class DatastoreCacheManagerConfig(ConfigSubset):
    """Configuration information for `DatastoreCacheManager`."""

    component = "cached"
    requiredKeys = ("cacheable",)


class AbstractDatastoreCacheManager(ABC):
    """An abstract base class for managing caching in a Datastore.

    Parameters
    ----------
    config : `str` or `DatastoreCacheManagerConfig`
        Configuration to control caching.
    universe : `DimensionUniverse`
        Set of all known dimensions, used to expand and validate any used
        in lookup keys.
    """

    @property
    def cache_size(self) -> int:
        """Size of the cache in bytes."""
        return 0

    @property
    def file_count(self) -> int:
        """Return number of cached files tracked by registry."""
        return 0

    def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse):
        if not isinstance(config, DatastoreCacheManagerConfig):
            config = DatastoreCacheManagerConfig(config)
        assert isinstance(config, DatastoreCacheManagerConfig)
        self.config = config

    @abstractmethod
    def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
        """Indicate whether the entity should be added to the cache.

        This is relevant when reading or writing.

        Parameters
        ----------
        entity : `StorageClass` or `DatasetType` or `DatasetRef`
            Thing to test against the configuration. The ``name`` property
            is used to determine a match. A `DatasetType` will first check
            its name, before checking its `StorageClass`. If there are no
            matches the default will be returned.

        Returns
        -------
        should_cache : `bool`
            Returns `True` if the dataset should be cached; `False` otherwise.
        """
        raise NotImplementedError()

    @abstractmethod
    def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
        """Report if the dataset is known to the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to check for in the cache.
        extension : `str`, optional
            File extension expected. Should include the leading "``.``".
            If `None` the extension is ignored and the dataset ID alone is
            used to check in the cache. The extension must be defined if
            a specific component is being checked.

        Returns
        -------
        known : `bool`
            Returns `True` if the dataset is currently known to the cache
            and `False` otherwise.

        Notes
        -----
        This method can only report whether the dataset is known to the cache
        at this specific instant and does not indicate whether the file
        can be read from the cache later. `find_in_cache()` should be called
        if the cached file is to be used.
        """
        raise NotImplementedError()

    @abstractmethod
    def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
        """Move a file to the cache.

        Move the given file into the cache, using the supplied `DatasetRef`
        for naming. A call is made to `should_be_cached()` and if the
        `DatasetRef` should not be accepted `None` will be returned.

        Cache expiry can occur during this call.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to be relocated to the cache. Will be moved.
        ref : `DatasetRef`
            Ref associated with this file. Will be used to determine the name
            of the file within the cache.

        Returns
        -------
        new : `lsst.resources.ResourcePath` or `None`
            URI to the file within the cache, or `None` if the dataset
            was not accepted by the cache.
        """
        raise NotImplementedError()

    @abstractmethod
    @contextlib.contextmanager
    def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
        """Look for a dataset in the cache and return its location.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to locate in the cache.
        extension : `str`
            File extension expected. Should include the leading "``.``".

        Yields
        ------
        uri : `lsst.resources.ResourcePath` or `None`
            The URI to the cached file, or `None` if the file has not been
            cached.

        Notes
        -----
        Should be used as a context manager in order to prevent this
        file from being removed from the cache for that context.
        """
        raise NotImplementedError()

    @abstractmethod
    def remove_from_cache(self, ref: DatasetRef | Iterable[DatasetRef]) -> None:
        """Remove the specified datasets from the cache.

        It is not an error for these datasets to be missing from the cache.

        Parameters
        ----------
        ref : `DatasetRef` or iterable of `DatasetRef`
            The datasets to remove from the cache.
        """
        raise NotImplementedError()

    @abstractmethod
    def __str__(self) -> str:
        raise NotImplementedError()


class DatastoreCacheManager(AbstractDatastoreCacheManager):
    """A class for managing caching in a Datastore using local files.

    Parameters
    ----------
    config : `str` or `DatastoreCacheManagerConfig`
        Configuration to control caching.
    universe : `DimensionUniverse`
        Set of all known dimensions, used to expand and validate any used
        in lookup keys.

    Notes
    -----
    Two environment variables can be used to override the cache directory
    and expiration configuration:

    * ``$DAF_BUTLER_CACHE_DIRECTORY``
    * ``$DAF_BUTLER_CACHE_EXPIRATION_MODE``

    The expiration mode should take the form ``mode=threshold``; for example,
    to configure expiration to limit the cache directory to 5 datasets the
    value would be ``datasets=5``.

    Additionally, the ``$DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET`` environment
    variable can be used to indicate that this directory should be used
    if no explicit directory has been specified from configuration or from
    the ``$DAF_BUTLER_CACHE_DIRECTORY`` environment variable.
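
    A minimal sketch of driving both overrides from the shell (paths and
    thresholds here are examples only)::

        export DAF_BUTLER_CACHE_DIRECTORY=/scratch/butler-cache
        export DAF_BUTLER_CACHE_EXPIRATION_MODE=datasets=5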

    """

    _temp_exemption_prefix = "exempt/"
    _tmpdir_prefix = "butler-cache-dir-"

    def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse):
        super().__init__(config, universe)

        # Expiration mode. Read from config but allow override from
        # the environment.
        expiration_mode = self.config.get(("expiry", "mode"))
        threshold = self.config.get(("expiry", "threshold"))

        external_mode = os.environ.get("DAF_BUTLER_CACHE_EXPIRATION_MODE")
        if external_mode:
            if external_mode == "disabled":
                expiration_mode = external_mode
                threshold = 0
            elif "=" in external_mode:
                expiration_mode, expiration_threshold = external_mode.split("=", 1)
                threshold = int(expiration_threshold)
            else:
                log.warning(
                    "Unrecognized form (%s) for DAF_BUTLER_CACHE_EXPIRATION_MODE environment variable. "
                    "Ignoring.",
                    external_mode,
                )
        if expiration_mode is None:
            # Force to None to avoid confusion.
            threshold = None

        allowed = ("disabled", "datasets", "age", "size", "files")
        if expiration_mode and expiration_mode not in allowed:
            raise ValueError(
                f"Unrecognized value for cache expiration mode. Got {expiration_mode} but expected one of "
                + ", ".join(allowed)
            )

        self._expiration_mode: str | None = expiration_mode
        self._expiration_threshold: int | None = threshold
        if self._expiration_threshold is None and self._expiration_mode is not None:
            raise ValueError(
                f"Cache expiration threshold must be set for expiration mode {self._expiration_mode}"
            )

        # Files in cache, indexed by path within the cache directory.
        self._cache_entries = CacheRegistry()

        # No need for additional configuration if the cache has been disabled.
        if self._expiration_mode == "disabled":
            log.debug("Cache configured in disabled state.")
            self._cache_directory: ResourcePath | None = ResourcePath(
                "datastore_cache_disabled", forceAbsolute=False, forceDirectory=True
            )
            return

        # Set cache directory if it pre-exists, else defer creation until
        # requested. Allow external override from environment.
        root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY") or self.config.get("root")

        # Allow the execution environment to override the default values
        # so long as no default value has been set from the line above.
        if root is None:
            root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET")

        self._cache_directory = (
            ResourcePath(root, forceAbsolute=True, forceDirectory=True) if root is not None else None
        )

        if self._cache_directory:
            if not self._cache_directory.isLocal:
                raise ValueError(
                    f"Cache directory must be on a local file system. Got: {self._cache_directory}"
                )
            # Ensure that the cache directory is created. We assume that
            # someone specifying a permanent cache directory will be expecting
            # it to always be there. This will also trigger an error
            # early rather than waiting until the cache is needed.
            self._cache_directory.mkdir()

        # Calculate the caching lookup table.
        self._lut = processLookupConfigs(self.config["cacheable"], universe=universe)

        # Default decision for whether a dataset should be cached.
        self._caching_default = self.config.get("default", False)

        log.debug(
            "Cache configuration:\n- root: %s\n- expiration mode: %s",
            self._cache_directory if self._cache_directory else "tmpdir",
            f"{self._expiration_mode}={self._expiration_threshold}" if self._expiration_mode else "disabled",
        )

    @property
    def cache_directory(self) -> ResourcePath:
        if self._cache_directory is None:
            # Create on demand. Allow the override environment variable
            # to be used in case it got set after this object was created
            # but before a cache was used.
            if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
                # Someone else will clean this up.
                isTemporary = False
                msg = "deferred fallback"
            else:
                cache_dir = tempfile.mkdtemp(prefix=self._tmpdir_prefix)
                isTemporary = True
                msg = "temporary"

            self._cache_directory = ResourcePath(cache_dir, forceDirectory=True, isTemporary=isTemporary)
            log.debug("Using %s cache directory at %s", msg, self._cache_directory)

            # Remove when we no longer need it.
            if isTemporary:
                atexit.register(remove_cache_directory, self._cache_directory.ospath)
        return self._cache_directory

    @property
    def _temp_exempt_directory(self) -> ResourcePath:
        """Return the directory in which to store temporary cache files that
        should not be expired.
        """
        return self.cache_directory.join(self._temp_exemption_prefix)

    @property
    def cache_size(self) -> int:
        return self._cache_entries.cache_size

    @property
    def file_count(self) -> int:
        return len(self._cache_entries)

    @classmethod
    def create_disabled(cls, universe: DimensionUniverse) -> Self:
        """Create an instance that is disabled by default but can be
        overridden by the environment.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use if the datastore becomes enabled via the
            environment.

        Returns
        -------
        cache_manager : `DatastoreCacheManager`
            A new cache manager that is disabled by default but might be
            enabled if environment variables are set.
        """
        # It is not sufficient to set the mode to disabled; there has to be
        # enough configuration for the caching to work when enabled. This
        # means setting the default to true (cache everything), inheriting
        # the FileDatastore default config (which works for Rubin but
        # doesn't allow non-Rubin deployments to cache anything, though it
        # can in theory be overridden), or allowing a parameter to be passed
        # in here defining the cacheable section of the config. For now cache
        # everything. Supporting a JSON environment variable defining the
        # cacheable storage classes is also a possibility.
        config_str = """
cached:
  default: true
  cacheable:
    irrelevant: false
  expiry:
    mode: disabled
    threshold: 0
"""
        config = Config.fromYaml(config_str)
        return cls(DatastoreCacheManagerConfig(config), universe)
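    # Intended use (sketch): ``DatastoreCacheManager.create_disabled(universe)``
    # returns a manager that caches nothing unless, for example,
    # DAF_BUTLER_CACHE_EXPIRATION_MODE=size=1_000_000_000 re-enables caching
    # at construction time (the threshold value here is invented).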

    @classmethod
    def set_fallback_cache_directory_if_unset(cls) -> tuple[bool, str]:
        """Define a fallback cache directory if one is not already set.

        Returns
        -------
        defined : `bool`
            `True` if the fallback directory was newly defined in this method.
            `False` if it had already been set.
        cache_dir : `str`
            Returns the path to the cache directory that will be used if it's
            needed. This can allow the caller to run a directory cleanup
            when it's no longer needed (something that the cache manager
            cannot do because forks should not clean up directories defined
            by the parent process).

        Notes
        -----
        The fallback directory will not be defined if one has already been
        defined. This method sets the ``DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET``
        environment variable only if a value has not previously been stored
        in that environment variable. Setting the environment variable allows
        this value to survive into spawned subprocesses. Calling this method
        will lead to all subsequently created cache managers sharing the same
        cache.
        """
        if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
            # A value has already been set.
            return (False, cache_dir)

        # As a class method, we do not know at this point whether a cache
        # directory will be needed so it would be impolite to create a
        # directory that will never be used.

        # Construct our own temp name -- 16 characters should have a fairly
        # low chance of clashing when combined with the process ID.
        characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
        rng = Random()
        tempchars = "".join(rng.choice(characters) for _ in range(16))

        tempname = f"{cls._tmpdir_prefix}{os.getpid()}-{tempchars}"

        cache_dir = os.path.join(tempfile.gettempdir(), tempname)
        os.environ["DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"] = cache_dir
        return (True, cache_dir)
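    # Intended calling pattern (sketch): the parent process claims the
    # fallback directory and removes it once its children are done, since
    # forked workers must not delete a directory defined by the parent:
    #
    #     newly_defined, cache_dir = (
    #         DatastoreCacheManager.set_fallback_cache_directory_if_unset())
    #     ...  # run subprocesses that may create cache managers
    #     if newly_defined:
    #         remove_cache_directory(cache_dir)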

    def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
        # Docstring inherited
        if self._expiration_mode == "disabled":
            return False

        matchName: LookupKey | str = f"{entity} (via default)"
        should_cache = self._caching_default

        for key in entity._lookupNames():
            if key in self._lut:
                should_cache = bool(self._lut[key])
                matchName = key
                break

        if not isinstance(should_cache, bool):
            raise TypeError(
                f"Got cache value {should_cache!r} for config entry {matchName!r}; expected bool."
            )

        log.debug("%s (match: %s) should%s be cached", entity, matchName, "" if should_cache else " not")
        return should_cache

    def _construct_cache_name(self, ref: DatasetRef, extension: str) -> ResourcePath:
        """Construct the name to use for this dataset in the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            The dataset to look up in or write to the cache.
        extension : `str`
            File extension to use for this file. Should include the
            leading "``.``".

        Returns
        -------
        uri : `lsst.resources.ResourcePath`
            URI to use for this dataset in the cache.
        """
        return _construct_cache_path(self.cache_directory, ref, extension)

    def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
        # Docstring inherited
        if self._expiration_mode == "disabled":
            return None
        if not self.should_be_cached(ref):
            return None

        # Write the file using the id of the dataset ref and the file
        # extension.
        cached_location = self._construct_cache_name(ref, uri.getExtension())

        # Run cache expiry to ensure that we have room for this
        # item.
        self._expire_cache()

        # The expiry above refreshed the in-memory cache state. It's entirely
        # possible that another process has just cached this file (if
        # multiple processes are caching on read), so check our in-memory
        # cache before attempting to cache the dataset.
        path_in_cache = cached_location.relative_to(self.cache_directory)
        if path_in_cache and path_in_cache in self._cache_entries:
            return cached_location

        # Move into the cache. Given that multiple processes might be
        # sharing a single cache directory, and the file we need might have
        # been copied in whilst we were checking, allow overwrite without
        # complaint. Even for a private cache directory it is possible that
        # a second butler in a subprocess could be writing to it.
        cached_location.transfer_from(uri, transfer="move", overwrite=True)
        log.debug("Cached dataset %s to %s", ref, cached_location)

        self._register_cache_entry(cached_location)

        return cached_location

    @contextlib.contextmanager
    def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
        # Docstring inherited
        if self._expiration_mode == "disabled":
            yield None
            return

        # Short circuit this if the cache directory has not been created yet.
        if self._cache_directory is None:
            yield None
            return

        cached_location = self._construct_cache_name(ref, extension)
        if cached_location.exists():
            log.debug("Found cached file %s for dataset %s.", cached_location, ref)

            # The cached file could be removed by another process doing
            # cache expiration so we need to protect against that by making
            # a copy in a different tree. Use hardlinks to ensure that
            # we either have the cached file or we don't. This is robust
            # against race conditions that can be caused by using soft links
            # and the other end of the link being deleted just after it
            # is created.
            path_in_cache = cached_location.relative_to(self.cache_directory)
            assert path_in_cache is not None, f"Somehow {cached_location} not in cache directory"

            # Need to use a unique file name for the temporary location to
            # ensure that two different processes can read the file
            # simultaneously without one of them deleting it when it's in
            # use elsewhere. Retain the original filename for easier
            # debugging.
            random = str(uuid.uuid4())[:8]
            basename = cached_location.basename()
            filename = f"{random}-{basename}"

            temp_location: ResourcePath | None = self._temp_exempt_directory.join(filename)
            try:
                if temp_location is not None:
                    temp_location.transfer_from(cached_location, transfer="hardlink")
            except Exception as e:
                log.debug("Detected error creating hardlink for dataset %s: %s", ref, e)
                # Any failure will be treated as if the file was not
                # in the cache. Yielding the original cache location
                # is too dangerous.
                temp_location = None

            try:
                log.debug("Yielding temporary cache location %s for dataset %s", temp_location, ref)
                yield temp_location
            finally:
                try:
                    if temp_location:
                        temp_location.remove()
                except FileNotFoundError:
                    pass
            return

        log.debug("Dataset %s not found in cache.", ref)
        yield None
        return
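    # Sketch of the intended read path (``butler_read`` is a hypothetical
    # reader; the extension is an example):
    #
    #     with cache_manager.find_in_cache(ref, ".fits") as cached_uri:
    #         if cached_uri is not None:
    #             data = butler_read(cached_uri)
    #         else:
    #             ...  # fetch from the datastore, then move_to_cache()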

    def remove_from_cache(self, refs: DatasetRef | Iterable[DatasetRef]) -> None:
        # Docstring inherited.

        # Stop early if there are no cache entries anyhow.
        if len(self._cache_entries) == 0:
            return

        if isinstance(refs, DatasetRef):
            refs = [refs]

        # Create a set of all the IDs.
        all_ids = {ref.id for ref in refs}

        keys_to_remove = []
        for key, entry in self._cache_entries.items():
            if entry.ref in all_ids:
                keys_to_remove.append(key)
        self._remove_from_cache(keys_to_remove)

    def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool = False) -> str | None:
        """Record the file in the cache registry.

        Parameters
        ----------
        cached_location : `lsst.resources.ResourcePath`
            Location of the file to be registered.
        can_exist : `bool`, optional
            If `True` the item being registered can already be listed.
            This can allow a cache refresh to run without checking the
            file again. If `False` it is an error for the registry to
            already know about this file.

        Returns
        -------
        cache_key : `str` or `None`
            The key used in the registry for this file. `None` if the file
            no longer exists (it could have been expired by another process).
        """
        path_in_cache = cached_location.relative_to(self.cache_directory)
        if path_in_cache is None:
            raise ValueError(
                f"Cannot register cached file {cached_location} that is not within"
                f" the cache directory at {self.cache_directory}."
            )
        if path_in_cache in self._cache_entries:
            if can_exist:
                return path_in_cache
            else:
                raise ValueError(
                    f"Cached file {cached_location} is already known to the registry"
                    " but this was expected to be a new file."
                )
        try:
            details = CacheEntry.from_file(cached_location, root=self.cache_directory)
        except FileNotFoundError:
            return None
        self._cache_entries[path_in_cache] = details
        return path_in_cache

    def scan_cache(self) -> None:
        """Scan the cache directory and record information about files."""
        if self._expiration_mode == "disabled":
            return

        found = set()
        for file in ResourcePath.findFileResources([self.cache_directory]):
            assert isinstance(file, ResourcePath), "Unexpectedly did not get ResourcePath from iterator"

            # Skip any that are found in an exempt part of the hierarchy
            # since they should not be part of the registry.
            if file.relative_to(self._temp_exempt_directory) is not None:
                continue

            try:
                path_in_cache = self._register_cache_entry(file, can_exist=True)
                if path_in_cache:
                    found.add(path_in_cache)
            except InvalidCacheFilenameError:
                # Skip files that are in the cache directory but were not
                # created by us.
                pass

        # Find any files that were recorded in the cache but are no longer
        # on disk (something else may have cleared them out).
        known_to_cache = set(self._cache_entries)
        missing = known_to_cache - found

        if missing:
            log.debug(
                "Entries no longer on disk but thought to be in cache and so removed: %s", ",".join(missing)
            )
            for path_in_cache in missing:
                self._cache_entries.pop(path_in_cache, None)

    def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
        """Report if the dataset is known to the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to check for in the cache.
        extension : `str`, optional
            File extension expected. Should include the leading "``.``".
            If `None` the extension is ignored and the dataset ID alone is
            used to check in the cache. The extension must be defined if
            a specific component is being checked.

        Returns
        -------
        known : `bool`
            Returns `True` if the dataset is currently known to the cache
            and `False` otherwise. If the dataset refers to a component and
            an extension is given then only that component is checked.

        Notes
        -----
        This method can only report whether the dataset is known to the cache
        at this specific instant and does not indicate whether the file
        can be read from the cache later. `find_in_cache()` should be called
        if the cached file is to be used.

        This method does not force the cache to be re-scanned and so can miss
        cached datasets that have recently been written by other processes.
        """
        if self._expiration_mode == "disabled":
            return False
        if self._cache_directory is None:
            return False
        if self.file_count == 0:
            return False

        if extension is None:
            # Look solely for a matching dataset ref ID and not specific
            # components.
            cached_paths = self._cache_entries.get_dataset_keys(ref.id)
            return bool(cached_paths)
        else:
            # Extension is known so we can do an explicit look up for the
            # cache entry.
            cached_location = self._construct_cache_name(ref, extension)
            path_in_cache = cached_location.relative_to(self.cache_directory)
            assert path_in_cache is not None  # For mypy
            return path_in_cache in self._cache_entries
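    # Note (sketch): a positive answer here only reflects the in-memory
    # registry at this instant, so always re-check via the context manager:
    #
    #     if cache_manager.known_to_cache(ref, ".json"):
    #         with cache_manager.find_in_cache(ref, ".json") as uri:
    #             ...  # uri can still be None if another process expired it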

    def _remove_from_cache(self, cache_entries: Iterable[str]) -> None:
        """Remove the specified cache entries from the cache.

        Parameters
        ----------
        cache_entries : `~collections.abc.Iterable` of `str`
            The entries to remove from the cache. The values are the paths
            within the cache.
        """
        for entry in cache_entries:
            path = self.cache_directory.join(entry)

            self._cache_entries.pop(entry, None)
            log.debug("Removing file from cache: %s", path)
            with contextlib.suppress(FileNotFoundError):
                path.remove()

    def _expire_cache(self) -> None:
        """Expire the files in the cache.

        Notes
        -----
        The expiration modes are defined by the config or can be overridden.
        Available options:

        * ``files``: Number of files.
        * ``datasets``: Number of datasets.
        * ``size``: Total size of files.
        * ``age``: Age of files.

        The first three remove the oldest entries first. The number of files
        is complicated by the possibility of disassembled composites, where
        10 small files can be created for each dataset.

        Additionally there is a use case for an external user to explicitly
        state the dataset refs that should be cached and then when to
        remove them, overriding any global configuration.
        """
        if self._expiration_mode is None:
            # Expiration has been disabled.
            return

        # mypy can't be sure we have set a threshold properly.
        if self._expiration_threshold is None:
            log.warning(
                "Requesting cache expiry of mode %s but no threshold set in config.", self._expiration_mode
            )
            return

        # Sync up cache. There is no file locking involved so for a shared
        # cache multiple processes may be racing to delete files. Deleting
        # a file that no longer exists is not an error.
        self.scan_cache()

        if self._expiration_mode == "files":
            n_files = len(self._cache_entries)
            n_over = n_files - self._expiration_threshold
            if n_over > 0:
                sorted_keys = self._sort_cache()
                keys_to_remove = sorted_keys[:n_over]
                self._remove_from_cache(keys_to_remove)
            return

        if self._expiration_mode == "datasets":
            # Count the datasets, in ascending timestamp order,
            # so that the oldest turn up first.
            datasets = defaultdict(list)
            for key in self._sort_cache():
                entry = self._cache_entries[key]
                datasets[entry.ref].append(key)

            n_datasets = len(datasets)
            n_over = n_datasets - self._expiration_threshold
            if n_over > 0:
                # Keys will be read out in insert order, which
                # will be date order, so the oldest ones are removed.
                ref_ids = list(datasets.keys())[:n_over]
                keys_to_remove = list(itertools.chain.from_iterable(datasets[ref_id] for ref_id in ref_ids))
                self._remove_from_cache(keys_to_remove)
            return

        if self._expiration_mode == "size":
            if self.cache_size > self._expiration_threshold:
                for key in self._sort_cache():
                    self._remove_from_cache([key])
                    if self.cache_size <= self._expiration_threshold:
                        break
            return

        if self._expiration_mode == "age":
            now = datetime.datetime.now(datetime.UTC)
            for key in self._sort_cache():
                delta = now - self._cache_entries[key].ctime
                # Use total_seconds() rather than .seconds, which discards
                # whole days and would let old files evade expiry.
                if delta.total_seconds() > self._expiration_threshold:
                    self._remove_from_cache([key])
                else:
                    # We're already in date order.
                    break
            return

        raise ValueError(f"Unrecognized cache expiration mode of {self._expiration_mode}")
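    # For reference, an expiry section in YAML form (values invented; the
    # keys mirror those read in ``__init__`` and checked above):
    #
    #     cached:
    #       default: false
    #       cacheable:
    #         StructuredDataDict: true
    #       expiry:
    #         mode: datasets
    #         threshold: 5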

    def _sort_cache(self) -> list[str]:
        """Sort the cache entries by time and return the sorted keys.

        Returns
        -------
        sorted : `list` of `str`
            Keys into the cache, sorted by time with oldest first.
        """

        def _sort_by_time(key: str) -> datetime.datetime:
            """Sorter key function using cache entry details."""
            return self._cache_entries[key].ctime

        return sorted(self._cache_entries, key=_sort_by_time)

    def __str__(self) -> str:
        if self._expiration_mode == "disabled":
            return f"{type(self).__name__}(disabled)"
        cachedir = self._cache_directory if self._cache_directory else "<tempdir>"
        return (
            f"{type(self).__name__}@{cachedir} ({self._expiration_mode}={self._expiration_threshold}, "
            f"default={self._caching_default}) "
            f"n_files={self.file_count}, n_bytes={self.cache_size}"
        )

class DatastoreDisabledCacheManager(AbstractDatastoreCacheManager):
    """A variant of the datastore cache where no cache is enabled.

    Parameters
    ----------
    config : `str` or `DatastoreCacheManagerConfig`, optional
        Configuration to control caching. Ignored.
    universe : `DimensionUniverse`, optional
        Set of all known dimensions, used to expand and validate any used
        in lookup keys. Ignored.
    """

    def __init__(
        self,
        config: str | DatastoreCacheManagerConfig | None = None,
        universe: DimensionUniverse | None = None,
    ):
        pass

    def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
        """Indicate whether the entity should be added to the cache.

        Parameters
        ----------
        entity : `StorageClass` or `DatasetType` or `DatasetRef`
            Thing to test against the configuration. The ``name`` property
            is used to determine a match. A `DatasetType` will first check
            its name, before checking its `StorageClass`. If there are no
            matches the default will be returned.

        Returns
        -------
        should_cache : `bool`
            Always returns `False`.
        """
        return False

    def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
        """Move dataset to cache.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to be relocated to the cache. Will be moved.
        ref : `DatasetRef`
            Ref associated with this file. Will be used to determine the name
            of the file within the cache.

        Returns
        -------
        new : `lsst.resources.ResourcePath` or `None`
            Always refuses and returns `None`.
        """
        return None

    @contextlib.contextmanager
    def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
        """Look for a dataset in the cache and return its location.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to locate in the cache.
        extension : `str`
            File extension expected. Should include the leading "``.``".

        Yields
        ------
        uri : `lsst.resources.ResourcePath` or `None`
            Never finds a file. Always yields `None`.
        """
        yield None

    def remove_from_cache(self, ref: DatasetRef | Iterable[DatasetRef]) -> None:
        """Remove datasets from cache.

        Always does nothing.

        Parameters
        ----------
        ref : `DatasetRef` or iterable of `DatasetRef`
            The datasets to remove from the cache.
        """
        return

    def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
        """Report if a dataset is known to the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to check for in the cache.
        extension : `str`, optional
            File extension expected. Should include the leading "``.``".
            If `None` the extension is ignored and the dataset ID alone is
            used to check in the cache. The extension must be defined if
            a specific component is being checked.

        Returns
        -------
        known : `bool`
            Always returns `False`.
        """
        return False

    def __str__(self) -> str:
        return f"{type(self).__name__}()"


class InvalidCacheFilenameError(Exception):
    """Raised when attempting to register a file in the cache with a name in
    the incorrect format.
    """