Coverage for python/lsst/images/serialization/_output_archive.py: 80%

49 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:34 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "NestedOutputArchive", 

16 "OutputArchive", 

17) 

18 

19from abc import ABC, abstractmethod 

20from collections.abc import Callable, Hashable, Iterator, Mapping 

21from typing import TYPE_CHECKING, TypeVar 

22 

23import astropy.io.fits 

24import astropy.table 

25import astropy.units 

26import numpy as np 

27import pydantic 

28 

29from ._asdf_utils import ArrayReferenceModel, InlineArrayModel 

30from ._common import ArchiveTree, no_header_updates 

31from ._tables import TableModel 

32 

33if TYPE_CHECKING: 

34 from .._transforms import FrameSet 

35 

# This pre-Python-3.12 ``TypeVar`` declaration is needed by Sphinx (probably
# the autodoc-typehints plugin); the PEP 695 type parameters on the classes
# below serve the same role for type checkers.
P = TypeVar("P", bound=pydantic.BaseModel)

39 

40 

41class OutputArchive[P](ABC): 

42 """Abstract interface for writing to a file format. 

43 

44 Notes 

45 ----- 

46 An output archive instance is assumed to be paired with a Pydantic model 

47 that represents a JSON tree, with the archive used to serialize data that 

48 is not natively JSON into data that is (which may just be a reference to 

49 binary data stored elsewhere in the file). The archive doesn't actually 

50 hold that model instance because we don't want to assume it can be built 

51 via default-initialization and assignment, and because we'd prefer to avoid 

52 making the output archive generic over the model type. It is expected that 

53 most concrete archive implementations will accept the paired model in some 

54 sort of finalization method in order to write it into the file, but this is 

55 not part of the base class interface. 

56 """ 

57 

58 @abstractmethod 

59 def serialize_direct[T: pydantic.BaseModel]( 

60 self, name: str, serializer: Callable[[OutputArchive], T] 

61 ) -> T: 

62 """Use a serializer function to save a nested object. 

63 

64 Parameters 

65 ---------- 

66 name 

67 Attribute of the paired Pydantic model that will be assigned the 

68 result of this call. If it will not be assigned to a direct 

69 attribute, it may be a JSON Pointer path (relative to the paired 

70 Pydantic model) to the location where it will be added. 

71 serializer 

72 Callable that takes an `~lsst.serialization.OutputArchive` and 

73 returns a Pydantic model. This will be passed a new 

74 `~lsst.serialization.OutputArchive` that automatically prepends 

75 ``{name}/`` (and any root path added by this archive) to names 

76 passed to it, so the ``serializer`` does not need to know where it 

77 appears in the overall tree. 

78 

79 Returns 

80 ------- 

81 T 

82 Result of the call to the serializer. 

83 """ 

84 raise NotImplementedError() 

85 

86 @abstractmethod 

87 def serialize_pointer[T: ArchiveTree]( 

88 self, name: str, serializer: Callable[[OutputArchive], T], key: Hashable 

89 ) -> T | P: 

90 """Use a serializer function to save a nested object that may be 

91 referenced in multiple locations in the same archive. 

92 

93 Parameters 

94 ---------- 

95 name 

96 Attribute of the paired Pydantic model that will be assigned the 

97 result of this call. If it will not be assigned to a direct 

98 attribute, it may be a JSON Pointer path (relative to the paired 

99 Pydantic model) to the location where it will be added. 

100 serializer 

101 Callable that takes an `~lsst.serialization.OutputArchive` and 

102 returns a Pydantic model. This will be passed a new 

103 `~lsst.serialization.OutputArchive` that automatically prepends 

104 ``{name}/`` (and any root path added by this archive) to names 

105 passed to it, so the ``serializer`` does not need to know where it 

106 appears in the overall tree. 

107 key 

108 A unique identifier for the in-memory object the serializer saves, 

109 e.g. a call to the built-in `id` function. 

110 

111 Returns 

112 ------- 

113 T | P 

114 Either the result of the call to the serializer, or a Pydantic 

115 model that can be considered a reference to it and added to a 

116 larger model in its place. 

117 """ 

