Coverage for python/lsst/pipe/base/caching_limited_butler.py: 26%
79 statements
coverage.py v7.13.5, created at 2026-04-22 08:57 +0000
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["CachingLimitedButler"]

import logging
from collections.abc import Iterable, Set
from typing import Any

from lsst.daf.butler import (
    ButlerMetrics,
    DatasetId,
    DatasetProvenance,
    DatasetRef,
    DeferredDatasetHandle,
    DimensionUniverse,
    LimitedButler,
    StorageClass,
)

from ._dataset_handle import InMemoryDatasetHandle

_LOG = logging.getLogger(__name__)


class CachingLimitedButler(LimitedButler):
    """A `LimitedButler` that caches datasets.

    A `CachingLimitedButler` caches on both `.put()` and `.get()`, and holds
    a single instance per dataset type: the most recently put or retrieved
    dataset of that type.

    The dataset types which will be cached on put/get are controlled via the
    `cache_on_put` and `cache_on_get` attributes, respectively.

    By default, copies of the cached items are returned on `get`, so that
    calling code is free to operate on the data in-place. A `no_copy_on_cache`
    attribute also exists to tell the `CachingLimitedButler` not to return
    copies when it is known that the calling code can be trusted not to change
    values, e.g. when passing calibs to `isrTask`.

    Parameters
    ----------
    wrapped : `LimitedButler`
        The butler to wrap.
    cache_on_put : `set` [`str`], optional
        The dataset types to cache on put.
    cache_on_get : `set` [`str`], optional
        The dataset types to cache on get.
    no_copy_on_cache : `set` [`str`], optional
        The dataset types for which copies should not be returned when cached.
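
    Examples
    --------
    A minimal usage sketch; the wrapped ``quantum_butler``, the ``flat_ref``
    dataset reference, and the dataset type names used here are illustrative
    assumptions, not part of this class's API::

        butler = CachingLimitedButler(
            quantum_butler,
            cache_on_put={"postISRCCD"},
            cache_on_get={"bias", "flat"},
            # Calibs are treated as read-only downstream, so skip the
            # defensive copy on cached gets.
            no_copy_on_cache={"bias", "flat"},
        )
        flat = butler.get(flat_ref)  # fetched from the wrapped butler, cached
        flat = butler.get(flat_ref)  # served from the in-memory cache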
77 """

    def __init__(
        self,
        wrapped: LimitedButler,
        cache_on_put: Set[str] = frozenset(),
        cache_on_get: Set[str] = frozenset(),
        no_copy_on_cache: Set[str] = frozenset(),
    ):
        self._wrapped = wrapped
        self.storageClasses = self._wrapped.storageClasses
        self._cache_on_put = cache_on_put
        self._cache_on_get = cache_on_get
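        # The cache holds at most one entry per dataset type name: the
        # (dataset ID, in-memory handle) of the most recently put/got dataset.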
        self._cache: dict[str, tuple[DatasetId, InMemoryDatasetHandle]] = {}
        self._no_copy_on_cache = no_copy_on_cache
    def close(self) -> None:
        self._wrapped.close()

    @property
    def _metrics(self) -> ButlerMetrics:
        # Need to always forward from the wrapped metrics object.
        return self._wrapped._metrics

    @_metrics.setter
    def _metrics(self, metrics: ButlerMetrics) -> None:
        # Allow record_metrics() context manager to override the wrapped
        # butler.
        self._wrapped._metrics = metrics
    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        if storageClass is None:
            storageClass = ref.datasetType.storageClass
        elif isinstance(storageClass, str):
            storageClass = self.storageClasses.getStorageClass(storageClass)

        # Check whether we have this dataset type in the cache.
        if cached := self._cache.get(ref.datasetType.name):
            dataset_id, handle = cached
            if dataset_id == ref.id:  # if we do, check it's the right dataset
                _LOG.debug("Returning cached dataset %s", ref)
                return handle.get(parameters=parameters, storageClass=storageClass)

        obj = self._wrapped.get(ref, parameters=parameters, storageClass=storageClass)
        # The `not parameters` check ensures we never cache sub-images etc.
        if ref.datasetType.name in self._cache_on_get and not parameters:
            handle = InMemoryDatasetHandle(
                obj,
                storageClass=storageClass,
                dataId=ref.dataId,
                copy=ref.datasetType.name not in self._no_copy_on_cache,
            )
            self._cache[ref.datasetType.name] = (ref.id, handle)
            _LOG.debug("Cached dataset %s", ref)
            # Return via the handle so that the copy fires if needed.
            return handle.get()
        return obj
    def getDeferred(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: str | StorageClass | None = None,
    ) -> DeferredDatasetHandle:
        # Note that this does not use the cache at all.
        return self._wrapped.getDeferred(ref, parameters=parameters, storageClass=storageClass)
    def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
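        # Answer from the cache where possible; refs that are not cached (or
        # are cached under a different dataset ID) are checked against the
        # wrapped butler below.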
        result = {}
        unknown_refs = []
        for ref in refs:
            if cached := self._cache.get(ref.datasetType.name):
                dataset_id, _ = cached
                if dataset_id == ref.id:
                    result[ref] = True
                    continue
            unknown_refs.append(ref)

        result.update(self._wrapped.stored_many(unknown_refs))
        return result
    def isWriteable(self) -> bool:
        return self._wrapped.isWriteable()
    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
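        # If this dataset type is configured for caching on put, stash an
        # in-memory handle before forwarding the write to the wrapped butler.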
        if ref.datasetType.name in self._cache_on_put:
            self._cache[ref.datasetType.name] = (
                ref.id,
                InMemoryDatasetHandle(
                    obj,
                    storageClass=ref.datasetType.storageClass,
                    dataId=ref.dataId,
                    copy=ref.datasetType.name not in self._no_copy_on_cache,
                ),
            )
            _LOG.debug("Cached dataset %s on put", ref)
        return self._wrapped.put(obj, ref, provenance=provenance)
    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
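        # Evict any cached entries that correspond to the pruned refs, then
        # forward the prune to the wrapped butler.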
        refs = list(refs)
        for ref in refs:
            if cached := self._cache.get(ref.datasetType.name):
                dataset_id, _ = cached
                if dataset_id == ref.id:
                    del self._cache[ref.datasetType.name]

        return self._wrapped.pruneDatasets(
            refs, disassociate=disassociate, unstore=unstore, tags=tags, purge=purge
        )
    @property
    def dimensions(self) -> DimensionUniverse:
        return self._wrapped.dimensions

    @property
    def _datastore(self) -> Any:
        return self._wrapped._datastore

    @_datastore.setter  # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
    def _datastore(self, value: Any) -> None:
        self._wrapped._datastore = value