Coverage for python / lsst / images / fits / _output_archive.py: 22%

203 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 09:01 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("FitsOutputArchive", "write") 

15 

16import dataclasses 

17from collections import Counter 

18from collections.abc import Callable, Hashable, Iterator, Mapping 

19from contextlib import contextmanager 

20from typing import Any, Self 

21 

22import astropy.io.fits 

23import astropy.table 

24import numpy as np 

25import pydantic 

26 

27from .._transforms import FrameSet 

28from ..serialization import ( 

29 ArchiveTree, 

30 ArrayReferenceModel, 

31 ButlerInfo, 

32 ColumnDefinitionModel, 

33 MetadataValue, 

34 NestedOutputArchive, 

35 NumberType, 

36 OutputArchive, 

37 TableCellReferenceModel, 

38 TableReferenceModel, 

39 no_header_updates, 

40) 

41from ._common import ExtensionHDU, ExtensionKey, FitsCompressionOptions, FitsOpaqueMetadata 

42 

43 

def write(
    obj: Any,
    filename: str,
    compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
    update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    compression_seed: int | None = None,
    metadata: dict[str, MetadataValue] | None = None,
    butler_info: ButlerInfo | None = None,
) -> Any:
    """Write an object with a ``serialize`` method to a FITS file.

    Parameters
    ----------
    obj
        The object to write.  Must have a ``serialize`` method that accepts
        an output archive and returns the serialized tree; it may also carry
        ``_opaque_metadata`` and ``_archive_default_name`` attributes, which
        are used when present.
    filename
        Name of the file to write to. Must not already exist.
    compression_options
        Options for how to compress the FITS file, keyed by the name of
        the attribute (with JSON pointer ``/`` separators for nested
        attributes).
    update_header
        A callback that will be given the primary HDU FITS header and an
        opportunity to modify it.
    compression_seed
        A FITS tile compression seed to use whenever the configured
        compression seed is `None` or (for backwards compatibility) ``0``.
        This value is then incremented every time it is used.
    metadata
        Additional metadata to save with the object. This will override any
        flexible metadata carried by the object itself with the same keys.
    butler_info
        Butler information to store in the file.

    Returns
    -------
    `.serialization.ArchiveTree`
        The serialized representation of the object.
    """
    # Opaque metadata carried over from a previous read, if the object holds
    # any; the archive only uses it when it is FITS-flavored.
    opaque_metadata = getattr(obj, "_opaque_metadata", None)
    name = getattr(obj, "_archive_default_name", None)
    with FitsOutputArchive.open(
        filename,
        compression_options=compression_options,
        opaque_metadata=opaque_metadata,
        update_header=update_header,
        compression_seed=compression_seed,
    ) as archive:
        # Objects that declare a default archive name are serialized inside a
        # nested archive with that name; others serialize at the top level.
        tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive)
        if metadata is not None:
            tree.metadata.update(metadata)
        if butler_info is not None:
            tree.butler_info = butler_info
        # Must be called exactly once before the context exits; the archive
        # raises otherwise.
        archive.add_tree(tree)
    return tree

97 

98 

