Coverage for python / lsst / images / fits / _output_archive.py: 21%

210 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:07 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("FitsOutputArchive", "write") 

15 

16import dataclasses 

17from collections import Counter 

18from collections.abc import Callable, Hashable, Iterator, Mapping 

19from contextlib import contextmanager 

20from typing import Any, Self 

21 

22import astropy.io.fits 

23import astropy.table 

24import numpy as np 

25import pydantic 

26 

27from .._transforms import FrameSet 

28from ..serialization import ( 

29 ArchiveTree, 

30 ArrayReferenceModel, 

31 ButlerInfo, 

32 MetadataValue, 

33 NestedOutputArchive, 

34 NumberType, 

35 OutputArchive, 

36 TableColumnModel, 

37 TableModel, 

38 no_header_updates, 

39) 

40from ._common import ( 

41 JSON_COLUMN, 

42 JSON_EXTNAME, 

43 ExtensionHDU, 

44 ExtensionKey, 

45 FitsCompressionOptions, 

46 FitsOpaqueMetadata, 

47 PointerModel, 

48) 

49 

50 

def write(
    obj: Any,
    path: str,
    compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
    update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    compression_seed: int | None = None,
    metadata: dict[str, MetadataValue] | None = None,
    butler_info: ButlerInfo | None = None,
) -> Any:
    """Write an object with a ``serialize`` method to a FITS file.

    Parameters
    ----------
    obj
        Object to write.  Must have a ``serialize`` method that accepts an
        output archive and returns a `.serialization.ArchiveTree`.  If
        present, its ``_opaque_metadata`` and ``_archive_default_name``
        attributes are used as well.
    path
        Name of the file to write to.  Must not already exist.
    compression_options
        Options for how to compress the FITS file, keyed by the name of
        the attribute (with JSON pointer ``/`` separators for nested
        attributes).
    update_header
        A callback that will be given the primary HDU FITS header and an
        opportunity to modify it.
    compression_seed
        A FITS tile compression seed to use whenever the configured
        compression seed is `None` or (for backwards compatibility) ``0``.
        This value is then incremented every time it is used.
    metadata
        Additional metadata to save with the object.  This will override any
        flexible metadata carried by the object itself with the same keys.
    butler_info
        Butler information to store in the file.

    Returns
    -------
    `.serialization.ArchiveTree`
        The serialized representation of the object.
    """
    # Both attributes are optional hooks on the object being written.
    opaque_metadata = getattr(obj, "_opaque_metadata", None)
    name = getattr(obj, "_archive_default_name", None)
    with FitsOutputArchive.open(
        path,
        compression_options=compression_options,
        opaque_metadata=opaque_metadata,
        update_header=update_header,
        compression_seed=compression_seed,
    ) as archive:
        # If the object declares a default archive name, serialize it into a
        # named nested archive; otherwise let it serialize itself directly.
        tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive)
        if metadata is not None:
            tree.metadata.update(metadata)
        if butler_info is not None:
            tree.butler_info = butler_info
        # add_tree must be called exactly once before the context exits.
        archive.add_tree(tree)
    return tree

104 

105 

