Coverage for python / lsst / images / fits / _common.py: 40%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:07 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12 

13from __future__ import annotations 

14 

15__all__ = ( 

16 "FITS_SOURCE_REGEX", 

17 "JSON_COLUMN", 

18 "JSON_EXTNAME", 

19 "ExtensionHDU", 

20 "ExtensionKey", 

21 "FitsCompressionAlgorithm", 

22 "FitsCompressionOptions", 

23 "FitsDitherAlgorithm", 

24 "FitsOpaqueMetadata", 

25 "FitsQuantizationOptions", 

26 "InvalidFitsArchiveError", 

27 "PointerModel", 

28 "PrecompressedImage", 

29 "add_offset_wcs", 

30 "strip_butler_cards", 

31 "strip_legacy_exposure_cards", 

32 "strip_wcs_cards", 

33) 

34 

35import dataclasses 

36import enum 

37import itertools 

38import re 

39import string 

40from typing import ClassVar, Self, final 

41 

42import astropy.io.fits 

43import numpy as np 

44import pydantic 

45 

46from .._geom import Box 

47from ..serialization import ArchiveReadError, OpaqueArchiveMetadata, TableColumnModel 

48 

49type ExtensionHDU = astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU 

50 

51FITS_SOURCE_REGEX = re.compile(r"fits:(?P<extname>[\w/\-]+)(,(?P<extver>\d+))?(\[\d+\])?") 

52 

53JSON_EXTNAME: str = "JSON" 

54JSON_COLUMN: str = "JSON" 

55 

56 

57@dataclasses.dataclass(frozen=True) 

58class ExtensionKey: 

59 """The identifiers for a single FITS extension HDU within a file.""" 

60 

61 name: str = "" 

62 """Value of the EXTNAME keyword, or an empty string for the primary HDU.""" 

63 

64 ver: int = 1 

65 """Value of the EXTVER keyword (which may be absent if it is ``1``).""" 

66 

67 @classmethod 

68 def from_index_row(cls, row: np.ndarray) -> ExtensionKey: 

69 """Construct from a row of the index binary table appended to the 

70 FITS files written by the package. 

71 """ 

72 return cls(str(row["EXTNAME"]).upper(), int(row["EXTVER"])) 

73 

74 @classmethod 

75 def from_str(cls, source: str) -> ExtensionKey: 

76 """Construct from the `str` coercion of this type, which is used 

77 as the 'source' field in various Pydantic models that serve as 

78 references to other HDUs. 

79 """ 

80 if (m := FITS_SOURCE_REGEX.fullmatch(source)) is None: 

81 raise ArchiveReadError(f"Bad 'source' string for FITS: {source!r}.") 

82 ver = 1 

83 if m.group("extver") is not None: 

84 ver = int(m.group("extver")) 

85 return cls(m.group("extname"), ver) 

86 

87 def check(self) -> None: 

88 if not FITS_SOURCE_REGEX.match(str(self)): 

89 raise ValueError( 

90 f"Invalid source key: '{str(self)}'; name characters must be alphanumeric, '-', '_', or '/'." 

91 ) 

92 

93 def __str__(self) -> str: 

94 if self.ver > 1: 

95 return f"fits:{self.name},{self.ver}" 

96 else: 

97 return f"fits:{self.name}" 

98 

99 

100class PointerModel(pydantic.BaseModel): 

101 column: TableColumnModel = pydantic.Field(description="Table column for this cell this pointer targets.") 

102 row: int = pydantic.Field(description="Zero-indexed row for cell this pointer targets.") 

103 

104 

105class InvalidFitsArchiveError(RuntimeError): 

106 """The error type raised when the content of a FITS file presumed to have 

107 been written by FitsOutputArchive is not self-consistent. 

108 """ 

109 

110 

111class FitsCompressionAlgorithm(enum.StrEnum): 

112 """FITS compression algorithms supported by this package. 

113 

114 See the FITS standard for definitions. 

115 """ 

116 

117 GZIP_1 = "GZIP_1" 

118 GZIP_2 = "GZIP_2" 

119 RICE_1 = "RICE_1" 

120 

121 

122class FitsDitherAlgorithm(enum.StrEnum): 

123 """FITS quantization dither algorithms supported by this package. 

124 

125 See the FITS standard for definitions. 

126 """ 

127 

128 NO_DITHER = "NO_DITHER" 

129 SUBTRACTIVE_DITHER_1 = "SUBTRACTIVE_DITHER_1" 

130 SUBTRACTIVE_DITHER_2 = "SUBTRACTIVE_DITHER_2" 

131 

132 def to_astropy_quantize_method(self) -> int: 

133 """Convert to the integer code used by Astropy.""" 

134 match self: 

135 case self.NO_DITHER: 

136 return -1 

137 case self.SUBTRACTIVE_DITHER_1: 