class FitsOutputArchive(OutputArchive[TableCellReferenceModel]):
    """An implementation of the `.serialization.OutputArchive` interface that
    writes to FITS files.

    Instances of this class should only be constructed via the `open`
    context manager.
    """

    def __init__(
        self,
        hdu_list: astropy.io.fits.HDUList,
        compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
        opaque_metadata: Any = None,
        compression_seed: int | None = None,
    ) -> None:
        """Initialize the archive over an HDU list.

        Parameters
        ----------
        hdu_list
            Astropy HDU list that new HDUs are appended to; expected to be
            empty (the primary HDU is created and appended here).
        compression_options
            Options for how to compress the FITS file, keyed by attribute
            name (with JSON pointer ``/`` separators for nested attributes).
        opaque_metadata
            Metadata read from an input archive.  Only used when it is a
            `FitsOpaqueMetadata` instance; anything else is ignored.
        compression_seed
            Fallback FITS tile compression seed; see `open`.
        """
        # JSON blobs for objects we've saved as pointers:
        self._pointer_targets: list[bytes] = []
        # Mapping from user provided key (e.g. id(some object)) to a table
        # pointer to where we actually saved it:
        self._pointers_by_key: dict[Hashable, TableCellReferenceModel] = {}
        self._hdu_list = hdu_list
        self._primary_hdu = astropy.io.fits.PrimaryHDU()
        # TODO: add subformat description and version to primary HDU.
        # Placeholder values; `open` rewrites these with the real addresses
        # and sizes after all HDUs have been written to disk.
        self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.")
        self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.")
        self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.")
        self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.")
        # Count of HDUs added per EXTNAME; used to assign EXTVER in _add_hdu.
        self._hdus_by_name = Counter[str]()
        self._compression_options = dict(compression_options) if compression_options is not None else {}
        self._compression_seed = compression_seed
        self._opaque_metadata = (
            opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata()
        )
        # Re-apply any opaque header cards that belong to the primary HDU
        # (the empty ExtensionKey identifies the primary HDU).
        if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None:
            self._primary_hdu.header.extend(opaque_primary_header)
        self._hdu_list.append(self._primary_hdu)
        # Set by add_tree; open() raises if the context exits without it.
        self._json_hdu_added: bool = False
        self._frame_sets: list[tuple[FrameSet, TableCellReferenceModel]] = []

    @classmethod
    @contextmanager
    def open(
        cls,
        filename: str,
        compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
        opaque_metadata: Any = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
        compression_seed: int | None = None,
    ) -> Iterator[Self]:
        """Create an output archive that writes to the given file.

        Parameters
        ----------
        filename
            Name of the file to write to. Must not already exist.
        compression_options
            Options for how to compress the FITS file, keyed by the name of
            the attribute (with JSON pointer ``/`` separators for nested
            attributes).
        opaque_metadata
            Metadata read from an input archive along with the object being
            written now. Ignored if the metadata is not from a FITS archive.
        update_header
            A callback that will be given the primary HDU FITS header and an
            opportunity to modify it.
        compression_seed
            A FITS tile compression seed to use whenever the configured
            compression seed is `None` or (for backwards compatibility) ``0``.
            This value is then incremented every time it is used.

        Returns
        -------
        `contextlib.AbstractContextManager` [`FitsOutputArchive`]
            A context manager that returns a `FitsOutputArchive` when entered.
        """
        with astropy.io.fits.open(filename, mode="append") as hdu_list:
            # Append mode creates the file if needed; a non-empty HDU list
            # means the file already existed.
            if hdu_list:
                raise OSError(f"File {filename!r} already exists.")
            archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed)
            update_header(hdu_list[0].header)
            yield archive
            if not archive._json_hdu_added:
                raise RuntimeError("Write context exited without 'add_tree' being called.")
            hdu_list.flush()
        # This multi-open dance is necessary to get Astropy to tell us the
        # byte addresses of the HDUs. Hopefully we can get an upstream change
        # make this unnecessary at some point.
        with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list:
            index_hdu = cls._make_index_table(hdu_list)
        with astropy.io.fits.open(filename, mode="append") as hdu_list:
            hdu_list.append(index_hdu)
        # The last row of the index describes the last HDU written before the
        # index itself, i.e. the JSON tree HDU appended by add_tree.
        json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1])
        index_bytes = _HDUBytes.from_write_hdu(index_hdu)
        # Update the primary HDU with the address and size of the index and
        # JSON HDUs, and rewrite just that. We do this write manually, since
        # astropy's docs on its 'update' mode are scarce and it's not obvious
        # whether we can guarantee it won't rewrite the whole file if we edit
        # the primary header.
        archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address
        archive._primary_hdu.header["INDXSIZE"] = index_bytes.size
        archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address
        archive._primary_hdu.header["JSONSIZE"] = json_bytes.size
        # 'r+b' positions the stream at byte 0, where the primary header
        # lives.  Only values of already-present keywords changed, so the
        # header's on-disk size is unchanged and an in-place overwrite is
        # safe.
        with open(filename, "r+b") as stream:
            stream.write(archive._primary_hdu.header.tostring().encode())

    def serialize_direct[T: pydantic.BaseModel](
        self, name: str, serializer: Callable[[OutputArchive[TableCellReferenceModel]], T]
    ) -> T:
        # Docstring inherited.
        nested = NestedOutputArchive[TableCellReferenceModel](name, self)
        return serializer(nested)

    def serialize_pointer[T: ArchiveTree](
        self, name: str, serializer: Callable[[OutputArchive[TableCellReferenceModel]], T], key: Hashable
    ) -> TableCellReferenceModel:
        # Docstring inherited.
        # Deduplicate: if this key was already serialized, reuse its pointer.
        if (pointer := self._pointers_by_key.get(key)) is not None:
            return pointer
        # Row 0 of the JSON table holds the main tree (see add_tree), so
        # pointer targets start at row 1.
        pointer = TableCellReferenceModel(
            source="fits:JSON", column="JSON", row=len(self._pointer_targets) + 1
        )
        # NOTE(review): the row is reserved from the current length of
        # _pointer_targets, but the blob is only appended after `serializer`
        # runs; if a serializer could itself register new pointers, the
        # reserved row would not match the append position — confirm nested
        # serializers cannot do that.
        model = self.serialize_direct("", serializer)
        self._pointer_targets.append(model.model_dump_json().encode())
        self._pointers_by_key[key] = pointer
        return pointer

    def serialize_frame_set[T: ArchiveTree](
        self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
    ) -> TableCellReferenceModel:
        # Docstring inherited.
        # Same as serialize_pointer, but also remember the frame set so it
        # can be retrieved later via iter_frame_sets.
        pointer = self.serialize_pointer(name, serializer, key)
        self._frame_sets.append((frame_set, pointer))
        return pointer

    def iter_frame_sets(self) -> Iterator[tuple[FrameSet, TableCellReferenceModel]]:
        # Docstring inherited.
        return iter(self._frame_sets)

    def add_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ArrayReferenceModel:
        # Docstring inherited.
        if name is None:
            raise RuntimeError("Cannot save array with name=None unless it is nested.")
        extname = name.upper()
        # Reuse a precompressed HDU carried over from an input archive when
        # one is available (presumably to avoid a decompress/recompress round
        # trip — see FitsOpaqueMetadata).
        hdu = self._opaque_metadata.maybe_use_precompressed(extname)
        if hdu is None:
            if (compression_options := self._get_compression_options(name)) is not None:
                hdu = compression_options.make_hdu(array, name=extname)
            else:
                hdu = astropy.io.fits.ImageHDU(array, name=extname)
        key = self._add_hdu(hdu, update_header)
        return ArrayReferenceModel(
            source=str(key),
            shape=list(array.shape),
            datatype=NumberType.from_numpy(array.dtype),
        )

    def add_table(
        self,
        table: astropy.table.Table,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableReferenceModel:
        # Docstring inherited.
        if name is None:
            raise RuntimeError("Cannot save table with name=None unless it is nested.")
        extname = name.upper()
        hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname)
        # Extract column information directly from the input array, not the
        # data in the binary table HDU, because we want to assume as little as
        # possible about where Astropy does uint -> TZERO stuff.
        columns = ColumnDefinitionModel.from_table(table)
        key = self._add_hdu(hdu, update_header)
        return TableReferenceModel(source=str(key), columns=columns)

    def add_structured_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        units: Mapping[str, astropy.units.Unit] | None = None,
        descriptions: Mapping[str, str] | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableReferenceModel:
        # Docstring inherited.
        # NOTE(review): `astropy.units` is not imported at module scope here;
        # the annotation only resolves lazily (via `from __future__ import
        # annotations`) — confirm nothing introspects it at runtime.
        if name is None:
            raise RuntimeError("Cannot save structured array with name=None unless it is nested.")
        extname = name.upper()
        # Extract column information directly from the input array, not the
        # data in the binary table HDU, because we want to assume as little as
        # possible about where Astropy does uint -> TZERO stuff.
        columns = ColumnDefinitionModel.from_record_dtype(array.dtype)
        hdu = astropy.io.fits.BinTableHDU(array, name=extname)
        # Units and descriptions are recorded only in the column models, not
        # in the binary table HDU itself.
        if units is not None:
            for c in columns:
                c.unit = units.get(c.name)
        if descriptions is not None:
            for c in columns:
                c.description = descriptions.get(c.name, "")
        key = self._add_hdu(hdu, update_header)
        return TableReferenceModel(source=str(key), columns=columns)

    def _add_hdu(
        self,
        hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ExtensionKey:
        """Append an extension HDU to the file and return its key.

        Parameters
        ----------
        hdu
            HDU to append.  Its ``EXTVER`` header card may be set here if an
            HDU with the same ``EXTNAME`` was added previously.
        update_header
            A callback that will be given the HDU's header and an opportunity
            to modify it.

        Returns
        -------
        key
            ``EXTNAME``/``EXTVER`` pair identifying the new HDU.
        """
        n_hdus = self._hdus_by_name.get(hdu.name, 0)
        key = ExtensionKey(hdu.name, n_hdus + 1)
        # Only write EXTVER when the EXTNAME is duplicated; the first HDU
        # with a given name keeps its implicit version.
        if n_hdus:
            hdu.header["EXTVER"] = key.ver
        self._hdus_by_name[hdu.name] += 1
        update_header(hdu.header)
        # Re-apply any opaque header cards carried over from an input archive
        # for this EXTNAME/EXTVER.
        if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None:
            hdu.header.extend(opaque_headers)
        self._hdu_list.append(hdu)
        return key

    def add_tree(self, tree: ArchiveTree) -> None:
        """Write the JSON tree to the archive.

        This method must be called exactly once, just before the `open` context
        is exited.

        Parameters
        ----------
        tree
            Pydantic model that represents the tree.
        """
        # Variable-length byte column ('PB'): row 0 holds the main tree and
        # rows 1..N hold pointer targets, matching the row numbers handed out
        # by serialize_pointer.
        json_hdu = astropy.io.fits.BinTableHDU.from_columns(
            [astropy.io.fits.Column("JSON", "PB")],
            nrows=len(self._pointer_targets) + 1,
            name="JSON",
        )
        json_hdu.data[0]["JSON"] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte)
        for n, json_target_data in enumerate(self._pointer_targets):
            json_hdu.data[n + 1]["JSON"] = np.frombuffer(json_target_data, dtype=np.byte)
        self._hdu_list.append(json_hdu)
        self._json_hdu_added = True

    def _get_compression_options(self, name: str) -> FitsCompressionOptions | None:
        """Return the compression options to use for the named attribute.

        Parameters
        ----------
        name
            Name of the attribute being saved (as passed to `add_array`).

        Returns
        -------
        options
            Compression options, or `None` for no compression.  A copy is
            returned when the fallback seed is substituted in.
        """
        result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT)
        if result is None or result.quantization is None:
            # No compression, or compression without quantization: nothing
            # seed-related to do.
            return result
        # A falsy per-option seed (None or 0) means "use the archive-level
        # fallback seed", which is then incremented for the next use.
        if self._compression_seed is not None and not result.quantization.seed:
            result = result.model_copy(
                update={
                    "quantization": result.quantization.model_copy(update={"seed": self._compression_seed})
                }
            )
            self._compression_seed += 1
            # Wrap to stay within 1-10000 (presumably the valid FITS
            # quantization-seed range — confirm against the tile compression
            # convention).
            if self._compression_seed > 10000:
                self._compression_seed = 1
        # MyPy can tell that result.quantization is not None in the 'if', but
        # forgets that by this 'else':
        elif result.quantization.seed is None:  # type: ignore[union-attr]
            raise RuntimeError("No quantization seed provided.")
        return result

    @staticmethod
    def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
        """Build the INDEX binary table HDU describing every HDU in the file.

        Parameters
        ----------
        hdu_list
            HDU list read back from disk, so that byte offsets are available.

        Returns
        -------
        index_hdu
            Binary table with one row per HDU: name, version, extension type,
            compression flag, and the byte offsets from `_HDUBytes`.
        """
        # We use a fixed-length string for the EXTNAME column; it might be
        # better to use a variable-length array, but I have not been able to
        # figure out how to get astropy to accept a string for the the
        # character (TFORM='A') variant of that. And that's only better if the
        # EXTNAMEs get super long, which is not likely (but maybe something to
        # guard against).
        max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
        index_hdu = astropy.io.fits.BinTableHDU.from_columns(
            [
                astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
                astropy.io.fits.Column("EXTVER", "J"),
                astropy.io.fits.Column("XTENSION", "A8"),
                astropy.io.fits.Column("ZIMAGE", "L"),
            ]
            + _HDUBytes.get_index_hdu_columns(),
            nrows=len(hdu_list),
            name="INDEX",
        )
        hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
        for n, hdu in enumerate(hdu_list):
            index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
            index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
            index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
            index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
            # NOTE(review): 'bytes' shadows the builtin; harmless in this
            # scope but worth renaming.
            bytes = _HDUBytes.from_read_hdu(hdu)
            bytes.update_index_row(index_hdu.data[n])
        return index_hdu

