Coverage for python / lsst / images / fits / _input_archive.py: 29%
207 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "FitsInputArchive",
16 "FitsOpaqueMetadata",
17 "read",
18)
20import io
21import os
22from collections.abc import Callable, Iterator
23from contextlib import contextmanager
24from functools import cached_property
25from types import EllipsisType
26from typing import IO, Any, Self
28import astropy.io.fits
29import astropy.table
30import fsspec
31import numpy as np
33from lsst.resources import ResourcePath, ResourcePathExpression
35from .._transforms import FrameSet
36from ..serialization import (
37 ArchiveReadError,
38 ArchiveTree,
39 ArrayReferenceModel,
40 InlineArrayModel,
41 InputArchive,
42 ReadResult,
43 TableModel,
44 no_header_updates,
45)
46from ._common import (
47 JSON_COLUMN,
48 JSON_EXTNAME,
49 ExtensionHDU,
50 ExtensionKey,
51 FitsOpaqueMetadata,
52 InvalidFitsArchiveError,
53 PointerModel,
54)
57def read[T: Any](
58 cls: type[T],
59 path: ResourcePathExpression,
60 *,
61 page_size: int = 2880 * 50,
62 partial: bool | None = None,
63 **kwargs: Any,
64) -> ReadResult[T]:
65 """Read an object from a FITS file.
67 Parameters
68 ----------
69 path
70 File to read; convertible to `lsst.resources.ResourcePath`.
71 page_size
72 Minimum number of bytes to read at at once. Making this a multiple
73 of the FITS block size (2880) is recommended.
74 partial
75 Whether we will be reading only some of the archive, or if memory
76 pressure forces us to read it only a little at a time. If `False`,
77 the entire raw file may be read into memory up front. Defaults to
78 `True` if any extra ``**kwargs`` are passed with values other than
79 `None`, since those usually indicate that only some of the original
80 object will be loaded.
81 **kwargs
82 Extra keyword arguments passed to ``cls.deserialize``.
84 Returns
85 -------
86 ReadResult
87 A named tuple containing the deserialized object and any additional
88 metadata or butler information saved alongside it.
90 Notes
91 -----
92 Supported types must implement ``deserialize`` and
93 ``_get_archive_tree_type`` (see `.Image` for an example).
94 """
95 if partial is None:
96 partial = any(v is not None for v in kwargs.values())
97 with FitsInputArchive.open(path, page_size=page_size, partial=partial) as archive:
98 tree = archive.get_tree(cls._get_archive_tree_type(PointerModel))
99 obj = cls.deserialize(tree, archive, **kwargs)
100 obj._opaque_metadata = archive.get_opaque_metadata()
101 return ReadResult(obj, tree.metadata, tree.butler_info)
104class FitsInputArchive(InputArchive[PointerModel]):
105 """An implementation of the `.serialization.InputArchive` interface that
106 reads from FITS files.
108 Instances of this class should only be constructed via the `open`
109 context manager.
110 """
112 def __init__(self, stream: IO[bytes]):
113 self._primary_hdu = astropy.io.fits.PrimaryHDU.readfrom(stream)
114 # TODO: read and strip subformat declaration and version, once we start
115 # writing those.
116 #
117 # TODO: do some basic checks that the file format conforms to our
118 # expectations (e.g. primary HDU should have no data).
119 #
120 # Read and strip the addresses and sizes from the headers. We don't
121 # actually need the index address because we always want to read the
122 # JSON HDU, too, and the index HDU is always the next one (but this
123 # could change in the future).
124 json_address: int = self._primary_hdu.header.pop("JSONADDR")
125 json_size: int = self._primary_hdu.header.pop("JSONSIZE")
126 del self._primary_hdu.header["INDXADDR"]
127 index_size: int = self._primary_hdu.header.pop("INDXSIZE")
128 # Save the remaining primary header keys so we can propagate them on
129 # rewrite.
130 self._opaque_metadata = FitsOpaqueMetadata()
131 self._opaque_metadata.add_header(self._primary_hdu.header.copy(strip=True), name="", ver=1)
132 # Read the JSON and index HDUs from the end.
133 stream.seek(json_address)
134 tail_data = stream.read(json_size + index_size)
135 index_hdu = astropy.io.fits.BinTableHDU.fromstring(tail_data[json_size:])
136 # Initialize lazy readers for all of the regular HDUs and the JSON HDU.
137 self._readers = {
138 ExtensionKey.from_index_row(row): _ExtensionReader.from_index_row(row, stream)
139 for row in index_hdu.data
140 }
141 self._readers[ExtensionKey(JSON_COLUMN)] = _ExtensionReader.from_bytes(
142 astropy.io.fits.BinTableHDU, tail_data[:json_size]
143 )
144 # Make any empty dictionary to cache deserialized objects. Keys are
145 # the zero-indexed row in the JSON table.
146 self._deserialized_pointer_cache: dict[int, Any] = {}
148 @classmethod
149 @contextmanager
150 def open(
151 cls,
152 path: ResourcePathExpression,
153 *,
154 page_size: int = 2880 * 50,
155 partial: bool = False,
156 ) -> Iterator[Self]:
157 """Create an output archive that writes to the given file.
159 Parameters
160 ----------
161 path
162 File to read; convertible to `lsst.resources.ResourcePath`.
163 page_size
164 Minimum number of bytes to read at at once. Making this a multiple
165 of the FITS block size (2880) is recommended.
166 partial
167 Whether we will be reading only some of the archive, or if memory
168 pressure forces us to read it only a little at a time. If `False`
169 (default), the entire raw file may be read into memory up front.
171 Returns
172 -------
173 `contextlib.AbstractContextManager` [`FitsInputArchive`]
174 A context manager that returns a `FitsInputArchive` when entered.
175 """
176 path = ResourcePath(path)
177 stream: IO[bytes]
178 if not partial:
179 stream = io.BytesIO(path.read())
180 yield cls(stream)
181 else:
182 fs: fsspec.AbstractFileSystem
183 fs, fp = path.to_fsspec()
184 with fs.open(fp, block_size=page_size) as stream:
185 yield cls(stream)
187 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T:
188 """Read the JSON tree from the archive.
190 Parameters
191 ----------
192 model_type
193 A Pydantic model type to use to validate the JSON.
195 Returns
196 -------
197 T
198 The validated Pydantic model.
199 """
200 json_bytes = self._readers[ExtensionKey(JSON_EXTNAME)].data[0][JSON_COLUMN].tobytes()
201 return model_type.model_validate_json(json_bytes)
203 def deserialize_pointer[U: ArchiveTree, V](
204 self,
205 pointer: PointerModel,
206 model_type: type[U],
207 deserializer: Callable[[U, InputArchive[PointerModel]], V],
208 ) -> V:
209 # Docstring inherited.
210 if (cached := self._deserialized_pointer_cache.get(pointer.row)) is not None:
211 return cached
212 if not isinstance(pointer.column.data, ArrayReferenceModel):
213 raise ArchiveReadError(f"Invalid pointer with inline array:\n{pointer.model_dump_json(indent=2)}")
214 _, reader = self._get_source_reader(pointer.column.data.source, is_table=True)
215 try:
216 json_bytes = reader.data[pointer.row][JSON_COLUMN].tobytes()
217 except Exception as err:
218 raise InvalidFitsArchiveError(
219 f"Failed to access the table cell referenced by {pointer.model_dump_json()}."
220 ) from err
221 result = deserializer(model_type.model_validate_json(json_bytes), self)
222 self._deserialized_pointer_cache[pointer.row] = result
223 return result
225 def get_frame_set(self, ref: PointerModel) -> FrameSet:
226 try:
227 result = self._deserialized_pointer_cache[ref.row]
228 except KeyError:
229 raise AssertionError(
230 f"Frame set at {ref.model_dump_json(indent=2)} must be deserialized "
231 "before any dependent transform can be."
232 ) from None
233 if not isinstance(result, FrameSet):
234 raise InvalidFitsArchiveError(f"Expected a FrameSet instance at {ref.model_dump_json(indent=2)}.")
235 return result
237 def get_array(
238 self,
239 model: ArrayReferenceModel | InlineArrayModel,
240 *,
241 slices: tuple[slice, ...] | EllipsisType = ...,
242 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
243 ) -> np.ndarray:
244 if not isinstance(model, ArrayReferenceModel):
245 raise ArchiveReadError("Inline array found where a reference array was expected.")
246 key, reader = self._get_source_reader(model.source, is_table=False)
247 if slices is not ...:
248 array = reader.section[slices]
249 else:
250 array = reader.data
251 if key not in self._opaque_metadata.headers:
252 opaque_header = reader.header.copy(strip=True)
253 strip_header(opaque_header)
254 self._opaque_metadata.add_header(opaque_header, key=key)
255 return array
257 def get_table(
258 self,
259 model: TableModel,
260 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
261 ) -> astropy.table.Table:
262 # Docstring inherited.
263 array = self.get_structured_array(model, strip_header)
264 table = astropy.table.Table(array)
265 for c in model.columns:
266 c.update_table(table)
267 return table
269 def get_structured_array(
270 self,
271 model: TableModel,
272 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
273 ) -> np.ndarray:
274 # Docstring inherited.
275 if not isinstance(model.columns[0].data, ArrayReferenceModel):
276 raise ArchiveReadError("Inline array found where a reference array was expected.")
277 # All columns should have the same data.source; just use the first.
278 key, reader = self._get_source_reader(model.columns[0].data.source, is_table=True)
279 if key not in self._opaque_metadata.headers:
280 opaque_header = reader.header.copy(strip=True)
281 strip_header(opaque_header)
282 self._opaque_metadata.add_header(opaque_header, key=key)
283 return reader.hdu.data
285 def get_opaque_metadata(self) -> FitsOpaqueMetadata:
286 # Docstring inherited.
287 return self._opaque_metadata
289 def _get_source_reader(self, source: str | int, is_table: bool) -> tuple[ExtensionKey, _ExtensionReader]:
290 """Get a reader for the extension referenced by a serialiation model's
291 ``source`` field.
293 Parameters
294 ----------
295 source
296 A ``source`` field of the form ``fits:${hdu}`` or
297 ``fits:${hdu}[${col}]``.
298 is_table
299 Whether the source should be for a table HDU.
301 Returns
302 -------
303 key
304 Identifier pair for the HDU (EXTNAME, EXTVER).
305 reader
306 A reader object for the extension.
307 """
308 if not isinstance(source, str):
309 raise InvalidFitsArchiveError(f"Reference with source={source!r} is not a string.")
310 if not source.startswith("fits:"):
311 raise InvalidFitsArchiveError(f"Reference with source={source!r} does not start with 'fits:'.")
312 key = ExtensionKey.from_str(source)
313 try:
314 reader = self._readers[key]
315 except KeyError:
316 raise InvalidFitsArchiveError(f"Unrecognized source value {key}.") from None
317 if is_table and not reader.is_table:
318 raise InvalidFitsArchiveError(
319 f"Extension with source={key} was expected to be be a binary table, not an image."
320 )
321 elif not is_table and reader.is_table:
322 raise InvalidFitsArchiveError(
323 f"Extension with source={key} was expected to be be an image, not a binary table."
324 )
325 return key, reader
class _ExtensionReader:
    """A lazy-load reader for a single extension HDU.

    Nothing is parsed until one of the access properties is first used, and
    the parsed HDU is cached after that.

    Parameters
    ----------
    hdu_cls
        The type of the astropy HDU instance to construct.
    stream
        The file-like object to read from.
    """

    def __init__(self, hdu_cls: type[ExtensionHDU], stream: IO[bytes]):
        self._hdu_cls = hdu_cls
        self._stream = stream

    @classmethod
    def from_index_row(cls, index_row: np.void, stream: IO[bytes]) -> _ExtensionReader:
        """Construct from a row of the binary table index HDU.

        Parameters
        ----------
        index_row
            A record array row from the index HDU.
        stream
            The file-like object being used to read the full FITS file.

        Returns
        -------
        reader
            A reader object for the extension.
        """
        xtension = index_row["XTENSION"].strip()
        if xtension == "IMAGE":
            hdu_cls = astropy.io.fits.ImageHDU
        elif xtension == "BINTABLE":
            # Tile-compressed images are stored as binary tables flagged
            # with ZIMAGE; distinguish them here.
            if index_row["ZIMAGE"]:
                hdu_cls = astropy.io.fits.CompImageHDU
            else:
                hdu_cls = astropy.io.fits.BinTableHDU
        else:
            raise AssertionError(f"Unsupported HDU type {xtension!r}.")
        # Wrap the stream so seek(0) lands on this HDU's header address.
        proxy = _RangeStreamProxy(stream, start=int(index_row["HDRADDR"]))
        return cls(hdu_cls, proxy)

    @classmethod
    def from_bytes(cls, hdu_cls: type[ExtensionHDU], data: bytes) -> _ExtensionReader:
        """Construct from already-read `bytes`.

        Parameters
        ----------
        hdu_cls
            The HDU type to instantiate.
        data
            Raw data for the HDU.

        Returns
        -------
        reader
            A reader object for the extension.
        """
        return cls(hdu_cls, io.BytesIO(data))

    @property
    def is_table(self) -> bool:
        """Whether this is logically a table HDU.

        This is `False` for compressed image HDUs, even though they are
        represented in FITS as a binary table.
        """
        if issubclass(self._hdu_cls, astropy.io.fits.CompImageHDU):
            return False
        return issubclass(self._hdu_cls, astropy.io.fits.BinTableHDU)

    @cached_property
    def hdu(self) -> ExtensionHDU:
        """The Astropy HDU object (parsed on first access, then cached)."""
        self._stream.seek(0)
        if self._hdu_cls is not astropy.io.fits.CompImageHDU:
            return self._hdu_cls.readfrom(self._stream, memmap=False, cache=False, uint=True)
        # CompImageHDU.readfrom doesn't work; we need to make a minimal
        # example and report it upstream. Happily this workaround (reading
        # the underlying binary table and wrapping it) does work.
        bintable_hdu = astropy.io.fits.BinTableHDU.readfrom(self._stream, memmap=False, cache=False)
        return self._hdu_cls(bintable=bintable_hdu)

    @property
    def header(self) -> astropy.io.fits.Header:
        """The header of the HDU."""
        return self.hdu.header

    @property
    def data(self) -> np.ndarray:
        """The data for the HDU."""
        return self.hdu.data

    @property
    def section(self) -> astropy.io.fits.Section | astropy.io.fits.CompImageSection:
        """An Astropy expression object that reads a subset of the data when
        sliced.
        """
        return self.hdu.section
