Coverage for python / lsst / pipe / base / _dataset_handle.py: 18%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 08:59 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
27from __future__ import annotations
29__all__ = ["InMemoryDatasetHandle"]
31import dataclasses
32from typing import Any, cast
34from frozendict import frozendict
36from lsst.daf.butler import (
37 DataCoordinate,
38 DataId,
39 DimensionUniverse,
40 StorageClass,
41 StorageClassDelegate,
42 StorageClassFactory,
43)
# Use an empty dataID as a default.
def _default_dataId() -> DataCoordinate:
    """Return a default, completely empty data coordinate.

    Returns
    -------
    dataId : `~lsst.daf.butler.DataCoordinate`
        An empty dataId built against a default dimension universe.
    """
    universe = DimensionUniverse()
    return DataCoordinate.make_empty(universe)
@dataclasses.dataclass(frozen=True, init=False)
class InMemoryDatasetHandle:
    """An in-memory version of a `~lsst.daf.butler.DeferredDatasetHandle`.

    Parameters
    ----------
    inMemoryDataset : `~typing.Any`
        The dataset to be used by this handle.
    storageClass : `~lsst.daf.butler.StorageClass` or `None`, optional
        The storage class associated with the in-memory dataset. If `None`
        and if a storage class is needed, an attempt will be made to work one
        out from the underlying python type.
    parameters : `dict` [`str`, `~typing.Any`]
        Parameters to be used with `get`.
    dataId : `~lsst.daf.butler.DataId` or `None`, optional
        The dataId associated with this dataset. Only used for compatibility
        with the Butler implementation. Can be used for logging messages
        by calling code. If ``dataId`` is not specified, a default empty
        dataId will be constructed.
    copy : `bool`, optional
        Whether to copy on `get` or not.
    **kwargs : `~typing.Any`
        If ``kwargs`` are provided without specifying a ``dataId``, those
        parameters will be converted into a dataId-like entity.
    """

    # Shared empty dataId, built once at class-definition time and reused
    # by every handle constructed without an explicit dataId or kwargs.
    _empty = DataCoordinate.make_empty(DimensionUniverse())

    def __init__(
        self,
        inMemoryDataset: Any,
        *,
        storageClass: StorageClass | str | None = None,
        parameters: dict[str, Any] | None = None,
        dataId: DataId | None = None,
        copy: bool = False,
        **kwargs: Any,
    ):
        # The dataclass is frozen, so attributes must be assigned through
        # object.__setattr__ rather than normal attribute assignment.
        object.__setattr__(self, "inMemoryDataset", inMemoryDataset)
        object.__setattr__(self, "storageClass", storageClass)
        object.__setattr__(self, "parameters", parameters)
        object.__setattr__(self, "copy", copy)
        # Need to be able to construct a dataId from kwargs for convenience.
        # This will not be a full DataCoordinate.
        if dataId is None:
            if kwargs:
                dataId = frozendict(kwargs)
            else:
                dataId = self._empty
        elif kwargs:
            if isinstance(dataId, DataCoordinate):
                # kwargs take precedence over the given coordinate's values.
                dataId = DataCoordinate.standardize(kwargs, defaults=dataId, universe=dataId.universe)
            else:
                # Plain mapping dataId: merge with kwargs and re-freeze.
                new = dict(dataId)
                new.update(kwargs)
                dataId = frozendict(new)
        object.__setattr__(self, "dataId", dataId)

    def get(
        self,
        *,
        component: str | None = None,
        parameters: dict | None = None,
        storageClass: str | StorageClass | None = None,
    ) -> Any:
        """Retrieve the dataset pointed to by this handle.

        This handle may be used multiple times, possibly with different
        parameters.

        Parameters
        ----------
        component : `str` or None
            If the deferred object is a component dataset type, this parameter
            may specify the name of the component to use in the get operation.
        parameters : `dict` or None
            The parameters argument will be passed to the butler get method.
            It defaults to `None`. If the value is not `None`, this `dict` will
            be merged with the parameters dict used to construct the
            `~lsst.daf.butler.DeferredDatasetHandle` class.
        storageClass : `~lsst.daf.butler.StorageClass` or `str`, optional
            The storage class to be used to override the Python type
            returned by this method. By default the returned type matches
            the type stored. Specifying a read `~lsst.daf.butler.StorageClass`
            can force a different type to be returned.
            This type must be compatible with the original type.

        Returns
        -------
        return : `object`
            The dataset pointed to by this handle. Whether this returns the
            original object or a copy is controlled by the ``copy`` property
            of the handle that is set at handle construction time.
            If the stored object is `None` this method always returns `None`
            regardless of any component request or parameters.

        Raises
        ------
        KeyError
            Raised if a component or parameters are used but no storage
            class can be found.
        """
        # A stored `None` short-circuits everything, including component
        # requests and parameters.
        if self.inMemoryDataset is None:
            return None

        # Merge handle-level parameters with call-level parameters;
        # call-level values win on key collisions.
        if self.parameters is not None:
            mergedParameters = self.parameters.copy()
            if parameters is not None:
                mergedParameters.update(parameters)
        elif parameters is not None:
            mergedParameters = parameters
        else:
            mergedParameters = {}

        # Resolve any requested output storage class, looking a name up
        # through the factory.
        returnStorageClass: StorageClass | None = None
        if storageClass:
            if isinstance(storageClass, str):
                factory = StorageClassFactory()
                returnStorageClass = factory.getStorageClass(storageClass)
            else:
                returnStorageClass = storageClass

        inMemoryDataset = self.inMemoryDataset

        if self.copy:
            # An optimization might be to defer copying until any components
            # and parameters have been applied. This can be a problem since
            # most component storage classes do not bother to define a
            # storage class delegate and the default delegate uses deepcopy()
            # which can fail if explicit support for deepcopy() is missing
            # or pickle does not work.
            # Copying will require a storage class be determined, which is
            # not normally required for the default case of no parameters and
            # no components.
            thisStorageClass = self._getStorageClass()
            try:
                delegate = thisStorageClass.delegate()
            except TypeError:
                # Try the default copy options if no delegate is available.
                delegate = StorageClassDelegate(thisStorageClass)

            inMemoryDataset = delegate.copy(inMemoryDataset)

        if component or mergedParameters:
            # This requires a storage class look up to locate the delegate
            # class.
            thisStorageClass = self._getStorageClass()

            # Parameters for derived components are applied against the
            # composite.
            if component in thisStorageClass.derivedComponents:
                # For some reason MyPy doesn't see the line above as narrowing
                # 'component' from 'str | None' to 'str'.
                component = cast(str, component)
                # NOTE(review): this branch validates only the call-level
                # ``parameters`` while the non-derived branch below validates
                # ``mergedParameters`` — confirm the asymmetry is intended.
                thisStorageClass.validateParameters(parameters)

                # Process the parameters (hoping this never modified the
                # original object).
                inMemoryDataset = thisStorageClass.delegate().handleParameters(
                    inMemoryDataset, mergedParameters
                )
                mergedParameters = {}  # They have now been used

                readStorageClass = thisStorageClass.derivedComponents[component]
            else:
                if component:
                    readStorageClass = thisStorageClass.components[component]
                else:
                    readStorageClass = thisStorageClass
                readStorageClass.validateParameters(mergedParameters)

            if component:
                inMemoryDataset = thisStorageClass.delegate().getComponent(inMemoryDataset, component)

            # Any parameters not consumed above are applied against the
            # (possibly component) dataset's own storage class delegate.
            if mergedParameters:
                inMemoryDataset = readStorageClass.delegate().handleParameters(
                    inMemoryDataset, mergedParameters
                )

            if returnStorageClass:
                return returnStorageClass.coerce_type(inMemoryDataset)
            return inMemoryDataset
        else:
            # If there are no parameters or component requests the object
            # can be returned as is, but possibly with conversion.
            if returnStorageClass:
                return returnStorageClass.coerce_type(inMemoryDataset)
            return inMemoryDataset

    def _getStorageClass(self) -> StorageClass:
        """Return the relevant storage class.

        Returns
        -------
        storageClass : `~lsst.daf.butler.StorageClass`
            The storage class associated with this handle, or one derived
            from the python type of the stored object.

        Raises
        ------
        KeyError
            Raised if the storage class could not be found.
        """
        factory = StorageClassFactory()
        if self.storageClass:
            if isinstance(self.storageClass, str):
                return factory.getStorageClass(self.storageClass)
            else:
                return self.storageClass

        # Need to match python type.
        pytype = type(self.inMemoryDataset)
        return factory.findStorageClass(pytype)

    inMemoryDataset: Any
    """The object to store in this dataset handle for later retrieval.
    """

    dataId: DataCoordinate | frozendict
    """The `~lsst.daf.butler.DataCoordinate` associated with this dataset
    handle.
    """

    storageClass: StorageClass | str | None = None
    """The name of the `~lsst.daf.butler.StorageClass` associated with this
    dataset.

    If `None`, the storage class will be looked up from the factory.
    """

    parameters: dict | None = None
    """Optional parameters that may be used to specify a subset of the dataset
    to be loaded (`dict` or `None`).
    """

    copy: bool = False
    """Control whether a copy of the in-memory dataset is returned for every
    call to `get()`."""