118 # Since Pydantic doesn't provide us a good way to "dereference" a JSON 

119 # Pointer (i.e. traversing the tree to extract the original model), it 

120 # is probably easier to implement an `InputArchive` for the case where 

121 # the `~lsst.serialization.OutputArchive` opts to stuff all pointer 

122 # serializations into a standard location outside the user-controlled 

123 # Pydantic model tree, and always returned a JSON pointer to that 

124 # standard location from this function. 

125 raise NotImplementedError() 

126 

127 @abstractmethod 

128 def serialize_frame_set[T: ArchiveTree]( 

129 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

130 ) -> T | P: 

131 """Serialize a frame set and make it available to objects saved later. 

132 

133 Parameters 

134 ---------- 

135 name 

136 Attribute of the paired Pydantic model that will be assigned the 

137 result of this call. If it will not be assigned to a direct 

138 attribute, it may be a JSON Pointer path (relative to the paired 

139 Pydantic model) to the location where it will be added. 

140 frame_set 

141 The frame set being saved. This will be returned in later calls 

142 to `iter_frame_sets`, along with the returned reference object. 

143 serializer 

144 Callable that takes an `~lsst.serialization.OutputArchive` and 

145 returns a Pydantic model. This will be passed a new 

146 `~lsst.serialization.OutputArchive` that automatically prepends 

147 ``{name}/`` (and any root path added by this archive) to names 

148 passed to it, so the ``serializer`` does not need to know where it 

149 appears in the overall tree. 

150 key 

151 A unique identifier for the in-memory object the serializer saves, 

152 e.g. a call to the built-in `id` function. 

153 

154 Returns 

155 ------- 

156 T | P 

157 Either the result of the call to the serializer, or a Pydantic 

158 model that can be considered a reference to it and added to a 

159 larger model in its place. 

160 """ 

161 raise NotImplementedError() 

162 

163 @abstractmethod 

164 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]: 

165 """Iterate over the frame sets already serialized to this archive. 

166 

167 Yields 

168 ------ 

169 frame_set 

170 A frame set that has already been written to this archive. 

171 reference 

172 An implementation-specific reference model that points to the 

173 frame set. 

174 """ 

175 raise NotImplementedError() 

176 

177 @abstractmethod 

178 def add_array( 

179 self, 

180 array: np.ndarray, 

181 *, 

182 name: str | None = None, 

183 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

184 ) -> ArrayReferenceModel | InlineArrayModel: 

185 """Add an array to the archive. 

186 

187 Parameters 

188 ---------- 

189 array 

190 Array to save. 

191 name 

192 Name of the array. This should generally be the name of the 

193 Pydantic model attribute to which the result will be assigned. It 

194 may be left `None` if there is only one [structured] array or 

195 table in a nested object that is being saved. 

196 update_header 

197 A callback that will be given the FITS header for the HDU 

198 containing this array in order to add keys to it. This callback 

199 may be provided but will not be called if the output format is not 

200 FITS. 

201 

202 Returns 

203 ------- 

204 `~lsst.images.serialization.ArrayReferenceModel` |\ 

205 `~lsst.images.serialization.InlineArrayModel` 

206 A Pydantic model that references or holds the stored array. 

207 """ 

208 raise NotImplementedError() 

209 

210 @abstractmethod 

211 def add_table( 

212 self, 

213 table: astropy.table.Table, 

214 *, 

215 name: str | None = None, 

216 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

217 ) -> TableModel: 

218 """Add a table to the archive. 

219 

220 Parameters 

221 ---------- 

222 table 

223 Table to save. 

224 name 

225 Name of the table. This should generally be the name of the 

226 Pydantic model attribute to which the result will be assigned. It 

227 may be left `None` if there is only one [structured] array or 

228 table in a nested object that is being saved. 

229 update_header 

230 A callback that will be given the FITS header for the HDU 

231 containing this table in order to add keys to it. This callback 

232 may be provided but will not be called if the output format is not 

233 FITS. 

234 

235 Returns 

236 ------- 

237 TableModel 

238 A Pydantic model that represents the table. 

239 """ 

240 raise NotImplementedError() 

241 

242 @abstractmethod 

