Coverage for python / lsst / images / serialization / _output_archive.py: 80%

49 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:41 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "NestedOutputArchive", 

16 "OutputArchive", 

17) 

18 

19from abc import ABC, abstractmethod 

20from collections.abc import Callable, Hashable, Iterator, Mapping 

21from typing import TYPE_CHECKING, TypeVar 

22 

23import astropy.io.fits 

24import astropy.table 

25import astropy.units 

26import numpy as np 

27import pydantic 

28 

29from ._asdf_utils import ArrayReferenceModel 

30from ._common import ArchiveTree, no_header_updates 

31from ._tables import TableReferenceModel 

32 

33if TYPE_CHECKING: 

34 from .._transforms import FrameSet 

35 

# This pre-Python-3.12 declaration is needed by Sphinx (probably the
# autodoc-typehints plugin).
P = TypeVar("P", bound=pydantic.BaseModel)

39 

40 

41class OutputArchive[P](ABC): 

42 """Abstract interface for writing to a file format. 

43 

44 Notes 

45 ----- 

46 An output archive instance is assumed to be paired with a Pydantic model 

47 that represents a JSON tree, with the archive used to serialize data that 

48 is not natively JSON into data that is (which may just be a reference to 

49 binary data stored elsewhere in the file). The archive doesn't actually 

50 hold that model instance because we don't want to assume it can be built 

51 via default-initialization and assignment, and because we'd prefer to avoid 

52 making the output archive generic over the model type. It is expected that 

53 most concrete archive implementations will accept the paired model in some 

54 sort of finalization method in order to write it into the file, but this is 

55 not part of the base class interface. 

56 """ 

57 

58 @abstractmethod 

59 def serialize_direct[T: pydantic.BaseModel]( 

60 self, name: str, serializer: Callable[[OutputArchive], T] 

61 ) -> T: 

62 """Use a serializer function to save a nested object. 

63 

64 Parameters 

65 ---------- 

66 name 

67 Attribute of the paired Pydantic model that will be assigned the 

68 result of this call. If it will not be assigned to a direct 

69 attribute, it may be a JSON Pointer path (relative to the paired 

70 Pydantic model) to the location where it will be added. 

71 serializer 

72 Callable that takes an `~lsst.serialization.OutputArchive` and 

73 returns a Pydantic model. This will be passed a new 

74 `~lsst.serialization.OutputArchive` that automatically prepends 

75 ``{name}/`` (and any root path added by this archive) to names 

76 passed to it, so the ``serializer`` does not need to know where it 

77 appears in the overall tree. 

78 

79 Returns 

80 ------- 

81 T 

82 Result of the call to the serializer. 

83 """ 

84 raise NotImplementedError() 

85 

86 @abstractmethod 

87 def serialize_pointer[T: ArchiveTree]( 

88 self, name: str, serializer: Callable[[OutputArchive], T], key: Hashable 

89 ) -> T | P: 

90 """Use a serializer function to save a nested object that may be 

91 referenced in multiple locations in the same archive. 

92 

93 Parameters 

94 ---------- 

95 name 

96 Attribute of the paired Pydantic model that will be assigned the 

97 result of this call. If it will not be assigned to a direct 

98 attribute, it may be a JSON Pointer path (relative to the paired 

99 Pydantic model) to the location where it will be added. 

100 serializer 

101 Callable that takes an `~lsst.serialization.OutputArchive` and 

102 returns a Pydantic model. This will be passed a new 

103 `~lsst.serialization.OutputArchive` that automatically prepends 

104 ``{name}/`` (and any root path added by this archive) to names 

105 passed to it, so the ``serializer`` does not need to know where it 

106 appears in the overall tree. 

107 key 

108 A unique identifier for the in-memory object the serializer saves, 

109 e.g. a call to the built-in `id` function. 

110 

111 Returns 

112 ------- 

113 T | P 

114 Either the result of the call to the serializer, or a Pydantic 

115 model that can be considered a reference to it and added to a 

116 larger model in its place. 

117 """ 

118 # Since Pydantic doesn't provide us a good way to "dereference" a JSON 

119 # Pointer (i.e. traversing the tree to extract the original model), it 

120 # is probably easier to implement an `InputArchive` for the case where 

121 # the `~lsst.serialization.OutputArchive` opts to stuff all pointer 

122 # serializations into a standard location outside the user-controlled 

123 # Pydantic model tree, and always returned a JSON pointer to that 

124 # standard location from this function. 

