Coverage for python/lsst/daf/butler/datastore/record_data.py: 27% (79 statements)


# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Support for generic data stores."""

from __future__ import annotations

__all__ = ("DatastoreRecordData", "SerializedDatastoreRecordData")

import dataclasses
import uuid
from collections.abc import Mapping
from typing import TYPE_CHECKING, TypeAlias

import pydantic

from .._dataset_ref import DatasetId
from ..dimensions import DimensionUniverse
from ..persistence_context import PersistenceContextVars
from .stored_file_info import StoredDatastoreItemInfo

if TYPE_CHECKING:
    from ..registry import Registry

# Pydantic requires the possible value types to be explicitly enumerated in
# order for `uuid.UUID` in particular to work. `typing.Any` does not work
# here.
_Record: TypeAlias = dict[str, int | str | None]

class SerializedDatastoreRecordData(pydantic.BaseModel):
    """Representation of a `DatastoreRecordData` suitable for serialization."""

    dataset_ids: list[uuid.UUID]
    """List of dataset IDs"""

    records: Mapping[str, Mapping[str, Mapping[str, list[_Record]]]]
    """List of records indexed by record class name, dataset ID (encoded as
    str, because JSON), and opaque table name.
    """

    @classmethod
    def direct(
        cls,
        *,
        dataset_ids: list[str | uuid.UUID],
        records: dict[str, dict[str, dict[str, list[_Record]]]],
    ) -> SerializedDatastoreRecordData:
        """Construct a `SerializedDatastoreRecordData` directly without
        validators.

        Parameters
        ----------
        dataset_ids : `list` [`str` or `uuid.UUID`]
            The dataset UUIDs.
        records : `dict`
            The datastore records.

        Notes
        -----
        This differs from the pydantic "construct" method in that the
        arguments are explicitly what the model requires, and it will recurse
        through members, constructing them from their corresponding `direct`
        methods.

        This method should only be called when the inputs are trusted.
        """
        data = cls.model_construct(
            _fields_set={"dataset_ids", "records"},
            # JSON makes strings out of UUIDs, need to convert them back
            dataset_ids=[uuid.UUID(id) if isinstance(id, str) else id for id in dataset_ids],
            records=records,
        )

        return data
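
    # Illustrative sketch: `direct` is intended for trusted, already-validated
    # input, e.g. data decoded from JSON where UUIDs arrive as strings.  The
    # values below are hypothetical.
    #
    #     serialized = SerializedDatastoreRecordData.direct(
    #         dataset_ids=[str(uuid.uuid4())],  # string form, as produced by JSON
    #         records={},
    #     )
    #     assert isinstance(serialized.dataset_ids[0], uuid.UUID)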


@dataclasses.dataclass
class DatastoreRecordData:
    """A struct that represents a tabular data export from a single
    datastore.
    """

    records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = dataclasses.field(
        default_factory=dict
    )
    """Opaque table data, indexed by dataset ID and grouped by opaque table
    name."""

    @staticmethod
    def merge_mappings(*args: Mapping[str, DatastoreRecordData]) -> dict[str, DatastoreRecordData]:
        """Merge mappings of datastore record data.

        Parameters
        ----------
        *args : `~collections.abc.Mapping` [ `str`, `DatastoreRecordData` ]
            Mappings of record data, keyed by datastore name.

        Returns
        -------
        merged : `~collections.abc.Mapping` [ `str`, `DatastoreRecordData` ]
            Merged mapping of record data, keyed by datastore name.
        """
        result: dict[str, DatastoreRecordData] = {}
        for arg in args:
            for datastore_name, record_data in arg.items():
                if datastore_name not in result:
                    result[datastore_name] = DatastoreRecordData()
                result[datastore_name].update(record_data)
        return result
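
    # Illustrative sketch of merging per-datastore mappings, using hypothetical
    # datastore names as keys:
    #
    #     merged = DatastoreRecordData.merge_mappings(
    #         {"FileDatastore@root": DatastoreRecordData()},
    #         {"FileDatastore@root": DatastoreRecordData(), "chained": DatastoreRecordData()},
    #     )
    #     # `merged` has one entry per datastore name; entries sharing a name
    #     # are combined via `update`, which skips tables already present.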

    def update(self, other: DatastoreRecordData) -> None:
        """Update contents of this instance with data from another instance.

        Parameters
        ----------
        other : `DatastoreRecordData`
            Records to merge into this instance.

        Notes
        -----
        If a ``(dataset_id, table_name)`` combination has any records in
        ``self``, it is assumed that all records for that combination are
        already present. This allows duplicates of the same dataset to be
        handled gracefully.
        """
        for dataset_id, table_records in other.records.items():
            this_table_records = self.records.setdefault(dataset_id, {})
            for table_name, records in table_records.items():
                # If this (dataset_id, table_name) combination already has
                # records in `self`, we assume that means all of the records
                # for that combination; we require other code to ensure entire
                # (parent) datasets are exported to these data structures
                # (never components).
                if not (this_records := this_table_records.setdefault(table_name, [])):
                    this_records.extend(records)
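
    # Illustrative sketch of the merge semantics described above, with
    # hypothetical placeholder names (`some_id`, `info_a`, `info_b`, table
    # name): if a (dataset_id, table_name) pair is already populated in
    # `self`, incoming records for it are ignored.
    #
    #     a = DatastoreRecordData(records={some_id: {"datastore_records": [info_a]}})
    #     b = DatastoreRecordData(records={some_id: {"datastore_records": [info_b]}})
    #     a.update(b)
    #     # a.records[some_id]["datastore_records"] is still [info_a]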

    def subset(self, dataset_ids: set[DatasetId]) -> DatastoreRecordData | None:
        """Extract a subset of the records that match given dataset IDs.

        Parameters
        ----------
        dataset_ids : `set` [ `DatasetId` ]
            Dataset IDs to match.

        Returns
        -------
        record_data : `DatastoreRecordData` or `None`
            `None` is returned if there are no matching refs.

        Notes
        -----
        Records in the returned instance are shared with this instance;
        clients should not update or extend records in the returned instance.
        """
        matching_records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
        for dataset_id in dataset_ids:
            if (id_records := self.records.get(dataset_id)) is not None:
                matching_records[dataset_id] = id_records
        if matching_records:
            return DatastoreRecordData(records=matching_records)
        else:
            return None
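
    # Illustrative sketch, with hypothetical names (`full_data`, `some_id`):
    # `subset` returns a view that shares record lists with `self`, or `None`
    # when nothing matches.
    #
    #     view = full_data.subset({some_id})
    #     if view is not None:
    #         assert view.records[some_id] is full_data.records[some_id]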

    def to_simple(self, minimal: bool = False) -> SerializedDatastoreRecordData:
        """Make representation of the object for serialization.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        minimal : `bool`, optional
            If `True`, produce a minimal representation; not used by this
            method.

        Returns
        -------
        simple : `SerializedDatastoreRecordData`
            Representation of this instance suitable for serialization.
        """
        records: dict[str, dict[str, dict[str, list[_Record]]]] = {}
        for dataset_id, table_data in self.records.items():
            for table_name, table_records in table_data.items():
                class_name, infos = StoredDatastoreItemInfo.to_records(table_records)
                class_records = records.setdefault(class_name, {})
                dataset_records = class_records.setdefault(dataset_id.hex, {})
                dataset_records.setdefault(table_name, []).extend(dict(info) for info in infos)
        return SerializedDatastoreRecordData(dataset_ids=list(self.records.keys()), records=records)

    @classmethod
    def from_simple(
        cls,
        simple: SerializedDatastoreRecordData,
        universe: DimensionUniverse | None = None,
        registry: Registry | None = None,
    ) -> DatastoreRecordData:
        """Make an instance of this class from serialized data.

        Implements `~lsst.daf.butler.json.SupportsSimple` protocol.

        Parameters
        ----------
        simple : `SerializedDatastoreRecordData`
            Serialized representation returned from `to_simple` method.
        universe : `DimensionUniverse`, optional
            Dimension universe, not used by this method.
        registry : `Registry`, optional
            Registry instance, not used by this method.

        Returns
        -------
        record_data : `DatastoreRecordData`
            De-serialized instance of `DatastoreRecordData`.
        """
        cache = PersistenceContextVars.dataStoreRecords.get()
        key = frozenset(simple.dataset_ids)
        if cache is not None and (cachedRecord := cache.get(key)) is not None:
            return cachedRecord
        records: dict[DatasetId, dict[str, list[StoredDatastoreItemInfo]]] = {}
        # Make sure that all dataset IDs appear in the dict even if they don't
        # have records.
        for dataset_id in simple.dataset_ids:
            records[dataset_id] = {}
        for class_name, class_data in simple.records.items():
            for dataset_id_str, dataset_data in class_data.items():
                for table_name, table_records in dataset_data.items():
                    try:
                        infos = StoredDatastoreItemInfo.from_records(class_name, table_records)
                    except TypeError as exc:
                        raise RuntimeError(
                            "The class specified in the SerializedDatastoreRecordData "
                            f"({class_name}) is not a StoredDatastoreItemInfo."
                        ) from exc
                    dataset_records = records.setdefault(uuid.UUID(dataset_id_str), {})
                    dataset_records.setdefault(table_name, []).extend(infos)
        newRecord = cls(records=records)
        if cache is not None:
            cache[key] = newRecord
        return newRecord
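
# Illustrative round-trip sketch, assuming an existing `data` instance whose
# records were populated elsewhere (e.g. by a datastore export):
#
#     simple = data.to_simple()
#     json_blob = simple.model_dump_json()
#     restored = DatastoreRecordData.from_simple(
#         SerializedDatastoreRecordData.model_validate_json(json_blob)
#     )
#     assert restored.records.keys() == data.records.keys()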