243 def add_structured_array( 

244 self, 

245 array: np.ndarray, 

246 *, 

247 name: str | None = None, 

248 units: Mapping[str, astropy.units.Unit] | None = None, 

249 descriptions: Mapping[str, str] | None = None, 

250 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

251 ) -> TableModel: 

252 """Add a table to the archive. 

253 

254 Parameters 

255 ---------- 

256 name 

257 Attribute of the paired Pydantic model that will be assigned the 

258 result of this call. If it will not be assigned to a direct 

259 attribute, it may be a JSON Pointer path (relative to the paired 

260 Pydantic model) to the location where it will be added. 

261 array 

262 A structured numpy array. 

263 name 

264 Name of the array. This should generally be the name of the 

265 Pydantic model attribute to which the result will be assigned. It 

266 may be left `None` if there is only one [structured] array or 

267 table in a nested object that is being saved. 

268 units 

269 A mapping of units for columns. Need not be complete. 

270 descriptions 

271 A mapping of descriptions for columns. Need not be complete. 

272 update_header 

273 A callback that will be given the FITS header for the HDU 

274 containing this table in order to add keys to it. This callback 

275 may be provided but will not be called if the output format is not 

276 FITS. 

277 

278 Returns 

279 ------- 

280 TableModel 

281 A Pydantic model that represents the table. 

282 """ 

283 raise NotImplementedError() 

284 

285 

286class NestedOutputArchive[P: pydantic.BaseModel](OutputArchive[P]): 

287 """A proxy output archive that joins a root path into all names before 

288 delegating back to its parent archive. 

289 

290 This is intended to be used in the implementation of most 

291 `~lsst.serialization.OutputArchive.serialize_direct` and 

292 `~lsst.serialization.OutputArchive.serialize_pointer` implementations. 

293 

294 Parameters 

295 ---------- 

296 root 

297 Root of all JSON Pointer paths. Should include a leading slash (as we 

298 always use absolute JSON Pointers) but no trailing slash. 

299 parent 

300 Parent output archive to delegate to. 

301 """ 

302 

303 def __init__(self, root: str, parent: OutputArchive): 

304 self._root = root 

305 self._parent = parent 

306 

307 def serialize_direct[T: pydantic.BaseModel]( 

308 self, name: str, serializer: Callable[[OutputArchive[P]], T] 

309 ) -> T: 

310 return self._parent.serialize_direct(self._join_path(name), serializer) 

311 

312 def serialize_pointer[T: ArchiveTree]( 

313 self, name: str, serializer: Callable[[OutputArchive[P]], T], key: Hashable 

314 ) -> T | P: 

315 return self._parent.serialize_pointer(self._join_path(name), serializer, key) 

316 

317 def serialize_frame_set[T: ArchiveTree]( 

318 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

319 ) -> T | P: 

320 return self._parent.serialize_frame_set(self._join_path(name), frame_set, serializer, key) 

321 

322 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]: 

323 return self._parent.iter_frame_sets() 

324 

325 def add_array( 

326 self, 

327 array: np.ndarray, 

328 *, 

329 name: str | None = None, 

330 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

331 ) -> ArrayReferenceModel | InlineArrayModel: 

332 return self._parent.add_array(array, name=self._join_path(name), update_header=update_header) 

333 

334 def add_table( 

335 self, 

336 table: astropy.table.Table, 

337 *, 

338 name: str | None = None, 

339 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

340 ) -> TableModel: 

341 return self._parent.add_table(table, name=self._join_path(name), update_header=update_header) 

342 

343 def add_structured_array( 

344 self, 

345 array: np.ndarray, 

346 *, 

347 name: str | None = None, 

348 units: Mapping[str, astropy.units.Unit] | None = None, 

349 descriptions: Mapping[str, str] | None = None, 

350 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

351 ) -> TableModel: 

352 return self._parent.add_structured_array( 

353 array, 

354 name=self._join_path(name), 

355 units=units, 

356 descriptions=descriptions, 

357 update_header=update_header, 

358 ) 

359 

360 def _join_path(self, name: str | None) -> str: 

361 return f"{self._root}/{name}" if name is not None else self._root