125 raise NotImplementedError() 

126 

127 @abstractmethod 

128 def serialize_frame_set[T: ArchiveTree]( 

129 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

130 ) -> T | P: 

131 """Serialize a frame set and make it available to objects saved later. 

132 

133 Parameters 

134 ---------- 

135 name 

136 Attribute of the paired Pydantic model that will be assigned the 

137 result of this call. If it will not be assigned to a direct 

138 attribute, it may be a JSON Pointer path (relative to the paired 

139 Pydantic model) to the location where it will be added. 

140 frame_set 

141 The frame set being saved. This will be returned in later calls 

142 to `iter_frame_sets`, along with the returned reference object. 

143 serializer 

144 Callable that takes an `~lsst.serialization.OutputArchive` and 

145 returns a Pydantic model. This will be passed a new 

146 `~lsst.serialization.OutputArchive` that automatically prepends 

147 ``{name}/`` (and any root path added by this archive) to names 

148 passed to it, so the ``serializer`` does not need to know where it 

149 appears in the overall tree. 

150 key 

151 A unique identifier for the in-memory object the serializer saves, 

152 e.g. a call to the built-in `id` function. 

153 

154 Returns 

155 ------- 

156 T | P 

157 Either the result of the call to the serializer, or a Pydantic 

158 model that can be considered a reference to it and added to a 

159 larger model in its place. 

160 """ 

161 raise NotImplementedError() 

162 

163 @abstractmethod 

164 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]: 

165 """Iterate over the frame sets already serialized to this archive. 

166 

167 Yields 

168 ------ 

169 frame_set 

170 A frame set that has already been written to this archive. 

171 reference 

172 An implementation-specific reference model that points to the 

173 frame set. 

174 """ 

175 raise NotImplementedError() 

176 

177 @abstractmethod 

178 def add_array( 

179 self, 

180 array: np.ndarray, 

181 *, 

182 name: str | None = None, 

183 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

184 ) -> ArrayReferenceModel: 

185 """Add an array to the archive. 

186 

187 Parameters 

188 ---------- 

189 array 

190 Array to save. 

191 name 

192 Name of the array. This should generally be the name of the 

193 Pydantic model attribute to which the result will be assigned. It 

194 may be left `None` if there is only one [structured] array or 

195 table in a nested object that is being saved. 

196 update_header 

197 A callback that will be given the FITS header for the HDU 

198 containing this array in order to add keys to it. This callback 

199 may be provided but will not be called if the output format is not 

200 FITS. 

201 

202 Returns 

203 ------- 

204 ArrayReferenceModel 

205 A Pydantic model that references the stored array. 

206 """ 

207 raise NotImplementedError() 

208 

209 @abstractmethod 

210 def add_table( 

211 self, 

212 table: astropy.table.Table, 

213 *, 

214 name: str | None = None, 

215 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

216 ) -> TableReferenceModel: 

217 """Add a table to the archive. 

218 

219 Parameters 

220 ---------- 

221 table 

222 Table to save. 

223 name 

224 Name of the table. This should generally be the name of the 

225 Pydantic model attribute to which the result will be assigned. It 

226 may be left `None` if there is only one [structured] array or 

227 table in a nested object that is being saved. 

228 update_header 

229 A callback that will be given the FITS header for the HDU 

230 containing this table in order to add keys to it. This callback 

231 may be provided but will not be called if the output format is not 

232 FITS. 

233 

234 Returns 

235 ------- 

236 TableReferenceModel 

237 A Pydantic model that represents the table. Column definitions 

238 are included directly in the model while the actual data is 

239 stored elsewhere and referenced by the model. 

240 """ 

241 # TODO: ASDF has schemas for tables and columns that we should probably 

242 # adopt [a subset of]. While that can reference external per-column 

243 # data (which would Just Work for a true ASDF archive), I'm not sure 

244 # there's a way to reference external data in a FITS binary table 

245 # column. We could of course invent one, and since ASDF-in-FITS isn't 

246 # even referenced on the ASDF standard page our existing approach for 

247 # referencing FITS data in an image extension may be something only 

248 # we'll be using, too. 

249 raise NotImplementedError() 

250 

251 @abstractmethod 

252 def add_structured_array( 

253 self, 

254 array: np.ndarray, 

255 *, 

256 name: str | None = None, 

257 units: Mapping[str, astropy.units.Unit] | None = None, 

258 descriptions: Mapping[str, str] | None = None, 

259 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

260 ) -> TableReferenceModel: 

