Coverage for python / lsst / pipe / base / tests / in_memory_limited_butler.py: 36%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:44 +0000

# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

27 

from __future__ import annotations

__all__ = ["InMemoryLimitedButler"]

import logging
import uuid
from collections.abc import Iterable
from typing import Any

from lsst.daf.butler import (
    ButlerMetrics,
    DatasetProvenance,
    DatasetRef,
    DatasetType,
    DimensionUniverse,
    LimitedButler,
    MissingDatasetTypeError,
    Quantum,
    StorageClass,
    StorageClassFactory,
)
from lsst.daf.butler.registry import ConflictingDefinitionError

from .._dataset_handle import InMemoryDatasetHandle

# Module-level logger, named after this module per standard convention.
_LOG = logging.getLogger(__name__)

54 

55 

class InMemoryLimitedButler(LimitedButler):
    """A `LimitedButler` that just stores datasets in an in-memory mapping.

    Parameters
    ----------
    universe : `lsst.daf.butler.DimensionUniverse`
        Definitions for all dimensions.
    dataset_types : `~collections.abc.Iterable` [ \
            `lsst.daf.butler.DatasetType` ]
        Definitions of all dataset types.

    Notes
    -----
    This is an incomplete implementation of the `LimitedButler` interface
    intended only for tests.  It supports all methods required by
    `SingleQuantumExecutor`, but not transfers or URI retrieval.

    While this class supports storage class conversions in `get` and `put`, it
    uses different code paths from real butlers, and should not be used in
    tests in which storage class correctness is part of what is being tested.

    Objects are always copied (via storage class machinery) by `get`.

    Pickling this class will pickle all datasets already `put` (which must be
    pickleable).  This generally allows a central butler to be initialized with
    input datasets in one process and distributed to worker processes that run
    quanta *once*, but it does not allow outputs from a worker process to be
    distributed to others or the originating process.  This can be hard to
    notice because quanta will usually be skipped with
    `lsst.pipe.base.NoWorkFound` (a success!) when all of their inputs are
    missing.
    """

    def __init__(self, universe: DimensionUniverse, dataset_types: Iterable[DatasetType] = ()):
        self.storageClasses = StorageClassFactory()
        self._universe = universe
        # Maps dataset UUID to (ref with the repo storage class, handle that
        # holds the stored object).
        self._datasets: dict[uuid.UUID, tuple[DatasetRef, InMemoryDatasetHandle]] = {}
        self._metrics = ButlerMetrics()
        self._dataset_types = {dt.name: dt for dt in dataset_types}
        assert not any(dt.component() for dt in self._dataset_types.values()), (
            "Dataset type definitions must not be components."
        )

    def __getstate__(self) -> dict[str, Any]:
        # Pickle customization is needed because StorageClassFactory is not
        # pickleable.  Metrics are deliberately not pickled either; each
        # process starts with a fresh ButlerMetrics.
        return {
            "universe": self._universe,
            "datasets": self._datasets,
            "dataset_types": self._dataset_types,
        }

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Recreate the members that __getstate__ intentionally dropped.
        self.storageClasses = StorageClassFactory()
        self._universe = state["universe"]
        self._datasets = state["datasets"]
        self._metrics = ButlerMetrics()
        self._dataset_types = state["dataset_types"]

    def get_datasets(self, dataset_type: str | None = None) -> dict[DatasetRef, object]:
        """Return datasets that have been `put` to this butler.

        Storage classes and corresponding Python types will match the dataset
        type definitions provided at butler construction, which may not be the
        same as what was `put`.

        Parameters
        ----------
        dataset_type : `str`, optional
            Dataset type name used to filter results.

        Returns
        -------
        refs : `dict` [ `lsst.daf.butler.DatasetRef`, `object` ]
            Datasets held by this butler.
        """
        return {
            ref: handle.get()
            for ref, handle in self._datasets.values()
            if dataset_type is None or dataset_type == ref.datasetType.name
        }

    def isWriteable(self) -> bool:
        # This butler always accepts puts; there is no read-only mode.
        return True

    def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
        """Store a dataset, keyed by the given ref's UUID.

        Parameters
        ----------
        obj : `object`
            Object to store; coerced to the registered dataset type's storage
            class before being saved.
        ref : `lsst.daf.butler.DatasetRef`
            Reference identifying the dataset.  Must not be a component.
        provenance : `lsst.daf.butler.DatasetProvenance`, optional
            Ignored by this test butler.

        Returns
        -------
        ref : `lsst.daf.butler.DatasetRef`
            The given ``ref``, unchanged.

        Raises
        ------
        lsst.daf.butler.registry.ConflictingDefinitionError
            Raised if a dataset with this UUID has already been stored.
        lsst.daf.butler.MissingDatasetTypeError
            Raised if the dataset type was not registered at construction.
        """
        with self._metrics.instrument_put():
            assert not ref.isComponent(), "Component dataset types cannot be put."
            if ref.id in self._datasets:
                # Some butlers may not raise reliably when a dataset already
                # exists (it's hard to be rigorous in parallel given different
                # guarantees provided by storage), but we don't want code to
                # rely on it not being an error, so we want a test butler to
                # always complain.
                raise ConflictingDefinitionError(f"Dataset {ref} already exists.")
            if (repo_dataset_type := self._dataset_types.get(ref.datasetType.name)) is None:
                raise MissingDatasetTypeError(f"Dataset type {ref.datasetType.name!r} not recognized.")
            # Store the *coerced* object: the original code discarded the
            # return value of coerce_type and stored the uncoerced object,
            # so conversions requested by the repo dataset type were lost.
            obj = repo_dataset_type.storageClass.coerce_type(obj)
            self._datasets[ref.id] = (
                ref.overrideStorageClass(repo_dataset_type.storageClass),
                InMemoryDatasetHandle(
                    obj,
                    storageClass=repo_dataset_type.storageClass,
                    dataId=ref.dataId,
                    copy=True,
                ),
            )
            return ref

    def get(
        self,
        ref: DatasetRef,
        /,
        *,
        parameters: dict[str, Any] | None = None,
        storageClass: StorageClass | str | None = None,
    ) -> Any:
        """Retrieve a stored dataset.

        Parameters
        ----------
        ref : `lsst.daf.butler.DatasetRef`
            Reference identifying the dataset; may be a component ref.
        parameters : `dict` [ `str`, `object` ], optional
            Storage-class read parameters forwarded to the in-memory handle.
        storageClass : `lsst.daf.butler.StorageClass` or `str`, optional
            Storage class to convert the result to; defaults to the storage
            class of ``ref``.

        Raises
        ------
        FileNotFoundError
            Raised if no dataset with this UUID has been stored.
        """
        with self._metrics.instrument_get():
            if storageClass is None:
                storageClass = ref.datasetType.storageClass
            elif isinstance(storageClass, str):
                storageClass = self.storageClasses.getStorageClass(storageClass)
            # Capture the component from the *caller's* ref before looking up
            # the stored entry.  The original code rebound ``ref`` to the
            # stored (always-composite, per the assertion in ``put``) ref
            # first, so the component was always `None` and component gets
            # silently returned the full composite.
            component = ref.datasetType.component()
            if entry := self._datasets.get(ref.id):
                (_, handle) = entry
                return handle.get(
                    component=component, parameters=parameters, storageClass=storageClass
                )
            raise FileNotFoundError(f"Dataset {ref} does not exist.")

    def stored_many(self, refs: Iterable[DatasetRef]) -> dict[DatasetRef, bool]:
        # A dataset is "stored" iff its UUID is present in the mapping.
        return {ref: ref.id in self._datasets for ref in refs}

    def pruneDatasets(
        self,
        refs: Iterable[DatasetRef],
        *,
        disassociate: bool = True,
        unstore: bool = False,
        tags: Iterable[str] = (),
        purge: bool = False,
    ) -> None:
        # All keyword options are ignored: with no registry or tags, removal
        # from the in-memory mapping is the only meaningful operation.
        # Missing refs are silently ignored, matching best-effort semantics.
        for ref in refs:
            self._datasets.pop(ref.id, None)

    @property
    def _datastore(self) -> Any:
        raise NotImplementedError("This test butler does not have a datastore.")

    @_datastore.setter  # demanded by MyPy since we declare it to be an instance attribute in LimitedButler.
    def _datastore(self, value: Any) -> None:
        raise NotImplementedError("This test butler does not have a datastore.")

    @property
    def dimensions(self) -> DimensionUniverse:
        return self._universe

    def factory(self, quantum: Quantum) -> InMemoryLimitedButler:
        """Return ``self``.

        This method can be used as the ``limited_butler_factory`` argument to
        `.single_quantum_executor.SingleQuantumExecutor`.

        Parameters
        ----------
        quantum : `lsst.daf.butler.Quantum`
            Ignored.
        """
        return self