class FitsOutputArchive(OutputArchive[PointerModel]):
    """An implementation of the `.serialization.OutputArchive` interface that
    writes to FITS files.

    Instances of this class should only be constructed via the `open`
    context manager.
    """

    def __init__(
        self,
        hdu_list: astropy.io.fits.HDUList,
        compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
        opaque_metadata: Any = None,
        compression_seed: int | None = None,
    ):
        # JSON blobs for objects we've saved as pointers:
        self._pointer_targets: list[bytes] = []
        # Mapping from user provided key (e.g. id(some object)) to a table
        # pointer to where we actually saved it:
        self._pointers_by_key: dict[Hashable, PointerModel] = {}
        self._hdu_list = hdu_list
        self._primary_hdu = astropy.io.fits.PrimaryHDU()
        # TODO: add subformat description and version to primary HDU.
        # These four keywords are placeholders; `open` overwrites them with
        # the real addresses/sizes after the whole file has been written.
        self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.")
        self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.")
        self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.")
        self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.")
        # Counts HDUs per EXTNAME so _add_hdu can assign EXTVERs.
        self._hdus_by_name = Counter[str]()
        self._compression_options = dict(compression_options) if compression_options is not None else {}
        self._compression_seed = compression_seed
        # Only FITS-origin opaque metadata is usable; anything else is
        # replaced by an empty instance.
        self._opaque_metadata = (
            opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata()
        )
        # An ExtensionKey with no arguments addresses the primary HDU.
        if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None:
            self._primary_hdu.header.extend(opaque_primary_header)
        self._hdu_list.append(self._primary_hdu)
        # Set by add_tree; checked by `open` on context exit.
        self._json_hdu_added: bool = False
        self._frame_sets: list[tuple[FrameSet, PointerModel]] = []

    @classmethod
    @contextmanager
    def open(
        cls,
        filename: str,
        compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
        opaque_metadata: Any = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
        compression_seed: int | None = None,
    ) -> Iterator[Self]:
        """Create an output archive that writes to the given file.

        Parameters
        ----------
        filename
            Name of the file to write to.  Must not already exist.
        compression_options
            Options for how to compress the FITS file, keyed by the name of
            the attribute (with JSON pointer ``/`` separators for nested
            attributes).
        opaque_metadata
            Metadata read from an input archive along with the object being
            written now.  Ignored if the metadata is not from a FITS archive.
        update_header
            A callback that will be given the primary HDU FITS header and an
            opportunity to modify it.
        compression_seed
            A FITS tile compression seed to use whenever the configured
            compression seed is `None` or (for backwards compatibility) ``0``.
            This value is then incremented every time it is used.

        Returns
        -------
        `contextlib.AbstractContextManager` [`FitsOutputArchive`]
            A context manager that returns a `FitsOutputArchive` when entered.
        """
        with astropy.io.fits.open(filename, mode="append") as hdu_list:
            # Append mode creates the file if needed; a nonempty HDU list
            # means the file already existed with content.
            if hdu_list:
                raise OSError(f"File {filename!r} already exists.")
            archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed)
            update_header(hdu_list[0].header)
            yield archive
            if not archive._json_hdu_added:
                raise RuntimeError("Write context exited without 'add_tree' being called.")
            hdu_list.flush()
        # This multi-open dance is necessary to get Astropy to tell us the
        # byte addresses of the HDUs.  Hopefully we can get an upstream change
        # to make this unnecessary at some point.
        with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list:
            index_hdu = cls._make_index_table(hdu_list)
        with astropy.io.fits.open(filename, mode="append") as hdu_list:
            hdu_list.append(index_hdu)
            # The JSON tree HDU was the last HDU written before the index, so
            # its offsets are in the index table's last row.
            json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1])
            index_bytes = _HDUBytes.from_write_hdu(index_hdu)
        # Update the primary HDU with the address and size of the index and
        # JSON HDUs, and rewrite just that.  We do this write manually, since
        # astropy's docs on its 'update' mode are scarce and it's not obvious
        # whether we can guarantee it won't rewrite the whole file if we edit
        # the primary header.
        archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address
        archive._primary_hdu.header["INDXSIZE"] = index_bytes.size
        archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address
        archive._primary_hdu.header["JSONSIZE"] = json_bytes.size
        # "r+b" positions the stream at offset 0, so this overwrites the
        # primary header in place without touching the rest of the file.
        with open(filename, "r+b") as stream:
            stream.write(archive._primary_hdu.header.tostring().encode())

    def serialize_direct[T: pydantic.BaseModel](
        self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T]
    ) -> T:
        """Serialize an object via a named nested archive and return the
        resulting model directly, without pointer indirection.
        """
        nested = NestedOutputArchive[PointerModel](name, self)
        return serializer(nested)

    def serialize_pointer[T: ArchiveTree](
        self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T], key: Hashable
    ) -> PointerModel:
        """Serialize an object once (deduplicated by ``key``) and return a
        pointer to its row in the JSON HDU.
        """
        # Deduplicate: if this key has already been serialized, reuse the
        # existing pointer instead of writing the object again.
        if (pointer := self._pointers_by_key.get(key)) is not None:
            return pointer
        # Row 0 of the JSON HDU holds the main tree (see add_tree), so
        # pointer targets start at row 1.
        pointer = PointerModel(
            column=TableColumnModel(
                name=JSON_COLUMN,
                data=ArrayReferenceModel(source=f"fits:{JSON_EXTNAME}[1]", datatype=NumberType.uint8),
            ),
            row=len(self._pointer_targets) + 1,
        )
        # NOTE(review): ``row`` is captured before ``serializer`` runs; if the
        # serializer recursively calls serialize_pointer, the nested call
        # appends its target first and this row would no longer match the
        # append below — confirm such recursion cannot happen here.
        model = self.serialize_direct("", serializer)
        self._pointer_targets.append(model.model_dump_json().encode())
        self._pointers_by_key[key] = pointer
        return pointer

    def serialize_frame_set[T: ArchiveTree](
        self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
    ) -> PointerModel:
        # Docstring inherited.
        # Same pointer semantics as serialize_pointer, but also records the
        # frame set so it can be iterated over later.
        pointer = self.serialize_pointer(name, serializer, key)
        self._frame_sets.append((frame_set, pointer))
        return pointer

    def iter_frame_sets(self) -> Iterator[tuple[FrameSet, PointerModel]]:
        """Iterate over ``(frame_set, pointer)`` pairs registered via
        `serialize_frame_set`, in registration order.
        """
        return iter(self._frame_sets)

    def add_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ArrayReferenceModel:
        """Save an array as a new (possibly compressed) image HDU and return
        a reference to it.
        """
        if name is None:
            raise RuntimeError("Cannot save array with name=None unless it is nested.")
        extname = name.upper()
        # Prefer a precompressed HDU carried over from the input archive, if
        # one is available for this EXTNAME.
        hdu = self._opaque_metadata.maybe_use_precompressed(extname)
        if hdu is None:
            if (compression_options := self._get_compression_options(name)) is not None:
                hdu = compression_options.make_hdu(array, name=extname)
            else:
                hdu = astropy.io.fits.ImageHDU(array, name=extname)
        key = self._add_hdu(hdu, update_header)
        return ArrayReferenceModel(
            source=str(key),
            shape=list(array.shape),
            datatype=NumberType.from_numpy(array.dtype),
        )

    def add_table(
        self,
        table: astropy.table.Table,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableModel:
        """Save an Astropy table as a new binary table HDU and return a
        model describing its columns.
        """
        if name is None:
            raise RuntimeError("Cannot save table with name=None unless it is nested.")
        extname = name.upper()
        hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname)
        # Extract column information directly from the input array, not the
        # data in the binary table HDU, because we want to assume as little as
        # possible about where Astropy does uint -> TZERO stuff.
        columns = TableColumnModel.from_table(table)
        key = self._add_hdu(hdu, update_header)
        # FITS table columns are 1-indexed; point each column model at its
        # column within the new HDU.
        for n, c in enumerate(columns, start=1):
            assert isinstance(c.data, ArrayReferenceModel)
            c.data.source = f"{key}[{n}]"
        return TableModel(columns=columns, meta=table.meta)

    def add_structured_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        units: Mapping[str, astropy.units.Unit] | None = None,
        descriptions: Mapping[str, str] | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableModel:
        """Save a structured numpy array as a new binary table HDU and
        return a model describing its columns.
        """
        if name is None:
            raise RuntimeError("Cannot save structured array with name=None unless it is nested.")
        extname = name.upper()
        # Extract column information directly from the input array, not the
        # data in the binary table HDU, because we want to assume as little as
        # possible about where Astropy does uint -> TZERO stuff.
        columns = TableColumnModel.from_record_dtype(array.dtype)
        hdu = astropy.io.fits.BinTableHDU(array, name=extname)
        if units is not None:
            for c in columns:
                c.unit = units.get(c.name)
        if descriptions is not None:
            for c in columns:
                c.description = descriptions.get(c.name, "")
        key = self._add_hdu(hdu, update_header)
        # FITS table columns are 1-indexed; point each column model at its
        # column within the new HDU.
        for n, c in enumerate(columns, start=1):
            assert isinstance(c.data, ArrayReferenceModel)
            c.data.source = f"{key}[{n}]"
        return TableModel(columns=columns)

    def _add_hdu(
        self,
        hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ExtensionKey:
        """Append an HDU to the file, assigning EXTVER when the EXTNAME is
        already in use, and return the HDU's key.
        """
        n_hdus = self._hdus_by_name.get(hdu.name, 0)
        key = ExtensionKey(hdu.name, n_hdus + 1)
        key.check()
        # Only write EXTVER when there is more than one HDU with this name.
        if n_hdus:
            hdu.header["EXTVER"] = key.ver
        self._hdus_by_name[hdu.name] += 1
        update_header(hdu.header)
        # Restore any opaque header entries carried over from an input
        # archive for this extension.
        if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None:
            hdu.header.extend(opaque_headers)
        self._hdu_list.append(hdu)
        return key

    def add_tree(self, tree: ArchiveTree) -> None:
        """Write the JSON tree to the archive.

        This method must be called exactly once, just before the `open` context
        is exited.

        Parameters
        ----------
        tree
            Pydantic model that represents the tree.
        """
        # One variable-length byte column: row 0 is the main tree, rows 1..N
        # are the pointer targets (matching the rows handed out by
        # serialize_pointer).
        json_hdu = astropy.io.fits.BinTableHDU.from_columns(
            [astropy.io.fits.Column(JSON_COLUMN, "PB")],
            nrows=len(self._pointer_targets) + 1,
            name=JSON_EXTNAME,
        )
        json_hdu.data[0][JSON_COLUMN] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte)
        for n, json_target_data in enumerate(self._pointer_targets):
            json_hdu.data[n + 1][JSON_COLUMN] = np.frombuffer(json_target_data, dtype=np.byte)
        self._hdu_list.append(json_hdu)
        self._json_hdu_added = True

    def _get_compression_options(self, name: str) -> FitsCompressionOptions | None:
        """Return the compression options for the named attribute, filling in
        the archive-level compression seed when the configured seed is unset.
        """
        result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT)
        if result is None or result.quantization is None:
            return result
        # ``not seed`` covers both None and the backwards-compatible 0.
        if self._compression_seed is not None and not result.quantization.seed:
            result = result.model_copy(
                update={
                    "quantization": result.quantization.model_copy(update={"seed": self._compression_seed})
                }
            )
            # Increment for the next use, wrapping to keep the seed small
            # and nonzero.
            self._compression_seed += 1
            if self._compression_seed > 10000:
                self._compression_seed = 1
        # MyPy can tell that result.quantization is not None in the 'if', but
        # forgets that by this 'else':
        elif result.quantization.seed is None:  # type: ignore[union-attr]
            raise RuntimeError("No quantization seed provided.")
        return result

    @staticmethod
    def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
        """Build the INDEX binary table HDU describing the name, version,
        type, and byte offsets of every HDU already in the file.
        """
        # We use a fixed-length string for the EXTNAME column; it might be
        # better to use a variable-length array, but I have not been able to
        # figure out how to get astropy to accept a string for the
        # character (TFORM='A') variant of that.  And that's only better if the
        # EXTNAMEs get super long, which is not likely (but maybe something to
        # guard against).
        max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
        index_hdu = astropy.io.fits.BinTableHDU.from_columns(
            [
                astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
                astropy.io.fits.Column("EXTVER", "J"),
                astropy.io.fits.Column("XTENSION", "A8"),
                astropy.io.fits.Column("ZIMAGE", "L"),
            ]
            + _HDUBytes.get_index_hdu_columns(),
            nrows=len(hdu_list),
            name="INDEX",
        )
        hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
        for n, hdu in enumerate(hdu_list):
            index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
            index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
            index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
            index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
            # NOTE(review): local name shadows the builtin `bytes` within
            # this loop body.
            bytes = _HDUBytes.from_read_hdu(hdu)
            bytes.update_index_row(index_hdu.data[n])
        return index_hdu