138 return 1 

139 case self.SUBTRACTIVE_DITHER_2: 

140 return 2 

141 raise AssertionError("Invalid enum value.") 

142 

143 

144class FitsQuantizationOptions(pydantic.BaseModel, frozen=True): 

145 """Quantization options for FITS compression.""" 

146 

147 dither: FitsDitherAlgorithm 

148 """How to add random noise during quantization to reduce biases.""" 

149 

150 level: float 

151 """Quantization level. 

152 

153 When positive, this is the fraction of the measured standard deviation that 

154 corresponds to an integer step. When negative, it is ``-ZSCALE``, the 

155 scaling to apply directly to the original pixels before quantization. 

156 """ 

157 

158 seed: int | None = None 

159 """Random number seed to use for dithering. 

160 

161 Values between 1 and 10000 (inclusive) are used directly. ``0`` will 

162 generate a value from the current time, and ``-1`` will generate a value 

163 from the checksum of the image. 

164 

165 If `None`, the ``compression_seed`` parameter must be passed to 

166 `FitsOutputArchive.open` if any quantized compression is configured. 

167 """ 

168 

169 

170class FitsCompressionOptions(pydantic.BaseModel, frozen=True): 

171 """Configuration options for FITS compression.""" 

172 

173 algorithm: FitsCompressionAlgorithm = FitsCompressionAlgorithm.GZIP_2 

174 """Compression algorithm to use.""" 

175 

176 tile_shape: tuple[int, ...] | None = None 

177 """Shape ``(..., y, x)`` of independently compressed tiles. 

178 

179 The default of `None` compresses each row separately. 

180 """ 

181 

182 quantization: FitsQuantizationOptions | None = None 

183 """Quantization to apply before compression, if any.""" 

184 

185 DEFAULT: ClassVar[FitsCompressionOptions | None] 

186 """Default compression options (lossless ``GZIP_2``).""" 

187 

188 LOSSY: ClassVar[FitsCompressionOptions] 

189 """Default lossy compression options.""" 

190 

191 def make_hdu(self, data: np.ndarray, name: str) -> astropy.io.fits.CompImageHDU: 

192 """Make an `astropy.io.fits.CompImageHDU` object from these options.""" 

193 if self.quantization is not None: 

194 return astropy.io.fits.CompImageHDU( 

195 data, 

196 name=name, 

197 compression_type=self.algorithm.value, 

198 tile_shape=self.tile_shape, 

199 quantize_method=self.quantization.dither.to_astropy_quantize_method(), 

200 quantize_level=self.quantization.level, 

201 dither_seed=self.quantization.seed, 

202 ) 

203 else: 

204 return astropy.io.fits.CompImageHDU( 

205 data, 

206 name=name, 

207 compression_type=self.algorithm.value, 

208 tile_shape=self.tile_shape, 

209 quantize_level=0.0, 

210 ) 

211 

212 

213FitsCompressionOptions.DEFAULT = FitsCompressionOptions() 

214FitsCompressionOptions.LOSSY = FitsCompressionOptions( 

215 algorithm=FitsCompressionAlgorithm.RICE_1, 

216 quantization=FitsQuantizationOptions(dither=FitsDitherAlgorithm.SUBTRACTIVE_DITHER_2, level=16.0), 

217) 

218 

219 

220_COMPRESSION_KEYS = frozenset( 

221 ( 

222 "ZIMAGE", 

223 "ZCMPTYPE", 

224 "ZBITPIX", 

225 "ZNAXIS", 

226 "ZMASKCMP", 

227 "ZQUANTIZ", 

228 "ZDITHER0", 

229 "ZCHECKSUM", 

230 "ZDATASUM", 

231 ) 

232) 

233_COMPRESSION_PREFIX_KEYS = ("ZNAXIS", "ZTILE", "ZNAME", "ZVAL") 

234 

235 

236@dataclasses.dataclass 

237class PrecompressedImage: 

238 """Already-compressed FITS HDUs that are attached to high-level objects 

239 via `FitsOpaqueMetadata`, allowing lossy-compressed pixel values to be 

240 round-tripped exactly. 

241 """ 

242 

243 header: astropy.io.fits.Header 

244 """Header for the HDU. 

245 

246 This contains only FITS tile-compression keywords. 

247 """ 

248 

249 data: astropy.io.fits.FITS_rec 

250 """FITS binary table data that serves as the low-level representation of a 

251 tile-compressed image HDU. 

252 """ 

253 

254 @classmethod 

255 def from_bintable(cls, hdu: astropy.io.fits.BinTableHDU) -> Self: 

256 """Construct from a binary table HDU. 

257 

258 Parameters 

259 ---------- 

260 hdu 

261 Binary table HDU, typically read from a FITS file opened with 

262 ``disable_image_compression=True``. 

263 

264 Returns 

265 ------- 

266 PrecompressedImage 

267 A `PrecompressedImage` instance. 

268 """ 

