Coverage for python / lsst / images / fits / _common.py: 40%
182 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
13from __future__ import annotations
15__all__ = (
16 "FITS_SOURCE_REGEX",
17 "JSON_COLUMN",
18 "JSON_EXTNAME",
19 "ExtensionHDU",
20 "ExtensionKey",
21 "FitsCompressionAlgorithm",
22 "FitsCompressionOptions",
23 "FitsDitherAlgorithm",
24 "FitsOpaqueMetadata",
25 "FitsQuantizationOptions",
26 "InvalidFitsArchiveError",
27 "PointerModel",
28 "PrecompressedImage",
29 "add_offset_wcs",
30 "strip_butler_cards",
31 "strip_legacy_exposure_cards",
32 "strip_wcs_cards",
33)
35import dataclasses
36import enum
37import itertools
38import re
39import string
40from typing import ClassVar, Self, final
42import astropy.io.fits
43import numpy as np
44import pydantic
46from .._geom import Box
47from ..serialization import ArchiveReadError, OpaqueArchiveMetadata, TableColumnModel
49type ExtensionHDU = astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU
51FITS_SOURCE_REGEX = re.compile(r"fits:(?P<extname>[\w/\-]+)(,(?P<extver>\d+))?(\[\d+\])?")
53JSON_EXTNAME: str = "JSON"
54JSON_COLUMN: str = "JSON"
57@dataclasses.dataclass(frozen=True)
58class ExtensionKey:
59 """The identifiers for a single FITS extension HDU within a file."""
61 name: str = ""
62 """Value of the EXTNAME keyword, or an empty string for the primary HDU."""
64 ver: int = 1
65 """Value of the EXTVER keyword (which may be absent if it is ``1``)."""
67 @classmethod
68 def from_index_row(cls, row: np.ndarray) -> ExtensionKey:
69 """Construct from a row of the index binary table appended to the
70 FITS files written by the package.
71 """
72 return cls(str(row["EXTNAME"]).upper(), int(row["EXTVER"]))
74 @classmethod
75 def from_str(cls, source: str) -> ExtensionKey:
76 """Construct from the `str` coercion of this type, which is used
77 as the 'source' field in various Pydantic models that serve as
78 references to other HDUs.
79 """
80 if (m := FITS_SOURCE_REGEX.fullmatch(source)) is None:
81 raise ArchiveReadError(f"Bad 'source' string for FITS: {source!r}.")
82 ver = 1
83 if m.group("extver") is not None:
84 ver = int(m.group("extver"))
85 return cls(m.group("extname"), ver)
87 def check(self) -> None:
88 if not FITS_SOURCE_REGEX.match(str(self)):
89 raise ValueError(
90 f"Invalid source key: '{str(self)}'; name characters must be alphanumeric, '-', '_', or '/'."
91 )
93 def __str__(self) -> str:
94 if self.ver > 1:
95 return f"fits:{self.name},{self.ver}"
96 else:
97 return f"fits:{self.name}"
100class PointerModel(pydantic.BaseModel):
101 column: TableColumnModel = pydantic.Field(description="Table column for this cell this pointer targets.")
102 row: int = pydantic.Field(description="Zero-indexed row for cell this pointer targets.")
105class InvalidFitsArchiveError(RuntimeError):
106 """The error type raised when the content of a FITS file presumed to have
107 been written by FitsOutputArchive is not self-consistent.
108 """
111class FitsCompressionAlgorithm(enum.StrEnum):
112 """FITS compression algorithms supported by this package.
114 See the FITS standard for definitions.
115 """
117 GZIP_1 = "GZIP_1"
118 GZIP_2 = "GZIP_2"
119 RICE_1 = "RICE_1"
122class FitsDitherAlgorithm(enum.StrEnum):
123 """FITS quantization dither algorithms supported by this package.
125 See the FITS standard for definitions.
126 """
128 NO_DITHER = "NO_DITHER"
129 SUBTRACTIVE_DITHER_1 = "SUBTRACTIVE_DITHER_1"
130 SUBTRACTIVE_DITHER_2 = "SUBTRACTIVE_DITHER_2"
132 def to_astropy_quantize_method(self) -> int:
133 """Convert to the integer code used by Astropy."""
134 match self:
135 case self.NO_DITHER:
136 return -1
137 case self.SUBTRACTIVE_DITHER_1:
138 return 1
139 case self.SUBTRACTIVE_DITHER_2:
140 return 2
141 raise AssertionError("Invalid enum value.")
144class FitsQuantizationOptions(pydantic.BaseModel, frozen=True):
145 """Quantization options for FITS compression."""
147 dither: FitsDitherAlgorithm
148 """How to add random noise during quantization to reduce biases."""
150 level: float
151 """Quantization level.
153 When positive, this is the fraction of the measured standard deviation that
154 corresponds to an integer step. When negative, it is ``-ZSCALE``, the
155 scaling to apply directly to the original pixels before quantization.
156 """
158 seed: int | None = None
159 """Random number seed to use for dithering.
161 Values between 1 and 10000 (inclusive) are used directly. ``0`` will
162 generate a value from the current time, and ``-1`` will generate a value
163 from the checksum of the image.
165 If `None`, the ``compression_seed`` parameter must be passed to
166 `FitsOutputArchive.open` if any quantized compression is configured.
167 """
170class FitsCompressionOptions(pydantic.BaseModel, frozen=True):
171 """Configuration options for FITS compression."""
173 algorithm: FitsCompressionAlgorithm = FitsCompressionAlgorithm.GZIP_2
174 """Compression algorithm to use."""
176 tile_shape: tuple[int, ...] | None = None
177 """Shape ``(..., y, x)`` of independently compressed tiles.
179 The default of `None` compresses each row separately.
180 """
182 quantization: FitsQuantizationOptions | None = None
183 """Quantization to apply before compression, if any."""
185 DEFAULT: ClassVar[FitsCompressionOptions | None]
186 """Default compression options (lossless ``GZIP_2``)."""
188 LOSSY: ClassVar[FitsCompressionOptions]
189 """Default lossy compression options."""
191 def make_hdu(self, data: np.ndarray, name: str) -> astropy.io.fits.CompImageHDU:
192 """Make an `astropy.io.fits.CompImageHDU` object from these options."""
193 if self.quantization is not None:
194 return astropy.io.fits.CompImageHDU(
195 data,
196 name=name,
197 compression_type=self.algorithm.value,
198 tile_shape=self.tile_shape,
199 quantize_method=self.quantization.dither.to_astropy_quantize_method(),
200 quantize_level=self.quantization.level,
201 dither_seed=self.quantization.seed,
202 )
203 else:
204 return astropy.io.fits.CompImageHDU(
205 data,
206 name=name,
207 compression_type=self.algorithm.value,
208 tile_shape=self.tile_shape,
209 quantize_level=0.0,
210 )
213FitsCompressionOptions.DEFAULT = FitsCompressionOptions()
214FitsCompressionOptions.LOSSY = FitsCompressionOptions(
215 algorithm=FitsCompressionAlgorithm.RICE_1,
216 quantization=FitsQuantizationOptions(dither=FitsDitherAlgorithm.SUBTRACTIVE_DITHER_2, level=16.0),
217)
220_COMPRESSION_KEYS = frozenset(
221 (
222 "ZIMAGE",
223 "ZCMPTYPE",
224 "ZBITPIX",
225 "ZNAXIS",
226 "ZMASKCMP",
227 "ZQUANTIZ",
228 "ZDITHER0",
229 "ZCHECKSUM",
230 "ZDATASUM",
231 )
232)
233_COMPRESSION_PREFIX_KEYS = ("ZNAXIS", "ZTILE", "ZNAME", "ZVAL")
236@dataclasses.dataclass
237class PrecompressedImage:
238 """Already-compressed FITS HDUs that are attached to high-level objects
239 via `FitsOpaqueMetadata`, allowing lossy-compressed pixel values to be
240 round-tripped exactly.
241 """
243 header: astropy.io.fits.Header
244 """Header for the HDU.
246 This contains only FITS tile-compression keywords.
247 """
249 data: astropy.io.fits.FITS_rec
250 """FITS binary table data that serves as the low-level representation of a
251 tile-compressed image HDU.
252 """
254 @classmethod
255 def from_bintable(cls, hdu: astropy.io.fits.BinTableHDU) -> Self:
256 """Construct from a binary table HDU.
258 Parameters
259 ----------
260 hdu
261 Binary table HDU, typically read from a FITS file opened with
262 ``disable_image_compression=True``.
264 Returns
265 -------
266 PrecompressedImage
267 A `PrecompressedImage` instance.
268 """
269 header = astropy.io.fits.Header(
270 [
271 card
272 for card in hdu.header.cards
273 if card.keyword in _COMPRESSION_KEYS
274 or any(card.keyword.startswith(k) for k in _COMPRESSION_PREFIX_KEYS)
275 ]
276 )
277 # This is an opportunity to fix CFITSIO's non-standard writing of the
278 # old RICE_ONE value instead of RICE_1.
279 if header["ZCMPTYPE"] == "RICE_ONE":
280 header["ZCMPTYPE"] = "RICE_1"
281 return cls(header=header, data=hdu.data)
284@final
285@dataclasses.dataclass
286class FitsOpaqueMetadata(OpaqueArchiveMetadata):
287 """Opaque metadata that may be carried around by a serializable type to
288 propagate serialization options and opaque information without that type
289 knowing how it was serialized.
290 """
292 headers: dict[ExtensionKey, astropy.io.fits.Header] = dataclasses.field(default_factory=dict)
293 """FITS headers found (but not interpreted and stripped) when reading, to
294 be propagated on write.
296 Keys are EXTNAME/EXTVER combinations, or ("", 1) for the primary header.
297 Header information in opaque metadata is considered immutable, allowing it
298 to be transferred by reference to copies and subsets of the object it is
299 attached to.
300 """
302 precompressed: dict[str, PrecompressedImage] = dataclasses.field(default_factory=dict)
303 """FITS tile-compressed HDUs that should be written out directly instead
304 of the in-memory data provided.
306 Keys are EXTNAME values, which must be unique (no EXTVER disambiguation).
307 Precompressed pixel values are never copied or transferred to subsets.
308 """
310 def add_header(
311 self,
312 header: astropy.io.fits.Header,
313 name: str | None = None,
314 ver: int | None = None,
315 key: ExtensionKey | None = None,
316 ) -> None:
317 """Add a header to the opaque metadata if it is not already present,
318 and strip EXTNAME and EXTVER if present.
320 Parameters
321 ----------
322 header
323 Header to add. May be modified in place.
324 name
325 EXTNAME (all caps). If not provided, the EXTNAME card must be
326 present in the header. Use ``""`` for the primary header.
327 ver
328 EXTVER. If not provided and the EXTVER card is not present,
329 defaults to 1.
330 key
331 Combination of EXTNAME and EXTVER; used instead of both ``name``
332 and ``ver`` if provided.
333 """
334 if key is None:
335 if name is None:
336 name = header["EXTNAME"]
337 if ver is None:
338 ver = header.get("EXTVER", 1)
339 key = ExtensionKey(name, ver)
340 strip_butler_cards(header)
341 if key not in self.headers:
342 header.remove("EXTNAME", ignore_missing=True)
343 header.remove("EXTVER", ignore_missing=True)
344 self.headers[key] = header
346 def maybe_use_precompressed(self, name: str) -> astropy.io.fits.BinTableHDU | None:
347 """Look up the given EXTNAME to see if there is a tile compressed image
348 HDU that should be used directly, instead of requantizing.
350 Parameters
351 ----------
352 name
353 EXTNAME (all caps).
355 Returns
356 -------
357 `astropy.io.fits.BinTableHDU` | `None`
358 An already-compressed HDU, in binary table form, or `None` if there
359 is no precompressed HDU for this EXTNAME.
360 """
361 if (precompressed := self.precompressed.get(name)) is None:
362 return None
363 return astropy.io.fits.BinTableHDU(precompressed.data, header=precompressed.header.copy(), name=name)
365 def extract_legacy_primary_header(self, header: astropy.io.fits.Header) -> None:
366 """Update the opaque metadata with the header of the primary HDU
367 of a legacy (`lsst.afw.image`) FITS file.
368 """
369 primary_header = header.copy(strip=True)
370 # No idea what these spare TAN-SIP headers are doing in the afw
371 # FITS files, but we'll strip them here:
372 primary_header.remove("A_ORDER", ignore_missing=True)
373 primary_header.remove("B_ORDER", ignore_missing=True)
374 strip_legacy_exposure_cards(primary_header)
375 strip_butler_cards(primary_header)
376 self.headers[ExtensionKey()] = primary_header
378 def copy(self) -> FitsOpaqueMetadata:
379 # Docstring inherited.
380 return FitsOpaqueMetadata(headers=self.headers)
382 def subset(self, bbox: Box) -> FitsOpaqueMetadata:
383 # Docstring inherited.
384 return FitsOpaqueMetadata(headers=self.headers)
387def add_offset_wcs(header: astropy.io.fits.Header, *, x: int | float, y: int | float, key: str = "A") -> None:
388 """Add a trivial FITS WCS to a header that applies the appropriate offset
389 to map FITS array coordinates to a logical pixel grid.
391 Parameters
392 ----------
393 header
394 Header to update in-place.
395 x
396 Logical coordinate of the first column.
397 y
398 Logical coordinate of the first row.
399 key
400 Single-character suffix for this WCS.
401 """
402 header.set(f"CTYPE1{key}", "LINEAR")
403 header.set(f"CTYPE2{key}", "LINEAR")
404 header.set(f"CRPIX1{key}", 1.0)
405 header.set(f"CRPIX2{key}", 1.0)
406 header.set(f"CRVAL1{key}", float(x))
407 header.set(f"CRVAL2{key}", float(y))
408 header.set(f"CUNIT1{key}", "PIXEL")
409 header.set(f"CUNIT2{key}", "PIXEL")
412_WCS_VECTOR_KEYS = ("CUNIT", "CRPIX", "CRPIX", "CRVAL", "CRDELT", "CROTA", "CRDER", "CSYER", "CDELT")
413_WCS_MATRIX_KEYS = ("CD{0}_{1}", "PC{0}_{1}")
416def strip_wcs_cards(header: astropy.io.fits.Header) -> None:
417 """Strip WCS cards from a FITS header.
419 This does *not* attempt to cover all possible FITS WCS forms; it focuses on
420 the ones we actually plan to write (simple undistorted ones + TAN-SIP).
421 """
422 wcsaxes = header.pop("WCSAXES", 2)
423 for wcsname in [""] + list(string.ascii_uppercase):
424 header.remove("RADESYS" + wcsname, ignore_missing=True)
425 if "CTYPE1" + wcsname in header:
426 ctype: str = "" # just for linters that can't figure out that the loop always executes
427 for n in range(wcsaxes):
428 suffix = f"{n + 1}{wcsname}"
429 ctype = header.pop("CTYPE" + suffix)
430 for key in _WCS_VECTOR_KEYS:
431 header.remove(key + suffix, ignore_missing=True)
432 for m in range(wcsaxes):
433 for tmpl in _WCS_MATRIX_KEYS:
434 header.remove(tmpl.format(m + 1, suffix), ignore_missing=True)
435 if ctype.endswith("-SIP"):
436 _strip_sip_poly(header, wcsname, "A")
437 _strip_sip_poly(header, wcsname, "B")
438 _strip_sip_poly(header, wcsname, "AP")
439 _strip_sip_poly(header, wcsname, "BP")
440 header.remove("LONPOLE", ignore_missing=True)
441 header.remove("LATPOLE", ignore_missing=True)
442 header.remove("MJDREF", ignore_missing=True)
443 header.remove("PV1_3", ignore_missing=True)
444 header.remove("PV1_4", ignore_missing=True)
447def _strip_sip_poly(header: astropy.io.fits.Header, wcsname: str, which: str) -> None:
448 order: int | None = header.pop(f"{which}_ORDER{wcsname}", None)
449 if order is not None:
450 for i, j in itertools.product(range(order + 1), range(order + 1)):
451 header.remove(f"{which}_{i}_{j}{wcsname}", ignore_missing=True)
454def strip_legacy_exposure_cards(header: astropy.io.fits.Header) -> None:
455 """Strip header keywords added by lsst.afw.image.Exposure."""
456 header.remove("AR_HDU", ignore_missing=True)
457 for name in (
458 "FILTER",
459 "DETECTOR",
460 "VALID_POLYGON",
461 "SKYWCS",
462 "PSF",
463 "SUMMARYSTATS",
464 "AP_CORR_MAP",
465 "PHOTOCALIB",
466 ):
467 header.remove(f"{name}_ID", ignore_missing=True)
468 header.remove(f"ARCHIVE_ID_{name}", ignore_missing=True)
471def strip_butler_cards(header: astropy.io.fits.Header) -> None:
472 """Strip header keywords added by butler provenance that would be
473 incorrect if propagated to a downstream file.
474 """
475 for key in list(header):
476 if key.startswith("LSST BUTLER"):
477 del header[key]