405 

406 

407@dataclasses.dataclass 

408class _HDUBytes: 

409 """A struct that records the byte offsets into a FITS HDU.""" 

410 

411 @classmethod 

412 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

413 """Construct from an Astropy HDU instance that has just been written. 

414 

415 Parameters 

416 ---------- 

417 hdu 

418 An Astropy HDU object. 

419 

420 Returns 

421 ------- 

422 hdu_bytes 

423 Struct with byte offsets. 

424 

425 Notes 

426 ----- 

427 This method relies on internal Astropy attributes and does not work on 

428 CompImageHDU objects. 

429 """ 

430 # This is implemented by accessing private Astropy attributes because 

431 # it turns out that's much more reliable than the public fileinfo() 

432 # method, which seems to always return a dict with `None` entries or 

433 # raise; it looks buggy, but docs are scarce enough that it's not clear 

434 # what the right behavior is supposed to be. 

435 if (header_address := getattr(hdu, "_header_offset", None)) is None: 

436 raise RuntimeError("Failed to get Astropy's _header_offset.") 

437 if (data_address := getattr(hdu, "_data_offset", None)) is None: 

438 raise RuntimeError("Failed to get Astropy's _data_offset.") 

439 if (data_size := getattr(hdu, "_data_size", None)) is None: 