269 header = astropy.io.fits.Header( 

270 [ 

271 card 

272 for card in hdu.header.cards 

273 if card.keyword in _COMPRESSION_KEYS 

274 or any(card.keyword.startswith(k) for k in _COMPRESSION_PREFIX_KEYS) 

275 ] 

276 ) 

277 # This is an opportunity to fix CFITSIO's non-standard writing of the 

278 # old RICE_ONE value instead of RICE_1. 

279 if header["ZCMPTYPE"] == "RICE_ONE": 

280 header["ZCMPTYPE"] = "RICE_1" 

281 return cls(header=header, data=hdu.data) 

282 

283 

284@final 

285@dataclasses.dataclass 

286class FitsOpaqueMetadata(OpaqueArchiveMetadata): 

287 """Opaque metadata that may be carried around by a serializable type to 

288 propagate serialization options and opaque information without that type 

289 knowing how it was serialized. 

290 """ 

291 

292 headers: dict[ExtensionKey, astropy.io.fits.Header] = dataclasses.field(default_factory=dict) 

293 """FITS headers found (but not interpreted and stripped) when reading, to 

294 be propagated on write. 

295 

296 Keys are EXTNAME/EXTVER combinations, or ("", 1) for the primary header. 

297 Header information in opaque metadata is considered immutable, allowing it 

298 to be transferred by reference to copies and subsets of the object it is 

299 attached to. 

300 """ 

301 

302 precompressed: dict[str, PrecompressedImage] = dataclasses.field(default_factory=dict) 

303 """FITS tile-compressed HDUs that should be written out directly instead 

304 of the in-memory data provided. 

305 

306 Keys are EXTNAME values, which must be unique (no EXTVER disambiguation). 

307 Precompressed pixel values are never copied or transferred to subsets. 

308 """ 

309 

310 def add_header( 

311 self, 

312 header: astropy.io.fits.Header, 

313 name: str | None = None, 

314 ver: int | None = None, 

315 key: ExtensionKey | None = None, 

316 ) -> None: 

317 """Add a header to the opaque metadata if it is not already present, 

318 and strip EXTNAME and EXTVER if present. 

319 

320 Parameters 

321 ---------- 

322 header 

323 Header to add. May be modified in place. 

324 name 

325 EXTNAME (all caps). If not provided, the EXTNAME card must be 

326 present in the header. Use ``""`` for the primary header. 

327 ver 

328 EXTVER. If not provided and the EXTVER card is not present, 

329 defaults to 1. 

330 key 

331 Combination of EXTNAME and EXTVER; used instead of both ``name`` 

332 and ``ver`` if provided. 

333 """ 

334 if key is None: 

335 if name is None: 

336 name = header["EXTNAME"] 

337 if ver is None: 

338 ver = header.get("EXTVER", 1) 

339 key = ExtensionKey(name, ver) 

340 strip_butler_cards(header) 

341 if key not in self.headers: 

342 header.remove("EXTNAME", ignore_missing=True) 

343 header.remove("EXTVER", ignore_missing=True) 

344 self.headers[key] = header 

345 

346 def maybe_use_precompressed(self, name: str) -> astropy.io.fits.BinTableHDU | None: 

347 """Look up the given EXTNAME to see if there is a tile compressed image 

348 HDU that should be used directly, instead of requantizing. 

349 

350 Parameters 

351 ---------- 

352 name 

353 EXTNAME (all caps). 

354 

355 Returns 

356 ------- 

357 `astropy.io.fits.BinTableHDU` | `None` 

358 An already-compressed HDU, in binary table form, or `None` if there 

359 is no precompressed HDU for this EXTNAME. 

360 """ 

361 if (precompressed := self.precompressed.get(name)) is None: 

362 return None 

363 return astropy.io.fits.BinTableHDU(precompressed.data, header=precompressed.header.copy(), name=name) 

364 

365 def extract_legacy_primary_header(self, header: astropy.io.fits.Header) -> None: 

366 """Update the opaque metadata with the header of the primary HDU 

367 of a legacy (`lsst.afw.image`) FITS file. 

368 """ 

369 primary_header = header.copy(strip=True) 

370 # No idea what these spare TAN-SIP headers are doing in the afw 

371 # FITS files, but we'll strip them here: 

372 primary_header.remove("A_ORDER", ignore_missing=True) 

373 primary_header.remove("B_ORDER", ignore_missing=True) 

374 strip_legacy_exposure_cards(primary_header) 

375 strip_butler_cards(primary_header) 

376 self.headers[ExtensionKey()] = primary_header 

377 

378 def copy(self) -> FitsOpaqueMetadata: 

379 # Docstring inherited. 

