Coverage for python / lsst / images / fits / _common.py: 39%

170 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:34 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12 

13from __future__ import annotations 

14 

15__all__ = ( 

16 "ExtensionHDU", 

17 "ExtensionKey", 

18 "FitsCompressionAlgorithm", 

19 "FitsCompressionOptions", 

20 "FitsDitherAlgorithm", 

21 "FitsOpaqueMetadata", 

22 "FitsQuantizationOptions", 

23 "InvalidFitsArchiveError", 

24 "PrecompressedImage", 

25 "add_offset_wcs", 

26 "strip_butler_cards", 

27 "strip_legacy_exposure_cards", 

28 "strip_wcs_cards", 

29) 

30 

31import dataclasses 

32import enum 

33import itertools 

34import string 

35from typing import ClassVar, Self, final 

36 

37import astropy.io.fits 

38import numpy as np 

39import pydantic 

40 

41from .._geom import Box 

42from ..serialization import OpaqueArchiveMetadata 

43 

44type ExtensionHDU = astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU 

45 

46 

47@dataclasses.dataclass(frozen=True) 

48class ExtensionKey: 

49 """The identifiers for a single FITS extension HDU within a file.""" 

50 

51 name: str = "" 

52 """Value of the EXTNAME keyword, or an empty string for the primary HDU.""" 

53 

54 ver: int = 1 

55 """Value of the EXTVER keyword (which may be absent if it is ``1``).""" 

56 

57 @classmethod 

58 def from_index_row(cls, row: np.ndarray) -> ExtensionKey: 

59 """Construct from a row of the index binary table appended to the 

60 FITS files written by the package. 

61 """ 

62 return cls(str(row["EXTNAME"]).upper(), int(row["EXTVER"])) 

63 

64 @classmethod 

65 def from_str(cls, source: str) -> ExtensionKey: 

66 """Construct from the `str` coercion of this type, which is used 

67 as the 'source' field in various Pydantic models that serve as 

68 references to other HDUs. 

69 """ 

70 name, comma, ver = source.removeprefix("fits:").partition(",") 

71 if not comma: 

72 return cls(name) 

73 else: 

74 return cls(name, int(ver)) 

75 

76 def __str__(self) -> str: 

77 if self.ver > 1: 

78 return f"fits:{self.name},{self.ver}" 

79 else: 

80 return f"fits:{self.name}" 

81 

82 

83class InvalidFitsArchiveError(RuntimeError): 

84 """The error type raised when the content of a FITS file presumed to have 

85 been written by FitsOutputArchive is not self-consistent. 

86 """ 

87 

88 

89class FitsCompressionAlgorithm(enum.StrEnum): 

90 """FITS compression algorithms supported by this package. 

91 

92 See the FITS standard for definitions. 

93 """ 

94 

95 GZIP_1 = "GZIP_1" 

96 GZIP_2 = "GZIP_2" 

97 RICE_1 = "RICE_1" 

98 

99 

100class FitsDitherAlgorithm(enum.StrEnum): 

101 """FITS quantization dither algorithms supported by this package. 

102 

103 See the FITS standard for definitions. 

104 """ 

105 

106 NO_DITHER = "NO_DITHER" 

107 SUBTRACTIVE_DITHER_1 = "SUBTRACTIVE_DITHER_1" 

108 SUBTRACTIVE_DITHER_2 = "SUBTRACTIVE_DITHER_2" 

109 

110 def to_astropy_quantize_method(self) -> int: 

111 """Convert to the integer code used by Astropy.""" 

112 match self: 

113 case self.NO_DITHER: 

114 return -1 

115 case self.SUBTRACTIVE_DITHER_1: 

116 return 1 

117 case self.SUBTRACTIVE_DITHER_2: 

118 return 2 

119 raise AssertionError("Invalid enum value.") 

120 

121 

122class FitsQuantizationOptions(pydantic.BaseModel, frozen=True): 

123 """Quantization options for FITS compression.""" 

124 

125 dither: FitsDitherAlgorithm 

126 """How to add random noise during quantization to reduce biases.""" 

127 

128 level: float 

129 """Quantization level. 

130 

131 When positive, this is the fraction of the measured standard deviation that 

132 corresponds to an integer step. When negative, it is ``-ZSCALE``, the 

133 scaling to apply directly to the original pixels before quantization. 

134 """ 

135 

136 seed: int | None = None 

