Coverage for python / lsst / images / fits / _common.py: 39%
170 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:01 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 09:01 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
13from __future__ import annotations
15__all__ = (
16 "ExtensionHDU",
17 "ExtensionKey",
18 "FitsCompressionAlgorithm",
19 "FitsCompressionOptions",
20 "FitsDitherAlgorithm",
21 "FitsOpaqueMetadata",
22 "FitsQuantizationOptions",
23 "InvalidFitsArchiveError",
24 "PrecompressedImage",
25 "add_offset_wcs",
26 "strip_butler_cards",
27 "strip_legacy_exposure_cards",
28 "strip_wcs_cards",
29)
31import dataclasses
32import enum
33import itertools
34import string
35from typing import ClassVar, Self, final
37import astropy.io.fits
38import numpy as np
39import pydantic
41from .._geom import Box
42from ..serialization import OpaqueArchiveMetadata
44type ExtensionHDU = astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU
47@dataclasses.dataclass(frozen=True)
48class ExtensionKey:
49 """The identifiers for a single FITS extension HDU within a file."""
51 name: str = ""
52 """Value of the EXTNAME keyword, or an empty string for the primary HDU."""
54 ver: int = 1
55 """Value of the EXTVER keyword (which may be absent if it is ``1``)."""
57 @classmethod
58 def from_index_row(cls, row: np.ndarray) -> ExtensionKey:
59 """Construct from a row of the index binary table appended to the
60 FITS files written by the package.
61 """
62 return cls(str(row["EXTNAME"]).upper(), int(row["EXTVER"]))
64 @classmethod
65 def from_str(cls, source: str) -> ExtensionKey:
66 """Construct from the `str` coercion of this type, which is used
67 as the 'source' field in various Pydantic models that serve as
68 references to other HDUs.
69 """
70 name, comma, ver = source.removeprefix("fits:").partition(",")
71 if not comma:
72 return cls(name)
73 else:
74 return cls(name, int(ver))
76 def __str__(self) -> str:
77 if self.ver > 1:
78 return f"fits:{self.name},{self.ver}"
79 else:
80 return f"fits:{self.name}"
83class InvalidFitsArchiveError(RuntimeError):
84 """The error type raised when the content of a FITS file presumed to have
85 been written by FitsOutputArchive is not self-consistent.
86 """
89class FitsCompressionAlgorithm(enum.StrEnum):
90 """FITS compression algorithms supported by this package.
92 See the FITS standard for definitions.
93 """
95 GZIP_1 = "GZIP_1"
96 GZIP_2 = "GZIP_2"
97 RICE_1 = "RICE_1"
100class FitsDitherAlgorithm(enum.StrEnum):
101 """FITS quantization dither algorithms supported by this package.
103 See the FITS standard for definitions.
104 """
106 NO_DITHER = "NO_DITHER"
107 SUBTRACTIVE_DITHER_1 = "SUBTRACTIVE_DITHER_1"
108 SUBTRACTIVE_DITHER_2 = "SUBTRACTIVE_DITHER_2"
110 def to_astropy_quantize_method(self) -> int:
111 """Convert to the integer code used by Astropy."""
112 match self:
113 case self.NO_DITHER:
114 return -1
115 case self.SUBTRACTIVE_DITHER_1:
116 return 1
117 case self.SUBTRACTIVE_DITHER_2:
118 return 2
119 raise AssertionError("Invalid enum value.")
122class FitsQuantizationOptions(pydantic.BaseModel, frozen=True):
123 """Quantization options for FITS compression."""
125 dither: FitsDitherAlgorithm
126 """How to add random noise during quantization to reduce biases."""
128 level: float
129 """Quantization level.
131 When positive, this is the fraction of the measured standard deviation that
132 corresponds to an integer step. When negative, it is ``-ZSCALE``, the
133 scaling to apply directly to the original pixels before quantization.
134 """
136 seed: int | None = None
137 """Random number seed to use for dithering.
139 Values between 1 and 10000 (inclusive) are used directly. ``0`` will
140 generate a value from the current time, and ``-1`` will generate a value
141 from the checksum of the image.
143 If `None`, the ``compression_seed`` parameter must be passed to
144 `FitsOutputArchive.open` if any quantized compression is configured.
145 """
148class FitsCompressionOptions(pydantic.BaseModel, frozen=True):
149 """Configuration options for FITS compression."""
151 algorithm: FitsCompressionAlgorithm = FitsCompressionAlgorithm.GZIP_2
152 """Compression algorithm to use."""
154 tile_shape: tuple[int, ...] | None = None
155 """Shape ``(..., y, x)`` of independently compressed tiles.
157 The default of `None` compresses each row separately.
158 """
160 quantization: FitsQuantizationOptions | None = None
161 """Quantization to apply before compression, if any."""
163 DEFAULT: ClassVar[FitsCompressionOptions | None]
164 """Default compression options (lossless ``GZIP_2``)."""
166 LOSSY: ClassVar[FitsCompressionOptions]
167 """Default lossy compression options."""
169 def make_hdu(self, data: np.ndarray, name: str) -> astropy.io.fits.CompImageHDU:
170 """Make an `astropy.io.fits.CompImageHDU` object from these options."""
171 if self.quantization is not None:
172 return astropy.io.fits.CompImageHDU(
173 data,
174 name=name,
175 compression_type=self.algorithm.value,
176 tile_shape=self.tile_shape,
177 quantize_method=self.quantization.dither.to_astropy_quantize_method(),
178 quantize_level=self.quantization.level,
179 dither_seed=self.quantization.seed,
180 )
181 else:
182 return astropy.io.fits.CompImageHDU(
183 data,
184 name=name,
185 compression_type=self.algorithm.value,
186 tile_shape=self.tile_shape,
187 quantize_level=0.0,
188 )
191FitsCompressionOptions.DEFAULT = FitsCompressionOptions()
192FitsCompressionOptions.LOSSY = FitsCompressionOptions(
193 algorithm=FitsCompressionAlgorithm.RICE_1,
194 quantization=FitsQuantizationOptions(dither=FitsDitherAlgorithm.SUBTRACTIVE_DITHER_2, level=16.0),
195)
198_COMPRESSION_KEYS = frozenset(
199 (
200 "ZIMAGE",
201 "ZCMPTYPE",
202 "ZBITPIX",
203 "ZNAXIS",
204 "ZMASKCMP",
205 "ZQUANTIZ",
206 "ZDITHER0",
207 "ZCHECKSUM",
208 "ZDATASUM",
209 )
210)
211_COMPRESSION_PREFIX_KEYS = ("ZNAXIS", "ZTILE", "ZNAME", "ZVAL")
214@dataclasses.dataclass
215class PrecompressedImage:
216 """Already-compressed FITS HDUs that are attached to high-level objects
217 via `FitsOpaqueMetadata`, allowing lossy-compressed pixel values to be
218 round-tripped exactly.
219 """
221 header: astropy.io.fits.Header
222 """Header for the HDU.
224 This contains only FITS tile-compression keywords.
225 """
227 data: astropy.io.fits.FITS_rec
228 """FITS binary table data that serves as the low-level representation of a
229 tile-compressed image HDU.
230 """
232 @classmethod
233 def from_bintable(cls, hdu: astropy.io.fits.BinTableHDU) -> Self:
234 """Construct from a binary table HDU.
236 Parameters
237 ----------
238 hdu
239 Binary table HDU, typically read from a FITS file opened with
240 ``disable_image_compression=True``.
242 Returns
243 -------
244 PrecompressedImage
245 A `PrecompressedImage` instance.
246 """
247 header = astropy.io.fits.Header(
248 [
249 card
250 for card in hdu.header.cards
251 if card.keyword in _COMPRESSION_KEYS
252 or any(card.keyword.startswith(k) for k in _COMPRESSION_PREFIX_KEYS)
253 ]
254 )
255 # This is an opportunity to fix CFITSIO's non-standard writing of the
256 # old RICE_ONE value instead of RICE_1.
257 if header["ZCMPTYPE"] == "RICE_ONE":
258 header["ZCMPTYPE"] = "RICE_1"
259 return cls(header=header, data=hdu.data)
262@final
263@dataclasses.dataclass
264class FitsOpaqueMetadata(OpaqueArchiveMetadata):
265 """Opaque metadata that may be carried around by a serializable type to
266 propagate serialization options and opaque information without that type
267 knowing how it was serialized.
268 """
270 headers: dict[ExtensionKey, astropy.io.fits.Header] = dataclasses.field(default_factory=dict)
271 """FITS headers found (but not interpreted and stripped) when reading, to
272 be propagated on write.
274 Keys are EXTNAME/EXTVER combinations, or ("", 1) for the primary header.
275 Header information in opaque metadata is considered immutable, allowing it
276 to be transferred by reference to copies and subsets of the object it is
277 attached to.
278 """
280 precompressed: dict[str, PrecompressedImage] = dataclasses.field(default_factory=dict)
281 """FITS tile-compressed HDUs that should be written out directly instead
282 of the in-memory data provided.
284 Keys are EXTNAME values, which must be unique (no EXTVER disambiguation).
285 Precompressed pixel values are never copied or transferred to subsets.
286 """
288 def add_header(
289 self,
290 header: astropy.io.fits.Header,
291 name: str | None = None,
292 ver: int | None = None,
293 key: ExtensionKey | None = None,
294 ) -> None:
295 """Add a header to the opaque metadata if it is not already present,
296 and strip EXTNAME and EXTVER if present.
298 Parameters
299 ----------
300 header
301 Header to add. May be modified in place.
302 name
303 EXTNAME (all caps). If not provided, the EXTNAME card must be
304 present in the header. Use ``""`` for the primary header.
305 ver
306 EXTVER. If not provided and the EXTVER card is not present,
307 defaults to 1.
308 key
309 Combination of EXTNAME and EXTVER; used instead of both ``name``
310 and ``ver`` if provided.
311 """
312 if key is None:
313 if name is None:
314 name = header["EXTNAME"]
315 if ver is None:
316 ver = header.get("EXTVER", 1)
317 key = ExtensionKey(name, ver)
318 strip_butler_cards(header)
319 if key not in self.headers:
320 header.remove("EXTNAME", ignore_missing=True)
321 header.remove("EXTVER", ignore_missing=True)
322 self.headers[key] = header
324 def maybe_use_precompressed(self, name: str) -> astropy.io.fits.BinTableHDU | None:
325 """Look up the given EXTNAME to see if there is a tile compressed image
326 HDU that should be used directly, instead of requantizing.
328 Parameters
329 ----------
330 name
331 EXTNAME (all caps).
333 Returns
334 -------
335 `astropy.io.fits.BinTableHDU` | `None`
336 An already-compressed HDU, in binary table form, or `None` if there
337 is no precompressed HDU for this EXTNAME.
338 """
339 if (precompressed := self.precompressed.get(name)) is None:
340 return None
341 return astropy.io.fits.BinTableHDU(precompressed.data, header=precompressed.header.copy(), name=name)
343 def extract_legacy_primary_header(self, header: astropy.io.fits.Header) -> None:
344 """Update the opaque metadata with the header of the primary HDU
345 of a legacy (`lsst.afw.image`) FITS file.
346 """
347 primary_header = header.copy(strip=True)
348 # No idea what these spare TAN-SIP headers are doing in the afw
349 # FITS files, but we'll strip them here:
350 primary_header.remove("A_ORDER", ignore_missing=True)
351 primary_header.remove("B_ORDER", ignore_missing=True)
352 strip_legacy_exposure_cards(primary_header)
353 strip_butler_cards(primary_header)
354 self.headers[ExtensionKey()] = primary_header
356 def copy(self) -> FitsOpaqueMetadata:
357 # Docstring inherited.
358 return FitsOpaqueMetadata(headers=self.headers)
360 def subset(self, bbox: Box) -> FitsOpaqueMetadata:
361 # Docstring inherited.
362 return FitsOpaqueMetadata(headers=self.headers)
365def add_offset_wcs(header: astropy.io.fits.Header, *, x: int | float, y: int | float, key: str = "A") -> None:
366 """Add a trivial FITS WCS to a header that applies the appropriate offset
367 to map FITS array coordinates to a logical pixel grid.
369 Parameters
370 ----------
371 header
372 Header to update in-place.
373 x
374 Logical coordinate of the first column.
375 y
376 Logical coordinate of the first row.
377 key
378 Single-character suffix for this WCS.
379 """
380 header.set(f"CTYPE1{key}", "LINEAR")
381 header.set(f"CTYPE2{key}", "LINEAR")
382 header.set(f"CRPIX1{key}", 1.0)
383 header.set(f"CRPIX2{key}", 1.0)
384 header.set(f"CRVAL1{key}", float(x))
385 header.set(f"CRVAL2{key}", float(y))
386 header.set(f"CUNIT1{key}", "PIXEL")
387 header.set(f"CUNIT2{key}", "PIXEL")
390_WCS_VECTOR_KEYS = ("CUNIT", "CRPIX", "CRPIX", "CRVAL", "CRDELT", "CROTA", "CRDER", "CSYER", "CDELT")
391_WCS_MATRIX_KEYS = ("CD{0}_{1}", "PC{0}_{1}")
394def strip_wcs_cards(header: astropy.io.fits.Header) -> None:
395 """Strip WCS cards from a FITS header.
397 This does *not* attempt to cover all possible FITS WCS forms; it focuses on
398 the ones we actually plan to write (simple undistorted ones + TAN-SIP).
399 """
400 wcsaxes = header.pop("WCSAXES", 2)
401 for wcsname in [""] + list(string.ascii_uppercase):
402 header.remove("RADESYS" + wcsname, ignore_missing=True)
403 if "CTYPE1" + wcsname in header:
404 ctype: str = "" # just for linters that can't figure out that the loop always executes
405 for n in range(wcsaxes):
406 suffix = f"{n + 1}{wcsname}"
407 ctype = header.pop("CTYPE" + suffix)
408 for key in _WCS_VECTOR_KEYS:
409 header.remove(key + suffix, ignore_missing=True)
410 for m in range(wcsaxes):
411 for tmpl in _WCS_MATRIX_KEYS:
412 header.remove(tmpl.format(m + 1, suffix), ignore_missing=True)
413 if ctype.endswith("-SIP"):
414 _strip_sip_poly(header, wcsname, "A")
415 _strip_sip_poly(header, wcsname, "B")
416 _strip_sip_poly(header, wcsname, "AP")
417 _strip_sip_poly(header, wcsname, "BP")
418 header.remove("LONPOLE", ignore_missing=True)
419 header.remove("LATPOLE", ignore_missing=True)
420 header.remove("MJDREF", ignore_missing=True)
421 header.remove("PV1_3", ignore_missing=True)
422 header.remove("PV1_4", ignore_missing=True)
425def _strip_sip_poly(header: astropy.io.fits.Header, wcsname: str, which: str) -> None:
426 order: int | None = header.pop(f"{which}_ORDER{wcsname}", None)
427 if order is not None:
428 for i, j in itertools.product(range(order + 1), range(order + 1)):
429 header.remove(f"{which}_{i}_{j}{wcsname}", ignore_missing=True)
432def strip_legacy_exposure_cards(header: astropy.io.fits.Header) -> None:
433 """Strip header keywords added by lsst.afw.image.Exposure."""
434 header.remove("AR_HDU", ignore_missing=True)
435 for name in (
436 "FILTER",
437 "DETECTOR",
438 "VALID_POLYGON",
439 "SKYWCS",
440 "PSF",
441 "SUMMARYSTATS",
442 "AP_CORR_MAP",
443 "PHOTOCALIB",
444 ):
445 header.remove(f"{name}_ID", ignore_missing=True)
446 header.remove(f"ARCHIVE_ID_{name}", ignore_missing=True)
449def strip_butler_cards(header: astropy.io.fits.Header) -> None:
450 """Strip header keywords added by butler provenance that would be
451 incorrect if propagated to a downstream file.
452 """
453 for key in list(header):
454 if key.startswith("LSST BUTLER"):
455 del header[key]