387 

388 

389@dataclasses.dataclass 

390class _HDUBytes: 

391 """A struct that records the byte offsets into a FITS HDU.""" 

392 

393 @classmethod 

394 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

395 """Construct from an Astropy HDU instance that has just been written. 

396 

397 Parameters 

398 ---------- 

399 hdu 

400 An Astropy HDU object. 

401 

402 Returns 

403 ------- 

404 hdu_bytes 

405 Struct with byte offsets. 

406 

407 Notes 

408 ----- 

409 This method relies on internal Astropy attributes and does not work on 

410 CompImageHDU objects. 

411 """ 

412 # This is implemented by accessing private Astropy attributes because 

413 # it turns out that's much more reliable than the public fileinfo() 

414 # method, which seems to always return a dict with `None` entries or 

415 # raise; it looks buggy, but docs are scarce enough that it's not clear 

416 # what the right behavior is supposed to be. 

417 if (header_address := getattr(hdu, "_header_offset", None)) is None: 

418 raise RuntimeError("Failed to get Astropy's _header_offset.") 

419 if (data_address := getattr(hdu, "_data_offset", None)) is None: 

420 raise RuntimeError("Failed to get Astropy's _data_offset.") 

421 if (data_size := getattr(hdu, "_data_size", None)) is None: 