261 """Add a table to the archive. 

262 

263 Parameters 

264 ---------- 

265 name 

266 Attribute of the paired Pydantic model that will be assigned the 

267 result of this call. If it will not be assigned to a direct 

268 attribute, it may be a JSON Pointer path (relative to the paired 

269 Pydantic model) to the location where it will be added. 

270 array 

271 A structured numpy array. 

272 name 

273 Name of the array. This should generally be the name of the 

274 Pydantic model attribute to which the result will be assigned. It 

275 may be left `None` if there is only one [structured] array or 

276 table in a nested object that is being saved. 

277 units 

278 A mapping of units for columns. Need not be complete. 

279 descriptions 

280 A mapping of descriptions for columns. Need not be complete. 

281 update_header 

282 A callback that will be given the FITS header for the HDU 

283 containing this table in order to add keys to it. This callback 

284 may be provided but will not be called if the output format is not 

285 FITS. 

286 

287 Returns 

288 ------- 

289 TableReferenceModel 

290 A Pydantic model that represents the table. Column definitions 

291 are included directly in the model while the actual data is 

292 stored elsewhere and referenced by the model. 

293 """ 

294 # TODO: ASDF has schemas for tables and columns that we should probably 

295 # adopt [a subset of]. While that can reference external per-column 

296 # data (which would Just Work for a true ASDF archive), I'm not sure 

297 # there's a way to reference external data in a FITS binary table 

298 # column. We could of course invent one, and since ASDF-in-FITS isn't 

299 # even referenced on the ASDF standard page our existing approach for 

300 # referencing FITS data in an image extension may be something only 

301 # we'll be using, too. 

302 raise NotImplementedError() 

303 

304 

305class NestedOutputArchive[P: pydantic.BaseModel](OutputArchive[P]): 

306 """A proxy output archive that joins a root path into all names before 

307 delegating back to its parent archive. 

308 

309 This is intended to be used in the implementation of most 

310 `~lsst.serialization.OutputArchive.serialize_direct` and 

311 `~lsst.serialization.OutputArchive.serialize_pointer` implementations. 

312 

313 Parameters 

314 ---------- 

315 root 

316 Root of all JSON Pointer paths. Should include a leading slash (as we 

317 always use absolute JSON Pointers) but no trailing slash. 

318 parent 

319 Parent output archive to delegate to. 

320 """ 

321 

322 def __init__(self, root: str, parent: OutputArchive): 

323 self._root = root 

324 self._parent = parent 

325 

326 def serialize_direct[T: pydantic.BaseModel]( 

327 self, name: str, serializer: Callable[[OutputArchive[P]], T] 

328 ) -> T: 

329 return self._parent.serialize_direct(self._join_path(name), serializer) 

330 

331 def serialize_pointer[T: ArchiveTree]( 

332 self, name: str, serializer: Callable[[OutputArchive[P]], T], key: Hashable 

333 ) -> T | P: 

334 return self._parent.serialize_pointer(self._join_path(name), serializer, key) 

335 

336 def serialize_frame_set[T: ArchiveTree]( 

337 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

338 ) -> T | P: 

339 return self._parent.serialize_frame_set(self._join_path(name), frame_set, serializer, key) 

340 

341 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]: 

342 return self._parent.iter_frame_sets() 

343 

344 def add_array( 

345 self, 

346 array: np.ndarray, 

347 *, 

348 name: str | None = None, 

349 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

350 ) -> ArrayReferenceModel: 

351 return self._parent.add_array(array, name=self._join_path(name), update_header=update_header) 

352 

353 def add_table( 

354 self, 

355 table: astropy.table.Table, 

356 *, 

357 name: str | None = None, 

358 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

359 ) -> TableReferenceModel: 

360 return self._parent.add_table(table, name=self._join_path(name), update_header=update_header) 

361 

362 def add_structured_array( 

363 self, 

364 array: np.ndarray, 

365 *, 

366 name: str | None = None, 

367 units: Mapping[str, astropy.units.Unit] | None = None, 

368 descriptions: Mapping[str, str] | None = None, 

369 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

370 ) -> TableReferenceModel: 

371 return self._parent.add_structured_array( 

372 array, 

373 name=self._join_path(name), 

374 units=units, 

375 descriptions=descriptions, 

376 update_header=update_header, 

377 ) 

378 

379 def _join_path(self, name: str | None) -> str: 

380 return f"{self._root}/{name}" if name is not None else self._root