Coverage for python/lsst/daf/butler/datastore/cache_manager.py: 22% (429 statements)
coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28"""Cache management for a datastore."""
30from __future__ import annotations
32__all__ = (
33 "AbstractDatastoreCacheManager",
34 "DatastoreCacheManager",
35 "DatastoreCacheManagerConfig",
36 "DatastoreDisabledCacheManager",
37)
39import atexit
40import contextlib
41import datetime
42import itertools
43import logging
44import os
45import shutil
46import tempfile
47import uuid
48from abc import ABC, abstractmethod
49from collections import defaultdict
50from collections.abc import ItemsView, Iterable, Iterator, KeysView, ValuesView
51from random import Random
52from typing import TYPE_CHECKING, Self
54from pydantic import BaseModel, PrivateAttr
56from lsst.resources import ResourcePath
58from .._config import Config, ConfigSubset
59from .._config_support import processLookupConfigs
60from .._dataset_ref import DatasetId, DatasetRef
62if TYPE_CHECKING:
63 from .._config_support import LookupKey
64 from .._dataset_type import DatasetType
65 from .._storage_class import StorageClass
66 from ..dimensions import DimensionUniverse
68log = logging.getLogger(__name__)


def remove_cache_directory(directory: str) -> None:
    """Remove the specified directory and all its contents.

    Parameters
    ----------
    directory : `str`
        Directory to remove.
    """
    log.debug("Removing temporary cache directory %s", directory)
    shutil.rmtree(directory, ignore_errors=True)


def _construct_cache_path(root: ResourcePath, ref: DatasetRef, extension: str) -> ResourcePath:
    """Construct the full path to use for this dataset in the cache.

    Parameters
    ----------
    root : `lsst.resources.ResourcePath`
        The root of the cache.
    ref : `DatasetRef`
        The dataset to look up in or write to the cache.
    extension : `str`
        File extension to use for this file. Should include the
        leading "``.``".

    Returns
    -------
    uri : `lsst.resources.ResourcePath`
        URI to use for this dataset in the cache.
    """
    # Dataset type component is needed in the name if composite
    # disassembly is happening since the ID is shared for all components.
    component = ref.datasetType.component()
    component = f"_{component}" if component else ""
    return root.join(f"{ref.id}{component}{extension}")


def _parse_cache_name(cached_location: str) -> tuple[uuid.UUID, str | None, str | None]:
    """For a given cache name, return its component parts.

    Changes to ``_construct_cache_path()`` should be reflected here.

    Parameters
    ----------
    cached_location : `str`
        The name of the file within the cache.

    Returns
    -------
    id : `uuid.UUID`
        The dataset ID.
    component : `str` or `None`
        The name of the component, if present.
    extension : `str` or `None`
        The file extension, if present.
    """
    # Assume the first dot starts the extension so that compound
    # extensions such as .fits.gz are preserved intact.
    root_ext = cached_location.split(".", maxsplit=1)
    root = root_ext.pop(0)
    ext = "." + root_ext.pop(0) if root_ext else None

    parts = root.split("_")
    try:
        id_ = uuid.UUID(parts.pop(0))
    except ValueError as e:
        raise InvalidCacheFilenameError(f"Invalid or missing ID in cache file name: {cached_location}") from e
    component = parts.pop(0) if parts else None
    return id_, component, ext
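

# A usage sketch (not part of the original module; the UUID below is
# arbitrary): a disassembled "wcs" component cached with a compound
# extension round-trips through the two helpers above.
#
#     >>> _parse_cache_name("91b45f57-8f03-4bdb-b0d5-1d23a9a49f7f_wcs.fits.gz")
#     (UUID('91b45f57-8f03-4bdb-b0d5-1d23a9a49f7f'), 'wcs', '.fits.gz')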


class CacheEntry(BaseModel):
    """Represent an entry in the cache."""

    name: str
    """Name of the file."""

    size: int
    """Size of the file in bytes."""

    ctime: datetime.datetime
    """Creation time of the file."""

    ref: DatasetId
    """ID of this dataset."""

    component: str | None = None
    """Component for this disassembled composite (optional)."""

    @classmethod
    def from_file(cls, file: ResourcePath, root: ResourcePath) -> CacheEntry:
        """Construct an object from a file name.

        Parameters
        ----------
        file : `lsst.resources.ResourcePath`
            Path to the file.
        root : `lsst.resources.ResourcePath`
            Cache root directory.
        """
        file_in_cache = file.relative_to(root)
        if file_in_cache is None:
            raise ValueError(f"Supplied file {file} is not inside root {root}")
        id_, component, _ = _parse_cache_name(file_in_cache)

        stat = os.stat(file.ospath)
        return cls.model_construct(
            name=file_in_cache,
            size=stat.st_size,
            ref=id_,
            component=component,
            ctime=datetime.datetime.fromtimestamp(stat.st_ctime, datetime.UTC),
        )


class _MarkerEntry(CacheEntry):
    pass


class CacheRegistry(BaseModel):
    """Collection of cache entries."""

    _size: int = PrivateAttr(0)
    """Size of the cache."""

    _entries: dict[str, CacheEntry] = PrivateAttr({})
    """Internal collection of cache entries."""

    _ref_map: dict[DatasetId, list[str]] = PrivateAttr({})
    """Mapping of `DatasetId` to corresponding keys in the cache registry."""

    @property
    def cache_size(self) -> int:
        return self._size

    def __getitem__(self, key: str) -> CacheEntry:
        return self._entries[key]

    def __setitem__(self, key: str, entry: CacheEntry) -> None:
        self._size += entry.size
        self._entries[key] = entry

        # Update the mapping from ref to path.
        if entry.ref not in self._ref_map:
            self._ref_map[entry.ref] = []
        self._ref_map[entry.ref].append(key)

    def __delitem__(self, key: str) -> None:
        entry = self._entries.pop(key)
        self._decrement(entry)
        self._ref_map[entry.ref].remove(key)

    def _decrement(self, entry: CacheEntry | None) -> None:
        if entry:
            self._size -= entry.size
            if self._size < 0:
                log.warning("Cache size has gone negative. Inconsistent cache records...")
                self._size = 0

    def __contains__(self, key: str) -> bool:
        return key in self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __iter__(self) -> Iterator[str]:  # type: ignore
        return iter(self._entries)

    def keys(self) -> KeysView[str]:
        return self._entries.keys()

    def values(self) -> ValuesView[CacheEntry]:
        return self._entries.values()

    def items(self) -> ItemsView[str, CacheEntry]:
        return self._entries.items()

    # A private marker to indicate that pop() should raise if no default
    # is given.
    __marker = _MarkerEntry(
        name="marker",
        size=0,
        ref=uuid.UUID("{00000000-0000-0000-0000-000000000000}"),
        ctime=datetime.datetime.fromtimestamp(0, datetime.UTC),
    )

    def pop(self, key: str, default: CacheEntry | None = __marker) -> CacheEntry | None:
        # The marker for dict.pop is not the same as our marker.
        if default is self.__marker:
            entry = self._entries.pop(key)
        else:
            entry = self._entries.pop(key, self.__marker)
            # Should not attempt to correct for this entry being removed
            # if we got the default value.
            if entry is self.__marker:
                return default

        self._decrement(entry)
        # The default entry given to this method may not even be in the cache.
        if entry and entry.ref in self._ref_map:
            keys = self._ref_map[entry.ref]
            if key in keys:
                keys.remove(key)
        return entry

    def get_dataset_keys(self, dataset_id: DatasetId | None) -> list[str] | None:
        """Retrieve all keys associated with the given dataset ID.

        Parameters
        ----------
        dataset_id : `DatasetId` or `None`
            The dataset ID to look up. Returns `None` if the ID is `None`.

        Returns
        -------
        keys : `list` [`str`]
            Keys associated with this dataset. These keys can be used to
            look up the cache entry information in the `CacheRegistry`.
            Returns `None` if the dataset is not known to the cache.
        """
        if dataset_id not in self._ref_map:
            return None
        keys = self._ref_map[dataset_id]
        if not keys:
            return None
        return keys
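

# A minimal sketch (not part of the original module) of how the registry is
# driven by the manager below: entries are keyed by path within the cache
# directory, and the running byte count is maintained automatically. The
# ``path_in_cache``, ``cached_file``, and ``cache_root`` names are
# illustrative.
#
#     registry = CacheRegistry()
#     registry[path_in_cache] = CacheEntry.from_file(cached_file, root=cache_root)
#     registry.cache_size                     # total bytes of all entries
#     registry.get_dataset_keys(dataset_id)   # paths for one dataset, or None
#     registry.pop(path_in_cache, None)       # forget an entry; tolerate absence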


class DatastoreCacheManagerConfig(ConfigSubset):
    """Configuration information for `DatastoreCacheManager`."""

    component = "cached"
    requiredKeys = ("cacheable",)


class AbstractDatastoreCacheManager(ABC):
    """An abstract base class for managing caching in a Datastore.

    Parameters
    ----------
    config : `str` or `DatastoreCacheManagerConfig`
        Configuration to control caching.
    universe : `DimensionUniverse`
        Set of all known dimensions, used to expand and validate any used
        in lookup keys.
    """

    @property
    def cache_size(self) -> int:
        """Size of the cache in bytes."""
        return 0

    @property
    def file_count(self) -> int:
        """Return the number of cached files tracked by the registry."""
        return 0

    def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse):
        if not isinstance(config, DatastoreCacheManagerConfig):
            config = DatastoreCacheManagerConfig(config)
        assert isinstance(config, DatastoreCacheManagerConfig)
        self.config = config

    @abstractmethod
    def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
        """Indicate whether the entity should be added to the cache.

        This is relevant when reading or writing.

        Parameters
        ----------
        entity : `StorageClass` or `DatasetType` or `DatasetRef`
            Thing to test against the configuration. The ``name`` property
            is used to determine a match. A `DatasetType` will first check
            its name, before checking its `StorageClass`. If there are no
            matches the default will be returned.

        Returns
        -------
        should_cache : `bool`
            Returns `True` if the dataset should be cached; `False` otherwise.
        """
        raise NotImplementedError()

    @abstractmethod
    def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
        """Report if the dataset is known to the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to check for in the cache.
        extension : `str`, optional
            File extension expected. Should include the leading "``.``".
            If `None` the extension is ignored and the dataset ID alone is
            used to check in the cache. The extension must be defined if
            a specific component is being checked.

        Returns
        -------
        known : `bool`
            Returns `True` if the dataset is currently known to the cache
            and `False` otherwise.

        Notes
        -----
        This method can only report whether the dataset is known to the cache
        at this specific instant and does not indicate whether the file
        can be read from the cache later. `find_in_cache()` should be called
        if the cached file is to be used.
        """
        raise NotImplementedError()

    @abstractmethod
    def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
        """Move a file to the cache.

        Move the given file into the cache, using the supplied `DatasetRef`
        for naming. A call is made to `should_be_cached()` and if the
        `DatasetRef` should not be accepted `None` will be returned.

        Cache expiry can occur during this call.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to be relocated to the cache. Will be moved.
        ref : `DatasetRef`
            Ref associated with this file. Will be used to determine the name
            of the file within the cache.

        Returns
        -------
        new : `lsst.resources.ResourcePath` or `None`
            URI to the file within the cache, or `None` if the dataset
            was not accepted by the cache.
        """
        raise NotImplementedError()

    @abstractmethod
    @contextlib.contextmanager
    def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
        """Look for a dataset in the cache and return its location.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to locate in the cache.
        extension : `str`
            File extension expected. Should include the leading "``.``".

        Yields
        ------
        uri : `lsst.resources.ResourcePath` or `None`
            The URI to the cached file, or `None` if the file has not been
            cached.

        Notes
        -----
        Should be used as a context manager in order to prevent this
        file from being removed from the cache for that context.
        """
        raise NotImplementedError()

    @abstractmethod
    def remove_from_cache(self, ref: DatasetRef | Iterable[DatasetRef]) -> None:
        """Remove the specified datasets from the cache.

        It is not an error for these datasets to be missing from the cache.

        Parameters
        ----------
        ref : `DatasetRef` or iterable of `DatasetRef`
            The datasets to remove from the cache.
        """
        raise NotImplementedError()

    @abstractmethod
    def __str__(self) -> str:
        raise NotImplementedError()


class DatastoreCacheManager(AbstractDatastoreCacheManager):
    """A class for managing caching in a Datastore using local files.

    Parameters
    ----------
    config : `str` or `DatastoreCacheManagerConfig`
        Configuration to control caching.
    universe : `DimensionUniverse`
        Set of all known dimensions, used to expand and validate any used
        in lookup keys.

    Notes
    -----
    Two environment variables can be used to override the cache directory
    and expiration configuration:

    * ``$DAF_BUTLER_CACHE_DIRECTORY``
    * ``$DAF_BUTLER_CACHE_EXPIRATION_MODE``

    The expiration mode should take the form ``mode=threshold``; for
    example, to limit the cache directory to 5 datasets the value would be
    ``datasets=5``.

    Additionally, the ``$DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET`` environment
    variable can be used to indicate that this directory should be used
    if no explicit directory has been specified from configuration or from
    the ``$DAF_BUTLER_CACHE_DIRECTORY`` environment variable.
    """

    _temp_exemption_prefix = "exempt/"
    _tmpdir_prefix = "butler-cache-dir-"

    def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse):
        super().__init__(config, universe)

        # Expiration mode. Read from config but allow override from
        # the environment.
        expiration_mode = self.config.get(("expiry", "mode"))
        threshold = self.config.get(("expiry", "threshold"))

        external_mode = os.environ.get("DAF_BUTLER_CACHE_EXPIRATION_MODE")
        if external_mode:
            if external_mode == "disabled":
                expiration_mode = external_mode
                threshold = 0
            elif "=" in external_mode:
                expiration_mode, expiration_threshold = external_mode.split("=", 1)
                threshold = int(expiration_threshold)
            else:
                log.warning(
                    "Unrecognized form (%s) for DAF_BUTLER_CACHE_EXPIRATION_MODE environment variable. "
                    "Ignoring.",
                    external_mode,
                )
        if expiration_mode is None:
            # Force to None to avoid confusion.
            threshold = None

        allowed = ("disabled", "datasets", "age", "size", "files")
        if expiration_mode and expiration_mode not in allowed:
            raise ValueError(
                f"Unrecognized value for cache expiration mode. Got {expiration_mode} but expected "
                + ",".join(allowed)
            )

        self._expiration_mode: str | None = expiration_mode
        self._expiration_threshold: int | None = threshold
        if self._expiration_threshold is None and self._expiration_mode is not None:
            raise ValueError(
                f"Cache expiration threshold must be set for expiration mode {self._expiration_mode}"
            )

        # Files in cache, indexed by path within the cache directory.
        self._cache_entries = CacheRegistry()

        # No need for additional configuration if the cache has been disabled.
        if self._expiration_mode == "disabled":
            log.debug("Cache configured in disabled state.")
            self._cache_directory: ResourcePath | None = ResourcePath(
                "datastore_cache_disabled", forceAbsolute=False, forceDirectory=True
            )
            return

        # Set cache directory if it pre-exists, else defer creation until
        # requested. Allow external override from the environment.
        root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY") or self.config.get("root")

        # Allow the execution environment to override the default values
        # so long as no value has been set from the line above.
        if root is None:
            root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET")

        self._cache_directory = (
            ResourcePath(root, forceAbsolute=True, forceDirectory=True) if root is not None else None
        )

        if self._cache_directory:
            if not self._cache_directory.isLocal:
                raise ValueError(
                    f"Cache directory must be on a local file system. Got: {self._cache_directory}"
                )
            # Ensure that the cache directory is created. We assume that
            # someone specifying a permanent cache directory will be expecting
            # it to always be there. This will also trigger an error
            # early rather than waiting until the cache is needed.
            self._cache_directory.mkdir()

        # Calculate the caching lookup table.
        self._lut = processLookupConfigs(self.config["cacheable"], universe=universe)

        # Default decision for whether a dataset should be cached.
        self._caching_default = self.config.get("default", False)

        log.debug(
            "Cache configuration:\n- root: %s\n- expiration mode: %s",
            self._cache_directory if self._cache_directory else "tmpdir",
            f"{self._expiration_mode}={self._expiration_threshold}" if self._expiration_mode else "disabled",
        )

    @property
    def cache_directory(self) -> ResourcePath:
        if self._cache_directory is None:
            # Create on demand. Allow the override environment variable
            # to be used in case it got set after this object was created
            # but before a cache was used.
            if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
                # Someone else will clean this up.
                isTemporary = False
                msg = "deferred fallback"
            else:
                cache_dir = tempfile.mkdtemp(prefix=self._tmpdir_prefix)
                isTemporary = True
                msg = "temporary"

            self._cache_directory = ResourcePath(cache_dir, forceDirectory=True, isTemporary=isTemporary)
            log.debug("Using %s cache directory at %s", msg, self._cache_directory)

            # Remove when we no longer need it.
            if isTemporary:
                atexit.register(remove_cache_directory, self._cache_directory.ospath)
        return self._cache_directory

    @property
    def _temp_exempt_directory(self) -> ResourcePath:
        """Return the directory in which to store temporary cache files that
        should not be expired.
        """
        return self.cache_directory.join(self._temp_exemption_prefix)

    @property
    def cache_size(self) -> int:
        return self._cache_entries.cache_size

    @property
    def file_count(self) -> int:
        return len(self._cache_entries)

    @classmethod
    def create_disabled(cls, universe: DimensionUniverse) -> Self:
        """Create an instance that is disabled by default but can be
        overridden by the environment.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use if the datastore becomes enabled via the
            environment.

        Returns
        -------
        cache_manager : `DatastoreCacheManager`
            A new cache manager that is disabled by default but might be
            enabled if environment variables are set.
        """
        # It is not sufficient to set the mode to disabled; there has to be
        # enough configuration for the caching to work when enabled. This
        # means setting the default to true (cache everything), inheriting
        # the FileDatastore default config (which works for Rubin but
        # doesn't allow non-Rubin deployments to cache anything, although it
        # can in theory be overridden), or allowing a parameter to be passed
        # in here defining the cacheable section of the config. For now cache
        # everything. Supporting a JSON environment variable defining the
        # cacheable storage classes is also a possibility.
        config_str = """
cached:
  default: true
  cacheable:
    irrelevant: false
  expiry:
    mode: disabled
    threshold: 0
"""
        config = Config.fromYaml(config_str)
        return cls(DatastoreCacheManagerConfig(config), universe)
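
    # A sketch of the intended flow (not part of the original module;
    # ``universe`` is assumed to exist): the returned manager caches nothing
    # until the environment switches expiry on, at which point the
    # cache-everything default above takes effect.
    #
    #     os.environ["DAF_BUTLER_CACHE_EXPIRATION_MODE"] = "datasets=5"
    #     manager = DatastoreCacheManager.create_disabled(universe)
    #     # manager now caches datasets, expiring all but the newest 5.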

    @classmethod
    def set_fallback_cache_directory_if_unset(cls) -> tuple[bool, str]:
        """Define a fallback cache directory if a fallback is not set already.

        Returns
        -------
        defined : `bool`
            `True` if the fallback directory was newly-defined in this method.
            `False` if it had already been set.
        cache_dir : `str`
            Returns the path to the cache directory that will be used if it's
            needed. This can allow the caller to run a directory cleanup
            when it's no longer needed (something that the cache manager
            cannot do because forks should not clean up directories defined
            by the parent process).

        Notes
        -----
        The fallback directory will not be defined if one has already been
        defined. This method sets the ``DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET``
        environment variable only if a value has not previously been stored
        in that environment variable. Setting the environment variable allows
        this value to survive into spawned subprocesses. Calling this method
        will lead to all subsequently created cache managers sharing the same
        cache.
        """
        if cache_dir := os.environ.get("DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"):
            # A value has already been set.
            return (False, cache_dir)

        # As a class method, we do not know at this point whether a cache
        # directory will be needed so it would be impolite to create a
        # directory that will never be used.

        # Construct our own temp name -- 16 characters should have a fairly
        # low chance of clashing when combined with the process ID.
        characters = "abcdefghijklmnopqrstuvwxyz0123456789_"
        rng = Random()
        tempchars = "".join(rng.choice(characters) for _ in range(16))

        tempname = f"{cls._tmpdir_prefix}{os.getpid()}-{tempchars}"

        cache_dir = os.path.join(tempfile.gettempdir(), tempname)
        os.environ["DAF_BUTLER_CACHE_DIRECTORY_IF_UNSET"] = cache_dir
        return (True, cache_dir)
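
    # A usage sketch (not part of the original module): a parent process pins
    # a shared fallback directory before forking workers, then cleans it up
    # itself once the workers have finished, since the children must not.
    #
    #     defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
    #     ...  # spawn subprocesses; they inherit the environment variable
    #     if defined:
    #         remove_cache_directory(cache_dir)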

    def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
        # Docstring inherited
        if self._expiration_mode == "disabled":
            return False

        matchName: LookupKey | str = f"{entity} (via default)"
        should_cache = self._caching_default

        for key in entity._lookupNames():
            if key in self._lut:
                should_cache = bool(self._lut[key])
                matchName = key
                break

        if not isinstance(should_cache, bool):
            raise TypeError(
                f"Got cache value {should_cache!r} for config entry {matchName!r}; expected bool."
            )

        log.debug("%s (match: %s) should%s be cached", entity, matchName, "" if should_cache else " not")
        return should_cache

    def _construct_cache_name(self, ref: DatasetRef, extension: str) -> ResourcePath:
        """Construct the name to use for this dataset in the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            The dataset to look up in or write to the cache.
        extension : `str`
            File extension to use for this file. Should include the
            leading "``.``".

        Returns
        -------
        uri : `lsst.resources.ResourcePath`
            URI to use for this dataset in the cache.
        """
        return _construct_cache_path(self.cache_directory, ref, extension)

    def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
        # Docstring inherited
        if self._expiration_mode == "disabled":
            return None
        if not self.should_be_cached(ref):
            return None

        # Write the file using the ID of the dataset ref and the file
        # extension.
        cached_location = self._construct_cache_name(ref, uri.getExtension())

        # Run cache expiry to ensure that we have room for this
        # item.
        self._expire_cache()

        # The above resets the in-memory cache status. It's entirely possible
        # that another process has just cached this file (if multiple
        # processes are caching on read), so check our in-memory cache
        # before attempting to cache the dataset.
        path_in_cache = cached_location.relative_to(self.cache_directory)
        if path_in_cache and path_in_cache in self._cache_entries:
            return cached_location

        # Move into the cache. Given that multiple processes might be
        # sharing a single cache directory, and the file we need might have
        # been copied in whilst we were checking, allow overwrite without
        # complaint. Even for a private cache directory it is possible that
        # a second butler in a subprocess could be writing to it.
        cached_location.transfer_from(uri, transfer="move", overwrite=True)
        log.debug("Cached dataset %s to %s", ref, cached_location)

        self._register_cache_entry(cached_location)

        return cached_location

    @contextlib.contextmanager
    def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
        # Docstring inherited
        if self._expiration_mode == "disabled":
            yield None
            return

        # Short circuit this if the cache directory has not been created yet.
        if self._cache_directory is None:
            yield None
            return

        cached_location = self._construct_cache_name(ref, extension)
        if cached_location.exists():
            log.debug("Found cached file %s for dataset %s.", cached_location, ref)

            # The cached file could be removed by another process doing
            # cache expiration so we need to protect against that by making
            # a copy in a different tree. Use hardlinks to ensure that
            # we either have the cached file or we don't. This is robust
            # against race conditions that can be caused by using soft links
            # and the other end of the link being deleted just after it
            # is created.
            path_in_cache = cached_location.relative_to(self.cache_directory)
            assert path_in_cache is not None, f"Somehow {cached_location} not in cache directory"

            # Need to use a unique file name for the temporary location to
            # ensure that two different processes can read the file
            # simultaneously without one of them deleting it when it's in
            # use elsewhere. Retain the original filename for easier
            # debugging.
            random = str(uuid.uuid4())[:8]
            basename = cached_location.basename()
            filename = f"{random}-{basename}"

            temp_location: ResourcePath | None = self._temp_exempt_directory.join(filename)
            try:
                if temp_location is not None:
                    temp_location.transfer_from(cached_location, transfer="hardlink")
            except Exception as e:
                log.debug("Detected error creating hardlink for dataset %s: %s", ref, e)
                # Any failure will be treated as if the file was not
                # in the cache. Yielding the original cache location
                # is too dangerous.
                temp_location = None

            try:
                log.debug("Yielding temporary cache location %s for dataset %s", temp_location, ref)
                yield temp_location
            finally:
                try:
                    if temp_location:
                        temp_location.remove()
                except FileNotFoundError:
                    pass
            return

        log.debug("Dataset %s not found in cache.", ref)
        yield None
        return
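
    # A usage sketch (not part of the original module; ``manager``, ``ref``,
    # and the reader function are assumed): the context manager must stay
    # open while the file is read, because the hardlinked copy in the exempt
    # tree is removed on exit.
    #
    #     with manager.find_in_cache(ref, ".fits") as cached_uri:
    #         if cached_uri is not None:
    #             data = read_dataset(cached_uri.ospath)  # hypothetical reader
    #         else:
    #             ...  # fall back to fetching from the datastore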

    def remove_from_cache(self, refs: DatasetRef | Iterable[DatasetRef]) -> None:
        # Docstring inherited.

        # Stop early if there are no cache entries anyhow.
        if len(self._cache_entries) == 0:
            return

        if isinstance(refs, DatasetRef):
            refs = [refs]

        # Create a set of all the IDs.
        all_ids = {ref.id for ref in refs}

        keys_to_remove = []
        for key, entry in self._cache_entries.items():
            if entry.ref in all_ids:
                keys_to_remove.append(key)
        self._remove_from_cache(keys_to_remove)

    def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool = False) -> str | None:
        """Record the file in the cache registry.

        Parameters
        ----------
        cached_location : `lsst.resources.ResourcePath`
            Location of the file to be registered.
        can_exist : `bool`, optional
            If `True` the item being registered can already be listed.
            This can allow a cache refresh to run without checking the
            file again. If `False` it is an error for the registry to
            already know about this file.

        Returns
        -------
        cache_key : `str` or `None`
            The key used in the registry for this file. `None` if the file
            no longer exists (it could have been expired by another process).
        """
        path_in_cache = cached_location.relative_to(self.cache_directory)
        if path_in_cache is None:
            raise ValueError(
                f"Can not register cached file {cached_location} that is not within"
                f" the cache directory at {self.cache_directory}."
            )
        if path_in_cache in self._cache_entries:
            if can_exist:
                return path_in_cache
            else:
                raise ValueError(
                    f"Cached file {cached_location} is already known to the registry"
                    " but this was expected to be a new file."
                )
        try:
            details = CacheEntry.from_file(cached_location, root=self.cache_directory)
        except FileNotFoundError:
            return None
        self._cache_entries[path_in_cache] = details
        return path_in_cache

    def scan_cache(self) -> None:
        """Scan the cache directory and record information about files."""
        if self._expiration_mode == "disabled":
            return

        found = set()
        for file in ResourcePath.findFileResources([self.cache_directory]):
            assert isinstance(file, ResourcePath), "Unexpectedly did not get ResourcePath from iterator"

            # Skip any that are found in an exempt part of the hierarchy
            # since they should not be part of the registry.
            if file.relative_to(self._temp_exempt_directory) is not None:
                continue

            try:
                path_in_cache = self._register_cache_entry(file, can_exist=True)
                if path_in_cache:
                    found.add(path_in_cache)
            except InvalidCacheFilenameError:
                # Skip files that are in the cache directory but were not
                # created by us.
                pass

        # Find any files that were recorded in the cache but are no longer
        # on disk (something else may have cleared them out).
        known_to_cache = set(self._cache_entries)
        missing = known_to_cache - found

        if missing:
            log.debug(
                "Entries no longer on disk but thought to be in cache and so removed: %s", ",".join(missing)
            )
            for path_in_cache in missing:
                self._cache_entries.pop(path_in_cache, None)

    def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
        """Report if the dataset is known to the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to check for in the cache.
        extension : `str`, optional
            File extension expected. Should include the leading "``.``".
            If `None` the extension is ignored and the dataset ID alone is
            used to check in the cache. The extension must be defined if
            a specific component is being checked.

        Returns
        -------
        known : `bool`
            Returns `True` if the dataset is currently known to the cache
            and `False` otherwise. If the dataset refers to a component and
            an extension is given then only that component is checked.

        Notes
        -----
        This method can only report whether the dataset is known to the cache
        at this specific instant and does not indicate whether the file
        can be read from the cache later. `find_in_cache()` should be called
        if the cached file is to be used.

        This method does not force the cache to be re-scanned and so can miss
        cached datasets that have recently been written by other processes.
        """
        if self._expiration_mode == "disabled":
            return False
        if self._cache_directory is None:
            return False
        if self.file_count == 0:
            return False

        if extension is None:
            # Look solely for a matching dataset ref ID and not specific
            # components.
            cached_paths = self._cache_entries.get_dataset_keys(ref.id)
            return bool(cached_paths)
        else:
            # Extension is known so we can do an explicit look-up for the
            # cache entry.
            cached_location = self._construct_cache_name(ref, extension)
            path_in_cache = cached_location.relative_to(self.cache_directory)
            assert path_in_cache is not None  # For mypy
            return path_in_cache in self._cache_entries

    def _remove_from_cache(self, cache_entries: Iterable[str]) -> None:
        """Remove the specified cache entries from the cache.

        Parameters
        ----------
        cache_entries : `~collections.abc.Iterable` of `str`
            The entries to remove from the cache. The values are the paths
            within the cache.
        """
        for entry in cache_entries:
            path = self.cache_directory.join(entry)

            self._cache_entries.pop(entry, None)
            log.debug("Removing file from cache: %s", path)
            with contextlib.suppress(FileNotFoundError):
                path.remove()

    def _expire_cache(self) -> None:
        """Expire the files in the cache.

        Notes
        -----
        The expiration modes are defined by the config or can be overridden.
        Available options:

        * ``files``: Number of files.
        * ``datasets``: Number of datasets.
        * ``size``: Total size of files.
        * ``age``: Age of files.

        The first three remove entries in reverse time order. Counting files
        is complicated by the possibility of disassembled composites, where
        10 small files can be created for each dataset.

        Additionally, there is a use case for an external user to explicitly
        state the dataset refs that should be cached and then when to remove
        them, overriding any global configuration.
        """
        if self._expiration_mode is None:
            # Expiration has been disabled.
            return

        # mypy can't be sure we have set a threshold properly.
        if self._expiration_threshold is None:
            log.warning(
                "Requesting cache expiry of mode %s but no threshold set in config.", self._expiration_mode
            )
            return

        # Sync up the cache. There is no file locking involved so for a
        # shared cache multiple processes may be racing to delete files.
        # Deleting a file that no longer exists is not an error.
        self.scan_cache()

        if self._expiration_mode == "files":
            n_files = len(self._cache_entries)
            n_over = n_files - self._expiration_threshold
            if n_over > 0:
                sorted_keys = self._sort_cache()
                keys_to_remove = sorted_keys[:n_over]
                self._remove_from_cache(keys_to_remove)
            return

        if self._expiration_mode == "datasets":
            # Count the datasets, in ascending timestamp order,
            # so that the oldest turn up first.
            datasets = defaultdict(list)
            for key in self._sort_cache():
                entry = self._cache_entries[key]
                datasets[entry.ref].append(key)

            n_datasets = len(datasets)
            n_over = n_datasets - self._expiration_threshold
            if n_over > 0:
                # Keys will be read out in insert order, which
                # will be date order, so the oldest ones are removed.
                ref_ids = list(datasets.keys())[:n_over]
                keys_to_remove = list(itertools.chain.from_iterable(datasets[ref_id] for ref_id in ref_ids))
                self._remove_from_cache(keys_to_remove)
            return

        if self._expiration_mode == "size":
            if self.cache_size > self._expiration_threshold:
                for key in self._sort_cache():
                    self._remove_from_cache([key])
                    if self.cache_size <= self._expiration_threshold:
                        break
            return

        if self._expiration_mode == "age":
            now = datetime.datetime.now(datetime.UTC)
            for key in self._sort_cache():
                delta = now - self._cache_entries[key].ctime
                # Use total_seconds() rather than the .seconds component so
                # that entries more than a day old are aged out correctly.
                if delta.total_seconds() > self._expiration_threshold:
                    self._remove_from_cache([key])
                else:
                    # We're already in date order.
                    break
            return

        raise ValueError(f"Unrecognized cache expiration mode of {self._expiration_mode}")
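
    # An illustrative ``cached`` configuration section (not part of the
    # original module) exercising one of the modes above: keep the cache
    # under roughly 1 GB, caching only datasets matched by the ``cacheable``
    # lookup table. ``Exposure`` is an example key; any name understood by
    # the lookup table works.
    #
    #     cached:
    #       default: false
    #       cacheable:
    #         Exposure: true
    #       expiry:
    #         mode: size
    #         threshold: 1000000000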

    def _sort_cache(self) -> list[str]:
        """Sort the cache entries by time and return the sorted keys.

        Returns
        -------
        sorted : `list` of `str`
            Keys into the cache, sorted by time with oldest first.
        """

        def _sort_by_time(key: str) -> datetime.datetime:
            """Sorter key function using cache entry details."""
            return self._cache_entries[key].ctime

        return sorted(self._cache_entries, key=_sort_by_time)

    def __str__(self) -> str:
        if self._expiration_mode == "disabled":
            return f"""{type(self).__name__}(disabled)"""
        cachedir = self._cache_directory if self._cache_directory else "<tempdir>"
        return (
            f"{type(self).__name__}@{cachedir} ({self._expiration_mode}={self._expiration_threshold},"
            f"default={self._caching_default}) "
            f"n_files={self.file_count}, n_bytes={self.cache_size}"
        )


class DatastoreDisabledCacheManager(AbstractDatastoreCacheManager):
    """A variant of the datastore cache where no cache is enabled.

    Parameters
    ----------
    config : `str` or `DatastoreCacheManagerConfig`
        Configuration to control caching.
    universe : `DimensionUniverse`
        Set of all known dimensions, used to expand and validate any used
        in lookup keys.
    """

    def __init__(
        self,
        config: str | DatastoreCacheManagerConfig | None = None,
        universe: DimensionUniverse | None = None,
    ):
        return

    def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
        """Indicate whether the entity should be added to the cache.

        Parameters
        ----------
        entity : `StorageClass` or `DatasetType` or `DatasetRef`
            Thing to test against the configuration. The ``name`` property
            is used to determine a match. A `DatasetType` will first check
            its name, before checking its `StorageClass`. If there are no
            matches the default will be returned.

        Returns
        -------
        should_cache : `bool`
            Always returns `False`.
        """
        return False

    def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
        """Move a dataset to the cache.

        Parameters
        ----------
        uri : `lsst.resources.ResourcePath`
            Location of the file to be relocated to the cache. Will be moved.
        ref : `DatasetRef`
            Ref associated with this file. Will be used to determine the name
            of the file within the cache.

        Returns
        -------
        new : `lsst.resources.ResourcePath` or `None`
            Always refuses and returns `None`.
        """
        return None

    @contextlib.contextmanager
    def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
        """Look for a dataset in the cache and return its location.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to locate in the cache.
        extension : `str`
            File extension expected. Should include the leading "``.``".

        Yields
        ------
        uri : `lsst.resources.ResourcePath` or `None`
            Never finds a file. Always yields `None`.
        """
        yield None

    def remove_from_cache(self, ref: DatasetRef | Iterable[DatasetRef]) -> None:
        """Remove datasets from the cache.

        Always does nothing.

        Parameters
        ----------
        ref : `DatasetRef` or iterable of `DatasetRef`
            The datasets to remove from the cache.
        """
        return

    def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
        """Report if a dataset is known to the cache.

        Parameters
        ----------
        ref : `DatasetRef`
            Dataset to check for in the cache.
        extension : `str`, optional
            File extension expected. Should include the leading "``.``".
            If `None` the extension is ignored and the dataset ID alone is
            used to check in the cache. The extension must be defined if
            a specific component is being checked.

        Returns
        -------
        known : `bool`
            Always returns `False`.
        """
        return False

    def __str__(self) -> str:
        return f"{type(self).__name__}()"


class InvalidCacheFilenameError(Exception):
    """Raised when attempting to register a file in the cache with a name in
    the incorrect format.
    """