380 return FitsOpaqueMetadata(headers=self.headers) 

381 

382 def subset(self, bbox: Box) -> FitsOpaqueMetadata: 

383 # Docstring inherited. 

384 return FitsOpaqueMetadata(headers=self.headers) 

385 

386 

387def add_offset_wcs(header: astropy.io.fits.Header, *, x: int | float, y: int | float, key: str = "A") -> None: 

388 """Add a trivial FITS WCS to a header that applies the appropriate offset 

389 to map FITS array coordinates to a logical pixel grid. 

390 

391 Parameters 

392 ---------- 

393 header 

394 Header to update in-place. 

395 x 

396 Logical coordinate of the first column. 

397 y 

398 Logical coordinate of the first row. 

399 key 

400 Single-character suffix for this WCS. 

401 """ 

402 header.set(f"CTYPE1{key}", "LINEAR") 

403 header.set(f"CTYPE2{key}", "LINEAR") 

404 header.set(f"CRPIX1{key}", 1.0) 

405 header.set(f"CRPIX2{key}", 1.0) 

406 header.set(f"CRVAL1{key}", float(x)) 

407 header.set(f"CRVAL2{key}", float(y)) 

408 header.set(f"CUNIT1{key}", "PIXEL") 

409 header.set(f"CUNIT2{key}", "PIXEL") 

410 

411 

412_WCS_VECTOR_KEYS = ("CUNIT", "CRPIX", "CRPIX", "CRVAL", "CRDELT", "CROTA", "CRDER", "CSYER", "CDELT") 

413_WCS_MATRIX_KEYS = ("CD{0}_{1}", "PC{0}_{1}") 

414 

415 

416def strip_wcs_cards(header: astropy.io.fits.Header) -> None: 

417 """Strip WCS cards from a FITS header. 

418 

419 This does *not* attempt to cover all possible FITS WCS forms; it focuses on 

420 the ones we actually plan to write (simple undistorted ones + TAN-SIP). 

421 """ 

422 wcsaxes = header.pop("WCSAXES", 2) 

423 for wcsname in [""] + list(string.ascii_uppercase): 

424 header.remove("RADESYS" + wcsname, ignore_missing=True) 

425 if "CTYPE1" + wcsname in header: 

426 ctype: str = "" # just for linters that can't figure out that the loop always executes 

427 for n in range(wcsaxes): 

428 suffix = f"{n + 1}{wcsname}" 

429 ctype = header.pop("CTYPE" + suffix) 

430 for key in _WCS_VECTOR_KEYS: 

431 header.remove(key + suffix, ignore_missing=True) 

432 for m in range(wcsaxes): 

433 for tmpl in _WCS_MATRIX_KEYS: 

434 header.remove(tmpl.format(m + 1, suffix), ignore_missing=True) 

435 if ctype.endswith("-SIP"): 

436 _strip_sip_poly(header, wcsname, "A") 

437 _strip_sip_poly(header, wcsname, "B") 

438 _strip_sip_poly(header, wcsname, "AP") 

439 _strip_sip_poly(header, wcsname, "BP") 

440 header.remove("LONPOLE", ignore_missing=True) 

441 header.remove("LATPOLE", ignore_missing=True) 

442 header.remove("MJDREF", ignore_missing=True) 

443 header.remove("PV1_3", ignore_missing=True) 

444 header.remove("PV1_4", ignore_missing=True) 

445 

446 

447def _strip_sip_poly(header: astropy.io.fits.Header, wcsname: str, which: str) -> None: 

448 order: int | None = header.pop(f"{which}_ORDER{wcsname}", None) 

449 if order is not None: 

450 for i, j in itertools.product(range(order + 1), range(order + 1)): 

451 header.remove(f"{which}_{i}_{j}{wcsname}", ignore_missing=True) 

452 

453 

454def strip_legacy_exposure_cards(header: astropy.io.fits.Header) -> None: 

455 """Strip header keywords added by lsst.afw.image.Exposure.""" 

456 header.remove("AR_HDU", ignore_missing=True) 

457 for name in ( 

458 "FILTER", 

459 "DETECTOR", 

460 "VALID_POLYGON", 

461 "SKYWCS", 

462 "PSF", 

463 "SUMMARYSTATS", 

464 "AP_CORR_MAP", 

465 "PHOTOCALIB", 

466 ): 

467 header.remove(f"{name}_ID", ignore_missing=True) 

468 header.remove(f"ARCHIVE_ID_{name}", ignore_missing=True) 

469 

470 

471def strip_butler_cards(header: astropy.io.fits.Header) -> None: 

472 """Strip header keywords added by butler provenance that would be 

473 incorrect if propagated to a downstream file. 

474 """ 

475 for key in list(header): 

476 if key.startswith("LSST BUTLER"): 

477 del header[key]