137 """Random number seed to use for dithering. 

138 

139 Values between 1 and 10000 (inclusive) are used directly. ``0`` will 

140 generate a value from the current time, and ``-1`` will generate a value 

141 from the checksum of the image. 

142 

143 If `None`, the ``compression_seed`` parameter must be passed to 

144 `FitsOutputArchive.open` if any quantized compression is configured. 

145 """ 

146 

147 

148class FitsCompressionOptions(pydantic.BaseModel, frozen=True): 

149 """Configuration options for FITS compression.""" 

150 

151 algorithm: FitsCompressionAlgorithm = FitsCompressionAlgorithm.GZIP_2 

152 """Compression algorithm to use.""" 

153 

154 tile_shape: tuple[int, ...] | None = None 

155 """Shape ``(..., y, x)`` of independently compressed tiles. 

156 

157 The default of `None` compresses each row separately. 

158 """ 

159 

160 quantization: FitsQuantizationOptions | None = None 

161 """Quantization to apply before compression, if any.""" 

162 

163 DEFAULT: ClassVar[FitsCompressionOptions | None] 

164 """Default compression options (lossless ``GZIP_2``).""" 

165 

166 LOSSY: ClassVar[FitsCompressionOptions] 

167 """Default lossy compression options.""" 

168 

169 def make_hdu(self, data: np.ndarray, name: str) -> astropy.io.fits.CompImageHDU: 

170 """Make an `astropy.io.fits.CompImageHDU` object from these options.""" 

171 if self.quantization is not None: 

172 return astropy.io.fits.CompImageHDU( 

173 data, 

174 name=name, 

175 compression_type=self.algorithm.value, 

176 tile_shape=self.tile_shape, 

177 quantize_method=self.quantization.dither.to_astropy_quantize_method(), 

178 quantize_level=self.quantization.level, 

179 dither_seed=self.quantization.seed, 

180 ) 

181 else: 

182 return astropy.io.fits.CompImageHDU( 

183 data, 

184 name=name, 

185 compression_type=self.algorithm.value, 

186 tile_shape=self.tile_shape, 

187 quantize_level=0.0, 

188 ) 

189 

190 

191FitsCompressionOptions.DEFAULT = FitsCompressionOptions() 

192FitsCompressionOptions.LOSSY = FitsCompressionOptions( 

193 algorithm=FitsCompressionAlgorithm.RICE_1, 

194 quantization=FitsQuantizationOptions(dither=FitsDitherAlgorithm.SUBTRACTIVE_DITHER_2, level=16.0), 

195) 

196 

197 

198_COMPRESSION_KEYS = frozenset( 

199 ( 

200 "ZIMAGE", 

201 "ZCMPTYPE", 

202 "ZBITPIX", 

203 "ZNAXIS", 

204 "ZMASKCMP", 

205 "ZQUANTIZ", 

206 "ZDITHER0", 

207 "ZCHECKSUM", 

208 "ZDATASUM", 

209 ) 

210) 

211_COMPRESSION_PREFIX_KEYS = ("ZNAXIS", "ZTILE", "ZNAME", "ZVAL") 

212 

213 

214@dataclasses.dataclass 

215class PrecompressedImage: 

216 """Already-compressed FITS HDUs that are attached to high-level objects 

217 via `FitsOpaqueMetadata`, allowing lossy-compressed pixel values to be 

218 round-tripped exactly. 

219 """ 

220 

221 header: astropy.io.fits.Header 

222 """Header for the HDU. 

223 

224 This contains only FITS tile-compression keywords. 

225 """ 

226 

227 data: astropy.io.fits.FITS_rec 

228 """FITS binary table data that serves as the low-level representation of a 

229 tile-compressed image HDU. 

230 """ 

231 

232 @classmethod 

233 def from_bintable(cls, hdu: astropy.io.fits.BinTableHDU) -> Self: 

234 """Construct from a binary table HDU. 

235 

236 Parameters 

237 ---------- 

238 hdu 

239 Binary table HDU, typically read from a FITS file opened with 

240 ``disable_image_compression=True``. 

241 

242 Returns 

243 ------- 

244 PrecompressedImage 

245 A `PrecompressedImage` instance. 

246 """ 

247 header = astropy.io.fits.Header( 

248 [ 

249 card 

250 for card in hdu.header.cards 

251 if card.keyword in _COMPRESSION_KEYS 

252 or any(card.keyword.startswith(k) for k in _COMPRESSION_PREFIX_KEYS) 

253 ] 

254 ) 

255 # This is an opportunity to fix CFITSIO's non-standard writing of the 

256 # old RICE_ONE value instead of RICE_1. 