437class _RangeStreamProxy(IO[bytes]):
438 """A readable IO proxy object that makes the beginning of the file appear
439 at a custom position.
441 Parameters
442 ----------
443 base
444 Underlying readable, seekable buffer to proxy.
445 start
446 Offset into the base stream that will be considered the start of the
447 proxy stream.
449 Notes
450 -----
451 This class exists because Astropy doesn't seem to provide a way to read a
452 single HDU that starts at the current seek position of a file-like object.
453 It does provide ``readfrom`` methods on its HDU objects that take a
454 file-like object, but these assume (possibly unintentionally; it only
455 happens when Astropy is trying to see whether the file was opened for
456 appending) that ``seek(0)`` will set the file-like object to the start of
457 the HDU.
458 """
460 def __init__(self, base: IO[bytes], start: int):
461 self._base = base
462 self._start = start
464 @property
465 def mode(self) -> str:
466 return "rb"
468 def __enter__(self) -> Self:
469 raise AssertionError("This proxy should not be used as a context manager.")
471 def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
472 raise AssertionError("This proxy should not be used as a context manager.")
474 def __iter__(self) -> Iterator[bytes]:
475 return self._base.__iter__()
477 def __next__(self) -> bytes:
478 return self._base.__next__()
480 def close(self) -> None:
481 raise AssertionError("This proxy should not ever be closed.")
483 @property
484 def closed(self) -> bool:
485 return False
487 def fileno(self) -> int:
488 raise OSError()
490 def flush(self) -> None:
491 pass
493 def isatty(self) -> bool:
494 return False
496 def read(self, n: int = -1, /) -> bytes:
497 result = self._base.read(n)
498 return result
500 def readable(self) -> bool:
501 return True
503 def readline(self, limit: int = -1, /) -> bytes:
504 return self._base.readline(limit)
506 def readlines(self, hint: int = -1, /) -> list[bytes]:
507 return self._base.readlines(hint)
509 def seek(self, offset: int, whence: int = 0) -> int:
510 match whence:
511 case os.SEEK_SET:
512 return self._base.seek(offset + self._start, os.SEEK_SET) - self._start
513 case os.SEEK_CUR:
514 return self._base.seek(offset, os.SEEK_CUR) - self._start
515 case os.SEEK_END:
516 return self._base.seek(offset, os.SEEK_END) - self._start
517 raise TypeError(f"Invalid value for 'whence': {whence}.")
519 def seekable(self) -> bool:
520 return True
522 def tell(self) -> int:
523 return self._base.tell() - self._start
525 def truncate(self, size: int | None = None, /) -> int:
526 raise OSError()
528 def writable(self) -> bool:
529 return False
531 def write(self, arg: Any, /) -> int:
532 raise OSError()
534 def writelines(self, arg: Any, /) -> None:
535 raise OSError()