440 raise RuntimeError("Failed to get Astropy's _data_size.") 

441 return cls(header_address, data_address, data_size) 

442 

443 @classmethod 

444 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self: 

445 """Construct from an Astropy HDU instance that has just been read. 

446 

447 Parameters 

448 ---------- 

449 hdu 

450 An Astropy HDU object. 

451 

452 Returns 

453 ------- 

454 hdu_bytes 

455 Struct with byte offsets. 

456 """ 

457 info = hdu.fileinfo() 

458 header_address = info["hdrLoc"] 

459 data_address = info["datLoc"] 

460 data_size = info["datSpan"] 

461 return cls(header_address, data_address, data_size) 

462 

463 @classmethod 

464 def from_index_row(cls, row: np.void) -> Self: 

465 """Construct from a row of the index HDU. 

466 

467 Parameters 

468 ---------- 

469 row 

470 A Numpy struct-like scalar. 

471 

472 Returns 

473 ------- 

474 hdu_bytes 

475 Struct with byte offsets. 

476 """ 

477 return cls( 

478 header_address=int(row["HDRADDR"]), 

479 data_address=int(row["DATADDR"]), 

480 data_size=int(row["DATSIZE"]), 

481 ) 

482 

483 @staticmethod 

484 def get_index_hdu_columns() -> list[astropy.io.fits.Column]: 