422 raise RuntimeError("Failed to get Astropy's _data_size.") 

423 return cls(header_address, data_address, data_size) 

424 

425 @classmethod 

426 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

427 """Construct from an Astropy HDU instance that has just been read. 

428 

429 Parameters 

430 ---------- 

431 hdu 

432 An Astropy HDU object. 

433 

434 Returns 

435 ------- 

436 hdu_bytes 

437 Struct with byte offsets. 

438 """ 

439 info = hdu.fileinfo() 

440 header_address = info["hdrLoc"] 

441 data_address = info["datLoc"] 

442 data_size = info["datSpan"] 

443 return cls(header_address, data_address, data_size) 

444 

445 @classmethod 

446 def from_index_row(cls, row: np.void) -> Self: 

447 """Construct from a row of the index HDU. 

448 

449 Parameters 

450 ---------- 

451 row 

452 A Numpy struct-like scalar. 

453 

454 Returns 

455 ------- 

456 hdu_bytes 

457 Struct with byte offsets. 

458 """ 

459 return cls( 

460 header_address=int(row["HDRADDR"]), 

461 data_address=int(row["DATADDR"]), 

462 data_size=int(row["DATSIZE"]), 

463 ) 

464 

465 @staticmethod 

466 def get_index_hdu_columns() -> list[astropy.io.fits.Column]: 