257 if header["ZCMPTYPE"] == "RICE_ONE": 

258 header["ZCMPTYPE"] = "RICE_1" 

259 return cls(header=header, data=hdu.data) 

260 

261 

262@final 

263@dataclasses.dataclass 

264class FitsOpaqueMetadata(OpaqueArchiveMetadata): 

265 """Opaque metadata that may be carried around by a serializable type to 

266 propagate serialization options and opaque information without that type 

267 knowing how it was serialized. 

268 """ 

269 

270 headers: dict[ExtensionKey, astropy.io.fits.Header] = dataclasses.field(default_factory=dict) 

271 """FITS headers found (but not interpreted and stripped) when reading, to 

272 be propagated on write. 

273 

274 Keys are EXTNAME/EXTVER combinations, or ("", 1) for the primary header. 

275 Header information in opaque metadata is considered immutable, allowing it 

276 to be transferred by reference to copies and subsets of the object it is 

277 attached to. 

278 """ 

279 

280 precompressed: dict[str, PrecompressedImage] = dataclasses.field(default_factory=dict) 

281 """FITS tile-compressed HDUs that should be written out directly instead 

282 of the in-memory data provided. 

283 

284 Keys are EXTNAME values, which must be unique (no EXTVER disambiguation). 

285 Precompressed pixel values are never copied or transferred to subsets. 

286 """ 

287 

288 def add_header( 

289 self, 

290 header: astropy.io.fits.Header, 

291 name: str | None = None, 

292 ver: int | None = None, 

293 key: ExtensionKey | None = None, 

294 ) -> None: 

295 """Add a header to the opaque metadata if it is not already present, 

296 and strip EXTNAME and EXTVER if present. 

297 

298 Parameters 

299 ---------- 

300 header 

301 Header to add. May be modified in place. 

302 name 

303 EXTNAME (all caps). If not provided, the EXTNAME card must be 

304 present in the header. Use ``""`` for the primary header. 

305 ver 

306 EXTVER. If not provided and the EXTVER card is not present, 

307 defaults to 1. 

308 key 

309 Combination of EXTNAME and EXTVER; used instead of both ``name`` 

310 and ``ver`` if provided. 

311 """ 

312 if key is None: 

313 if name is None: 

314 name = header["EXTNAME"] 

315 if ver is None: 

316 ver = header.get("EXTVER", 1) 

317 key = ExtensionKey(name, ver) 

318 strip_butler_cards(header) 

319 if key not in self.headers: 

320 header.remove("EXTNAME", ignore_missing=True) 

321 header.remove("EXTVER", ignore_missing=True) 

322 self.headers[key] = header 

323 

324 def maybe_use_precompressed(self, name: str) -> astropy.io.fits.BinTableHDU | None: 

325 """Look up the given EXTNAME to see if there is a tile compressed image 

326 HDU that should be used directly, instead of requantizing. 

327 

328 Parameters 

329 ---------- 

330 name 

331 EXTNAME (all caps). 

332 

333 Returns 

334 ------- 

335 `astropy.io.fits.BinTableHDU` | `None` 

336 An already-compressed HDU, in binary table form, or `None` if there 

337 is no precompressed HDU for this EXTNAME. 

338 """ 

339 if (precompressed := self.precompressed.get(name)) is None: 

340 return None 

341 return astropy.io.fits.BinTableHDU(precompressed.data, header=precompressed.header.copy(), name=name) 

342 

343 def extract_legacy_primary_header(self, header: astropy.io.fits.Header) -> None: 

344 """Update the opaque metadata with the header of the primary HDU 

345 of a legacy (`lsst.afw.image`) FITS file. 

346 """ 

347 primary_header = header.copy(strip=True) 

348 # No idea what these spare TAN-SIP headers are doing in the afw 

349 # FITS files, but we'll strip them here: 

350 primary_header.remove("A_ORDER", ignore_missing=True) 

351 primary_header.remove("B_ORDER", ignore_missing=True) 

352 strip_legacy_exposure_cards(primary_header) 

353 strip_butler_cards(primary_header) 

354 self.headers[ExtensionKey()] = primary_header 

355 

356 def copy(self) -> FitsOpaqueMetadata: 

357 # Docstring inherited. 

358 return FitsOpaqueMetadata(headers=self.headers) 

359 

360 def subset(self, bbox: Box) -> FitsOpaqueMetadata: 

361 # Docstring inherited. 

362 return FitsOpaqueMetadata(headers=self.headers) 

363 

364 