485 """Return the definitions of the columns this class gets and sets 

486 from the index HDU. 

487 

488 Returns 

489 ------- 

490 columns 

491 A `list` of `astropy.io.fits.Column` objects that represent the 

492 header address, data address, and data size. 

493 """ 

494 return [ 

495 astropy.io.fits.Column("HDRADDR", "K"), 

496 astropy.io.fits.Column("DATADDR", "K"), 

497 astropy.io.fits.Column("DATSIZE", "K"), 

498 ] 

499 

500 header_address: int 

501 """Offset from the beginning of the start of the file to the header of this 

502 HDU, in bytes. 

503 """ 

504 

505 data_address: int 

506 """Offset from the beginning of the start of the file to the data section 

507 of this HDU, in bytes. 

508 """ 

509 

510 data_size: int 

511 """Size of the data section in bytes.""" 

512 

513 @property 

514 def header_size(self) -> int: 

515 """Size of the header in bytes.""" 

516 return self.data_address - self.header_address 

517 

518 @property 

519 def end_address(self) -> int: 

520 """Offset in bytes from the start of the file to the end of the HDU.""" 

521 return self.data_address + self.data_size 

522 

523 @property 

524 def size(self) -> int: 

525 """Total size of this HDU in bytes.""" 

526 return self.data_size + self.data_address - self.header_address 

527 

528 def update_index_row(self, row: np.void) -> None: 

529 """Set the values of a row of the index HDU from this strut. 

530 

531 Parameters 

532 ---------- 

533 row 

534 A Numpy struct-like scalar to modify in place. 

535 """ 

536 row["HDRADDR"] = self.header_address 

537 row["DATADDR"] = self.data_address 

538 row["DATSIZE"] = self.data_size