467 """Return the definitions of the columns this class gets and sets 

468 from the index HDU. 

469 

470 Returns 

471 ------- 

472 columns 

473 A `list` of `astropy.io.fits.Column` objects that represent the 

474 header address, data address, and data size. 

475 """ 

476 return [ 

477 astropy.io.fits.Column("HDRADDR", "K"), 

478 astropy.io.fits.Column("DATADDR", "K"), 

479 astropy.io.fits.Column("DATSIZE", "K"), 

480 ] 

481 

482 header_address: int 

483 """Offset from the beginning of the start of the file to the header of this 

484 HDU, in bytes. 

485 """ 

486 

487 data_address: int 

488 """Offset from the beginning of the start of the file to the data section 

489 of this HDU, in bytes. 

490 """ 

491 

492 data_size: int 

493 """Size of the data section in bytes.""" 

494 

495 @property 

496 def header_size(self) -> int: 

497 """Size of the header in bytes.""" 

498 return self.data_address - self.header_address 

499 

500 @property 

501 def end_address(self) -> int: 

502 """Offset in bytes from the start of the file to the end of the HDU.""" 

503 return self.data_address + self.data_size 

504 

505 @property 

506 def size(self) -> int: 

507 """Total size of this HDU in bytes.""" 

508 return self.data_size + self.data_address - self.header_address 

509 

510 def update_index_row(self, row: np.void) -> None: 

511 """Set the values of a row of the index HDU from this strut. 

512 

513 Parameters 

514 ---------- 

515 row 

516 A Numpy struct-like scalar to modify in place. 

517 """ 

518 row["HDRADDR"] = self.header_address 

519 row["DATADDR"] = self.data_address 

520 row["DATSIZE"] = self.data_size