365def add_offset_wcs(header: astropy.io.fits.Header, *, x: int | float, y: int | float, key: str = "A") -> None: 

366 """Add a trivial FITS WCS to a header that applies the appropriate offset 

367 to map FITS array coordinates to a logical pixel grid. 

368 

369 Parameters 

370 ---------- 

371 header 

372 Header to update in-place. 

373 x 

374 Logical coordinate of the first column. 

375 y 

376 Logical coordinate of the first row. 

377 key 

378 Single-character suffix for this WCS. 

379 """ 

380 header.set(f"CTYPE1{key}", "LINEAR") 

381 header.set(f"CTYPE2{key}", "LINEAR") 

382 header.set(f"CRPIX1{key}", 1.0) 

383 header.set(f"CRPIX2{key}", 1.0) 

384 header.set(f"CRVAL1{key}", float(x)) 

385 header.set(f"CRVAL2{key}", float(y)) 

386 header.set(f"CUNIT1{key}", "PIXEL") 

387 header.set(f"CUNIT2{key}", "PIXEL") 

388 

389 

390_WCS_VECTOR_KEYS = ("CUNIT", "CRPIX", "CRPIX", "CRVAL", "CRDELT", "CROTA", "CRDER", "CSYER", "CDELT") 

391_WCS_MATRIX_KEYS = ("CD{0}_{1}", "PC{0}_{1}") 

392 

393 

394def strip_wcs_cards(header: astropy.io.fits.Header) -> None: 

395 """Strip WCS cards from a FITS header. 

396 

397 This does *not* attempt to cover all possible FITS WCS forms; it focuses on 

398 the ones we actually plan to write (simple undistorted ones + TAN-SIP). 

399 """ 

400 wcsaxes = header.pop("WCSAXES", 2) 

401 for wcsname in [""] + list(string.ascii_uppercase): 

402 header.remove("RADESYS" + wcsname, ignore_missing=True) 

403 if "CTYPE1" + wcsname in header: 

404 ctype: str = "" # just for linters that can't figure out that the loop always executes 

405 for n in range(wcsaxes): 

406 suffix = f"{n + 1}{wcsname}" 

407 ctype = header.pop("CTYPE" + suffix) 

408 for key in _WCS_VECTOR_KEYS: 

409 header.remove(key + suffix, ignore_missing=True) 

410 for m in range(wcsaxes): 

411 for tmpl in _WCS_MATRIX_KEYS: 

412 header.remove(tmpl.format(m + 1, suffix), ignore_missing=True) 

413 if ctype.endswith("-SIP"): 

414 _strip_sip_poly(header, wcsname, "A") 

415 _strip_sip_poly(header, wcsname, "B") 

416 _strip_sip_poly(header, wcsname, "AP") 

417 _strip_sip_poly(header, wcsname, "BP") 

418 header.remove("LONPOLE", ignore_missing=True) 

419 header.remove("LATPOLE", ignore_missing=True) 

420 header.remove("MJDREF", ignore_missing=True) 

421 header.remove("PV1_3", ignore_missing=True) 

422 header.remove("PV1_4", ignore_missing=True) 

423 

424 

425def _strip_sip_poly(header: astropy.io.fits.Header, wcsname: str, which: str) -> None: 

426 order: int | None = header.pop(f"{which}_ORDER{wcsname}", None) 

427 if order is not None: 

428 for i, j in itertools.product(range(order + 1), range(order + 1)): 

429 header.remove(f"{which}_{i}_{j}{wcsname}", ignore_missing=True) 

430 

431 

432def strip_legacy_exposure_cards(header: astropy.io.fits.Header) -> None: 

433 """Strip header keywords added by lsst.afw.image.Exposure.""" 

434 header.remove("AR_HDU", ignore_missing=True) 

435 for name in ( 

436 "FILTER", 

437 "DETECTOR", 

438 "VALID_POLYGON", 

439 "SKYWCS", 

440 "PSF", 

441 "SUMMARYSTATS", 

442 "AP_CORR_MAP", 

443 "PHOTOCALIB", 

444 ): 

445 header.remove(f"{name}_ID", ignore_missing=True) 

446 header.remove(f"ARCHIVE_ID_{name}", ignore_missing=True) 

447 

448 

449def strip_butler_cards(header: astropy.io.fits.Header) -> None: 

450 """Strip header keywords added by butler provenance that would be 

451 incorrect if propagated to a downstream file. 

452 """ 

453 for key in list(header): 

454 if key.startswith("LSST BUTLER"): 

455 del header[key]