Coverage for python/lsst/images/fits/_input_archive.py: 32%
208 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:16 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "FitsInputArchive",
16 "FitsOpaqueMetadata",
17 "ReadResult",
18 "read",
19)
21import io
22import os
23from collections.abc import Callable, Iterator
24from contextlib import contextmanager
25from functools import cached_property
26from types import EllipsisType
27from typing import IO, Any, NamedTuple, Self
29import astropy.io.fits
30import astropy.table
31import fsspec
32import numpy as np
34from lsst.resources import ResourcePath, ResourcePathExpression
36from .._transforms import FrameSet
37from ..serialization import (
38 ArchiveTree,
39 ArrayReferenceModel,
40 ButlerInfo,
41 InputArchive,
42 MetadataValue,
43 TableCellReferenceModel,
44 TableReferenceModel,
45 no_header_updates,
46)
47from ._common import ExtensionHDU, ExtensionKey, FitsOpaqueMetadata, InvalidFitsArchiveError
50class ReadResult[T: Any](NamedTuple):
51 """Struct used as the return value for `read`."""
53 deserialized: T
54 """The deserialized object itself."""
56 metadata: dict[str, MetadataValue]
57 """Additional flexible metadata stored with the object."""
59 butler_info: ButlerInfo | None
60 """Butler provenance information for the dataset this file backs."""
63def read[T: Any](
64 cls: type[T],
65 path: ResourcePathExpression,
66 *,
67 page_size: int = 2880 * 50,
68 partial: bool | None = None,
69 **kwargs: Any,
70) -> ReadResult[T]:
71 """Read an object from a FITS file.
73 Parameters
74 ----------
75 path
76 File to read; convertible to `lsst.resources.ResourcePath`.
77 page_size
78 Minimum number of bytes to read at at once. Making this a multiple
79 of the FITS block size (2880) is recommended.
80 partial
81 Whether we will be reading only some of the archive, or if memory
82 pressure forces us to read it only a little at a time. If `False`,
83 the entire raw file may be read into memory up front. Defaults to
84 `True` if any extra ``**kwargs`` are passed with values other than
85 `None`, since those usually indicate that only some of the original
86 object will be loaded.
87 **kwargs
88 Extra keyword arguments passed to ``cls.deserialize``.
90 Returns
91 -------
92 ReadResult
93 A named tuple containing the deserialized object and any additional
94 metadata or butler information saved alongside it.
96 Notes
97 -----
98 Supported types must implement ``deserialize`` and
99 ``_get_archive_tree_type`` (see `.Image` for an example).
100 """
101 if partial is None:
102 partial = any(v is not None for v in kwargs.values())
103 with FitsInputArchive.open(path, page_size=page_size, partial=partial) as archive:
104 tree = archive.get_tree(cls._get_archive_tree_type(TableCellReferenceModel))
105 obj = cls.deserialize(tree, archive, **kwargs)
106 obj._opaque_metadata = archive.get_opaque_metadata()
107 return ReadResult(obj, tree.metadata, tree.butler_info)
class FitsInputArchive(InputArchive[TableCellReferenceModel]):
    """An implementation of the `.serialization.InputArchive` interface that
    reads from FITS files.

    Instances of this class should only be constructed via the `open`
    context manager.
    """

    def __init__(self, stream: IO[bytes]):
        # Parse the primary HDU from the stream's current position.
        self._primary_hdu = astropy.io.fits.PrimaryHDU.readfrom(stream)
        # TODO: read and strip subformat declaration and version, once we start
        # writing those.
        #
        # TODO: do some basic checks that the file format conforms to our
        # expectations (e.g. primary HDU should have no data).
        #
        # Read and strip the addresses and sizes from the headers. We don't
        # actually need the index address because we always want to read the
        # JSON HDU, too, and the index HDU is always the next one (but this
        # could change in the future).
        json_address: int = self._primary_hdu.header.pop("JSONADDR")
        json_size: int = self._primary_hdu.header.pop("JSONSIZE")
        del self._primary_hdu.header["INDXADDR"]
        index_size: int = self._primary_hdu.header.pop("INDXSIZE")
        # Save the remaining primary header keys so we can propagate them on
        # rewrite.
        self._opaque_metadata = FitsOpaqueMetadata()
        self._opaque_metadata.add_header(self._primary_hdu.header.copy(strip=True), name="", ver=1)
        # Read the JSON and index HDUs from the end.
        stream.seek(json_address)
        tail_data = stream.read(json_size + index_size)
        # The index HDU immediately follows the JSON HDU in the tail bytes.
        index_hdu = astropy.io.fits.BinTableHDU.fromstring(tail_data[json_size:])
        # Initialize lazy readers for all of the regular HDUs and the JSON HDU.
        self._readers: dict[ExtensionKey, _ExtensionReader] = {
            ExtensionKey.from_index_row(row): _ExtensionReader.from_index_row(row, stream)
            for row in index_hdu.data
        }
        self._readers[ExtensionKey("JSON")] = _ExtensionReader.from_bytes(
            astropy.io.fits.BinTableHDU, tail_data[:json_size]
        )
        # Make an empty dictionary to cache deserialized objects.
        self._deserialized_pointer_cache: dict[TableCellReferenceModel, Any] = {}

    @classmethod
    @contextmanager
    def open(
        cls,
        path: ResourcePathExpression,
        *,
        page_size: int = 2880 * 50,
        partial: bool = False,
    ) -> Iterator[Self]:
        """Open an input archive that reads from the given file.

        Parameters
        ----------
        path
            File to read; convertible to `lsst.resources.ResourcePath`.
        page_size
            Minimum number of bytes to read at once. Making this a multiple
            of the FITS block size (2880) is recommended.
        partial
            Whether we will be reading only some of the archive, or if memory
            pressure forces us to read it only a little at a time. If `False`
            (default), the entire raw file may be read into memory up front.

        Returns
        -------
        `contextlib.AbstractContextManager` [`FitsInputArchive`]
            A context manager that returns a `FitsInputArchive` when entered.
        """
        path = ResourcePath(path)
        stream: IO[bytes]
        if not partial:
            # Slurp the whole file into memory up front; all later reads are
            # then served from the in-memory buffer.
            stream = io.BytesIO(path.read())
            yield cls(stream)
        else:
            # Open via fsspec so bytes are fetched lazily in page_size blocks.
            fs: fsspec.AbstractFileSystem
            fs, fp = path.to_fsspec()
            with fs.open(fp, block_size=page_size) as stream:
                yield cls(stream)

    def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T:
        """Read the JSON tree from the archive.

        Parameters
        ----------
        model_type
            A Pydantic model type to use to validate the JSON.

        Returns
        -------
        T
            The validated Pydantic model.
        """
        # The JSON document lives in the "JSON" column of the first row of the
        # JSON HDU's table data.
        json_bytes = self._readers[ExtensionKey("JSON")].data[0]["JSON"].tobytes()
        return model_type.model_validate_json(json_bytes)

    def deserialize_pointer[U: ArchiveTree, V](
        self,
        pointer: TableCellReferenceModel,
        model_type: type[U],
        deserializer: Callable[[U, InputArchive[TableCellReferenceModel]], V],
    ) -> V:
        # Docstring inherited.
        # Serve repeated requests for the same pointer from the cache.
        if (cached := self._deserialized_pointer_cache.get(pointer)) is not None:
            return cached
        _, reader = self._get_source_reader(pointer)
        try:
            json_bytes = reader.data[pointer.row][pointer.column].tobytes()
        except Exception as err:
            raise InvalidFitsArchiveError(
                f"Failed to access the table cell referenced by {pointer.model_dump_json()}."
            ) from err
        result = deserializer(model_type.model_validate_json(json_bytes), self)
        self._deserialized_pointer_cache[pointer] = result
        return result

    def get_frame_set(self, ref: TableCellReferenceModel) -> FrameSet:
        """Return an already-deserialized `FrameSet` for the given reference.

        Parameters
        ----------
        ref
            Pointer to the table cell the frame set was deserialized from.

        Returns
        -------
        FrameSet
            The cached frame set.

        Raises
        ------
        AssertionError
            Raised if the frame set has not been deserialized yet.
        InvalidFitsArchiveError
            Raised if the cached object is not a `FrameSet`.
        """
        try:
            result = self._deserialized_pointer_cache[ref]
        except KeyError:
            raise AssertionError(
                f"Frame set at {ref.model_dump_json(indent=2)} must be deserialized "
                "before any dependent transform can be."
            ) from None
        if not isinstance(result, FrameSet):
            raise InvalidFitsArchiveError(f"Expected a FrameSet instance at {ref.model_dump_json(indent=2)}.")
        return result

    def get_array(
        self,
        ref: ArrayReferenceModel,
        *,
        slices: tuple[slice, ...] | EllipsisType = ...,
        strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> np.ndarray:
        # Docstring inherited.
        key, reader = self._get_source_reader(ref)
        if slices is not ...:
            # Read only the requested subregion via Astropy's lazy section
            # interface.
            array = reader.section[slices]
        else:
            array = reader.data
        if key not in self._opaque_metadata.headers:
            # Save header keys we don't interpret (after the caller-provided
            # strip) so they can be propagated on rewrite.
            opaque_header = reader.header.copy(strip=True)
            strip_header(opaque_header)
            self._opaque_metadata.add_header(opaque_header, key=key)
        return array

    def get_table(
        self,
        ref: TableReferenceModel,
        strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> astropy.table.Table:
        # Docstring inherited.
        array = self.get_structured_array(ref, strip_header)
        table = astropy.table.Table(array)
        # Apply per-column fixups (e.g. units, descriptions) from the model.
        for c in ref.columns:
            c.update_table(table)
        return table

    def get_structured_array(
        self,
        ref: TableReferenceModel,
        strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> np.ndarray:
        # Docstring inherited.
        key, reader = self._get_source_reader(ref)
        if key not in self._opaque_metadata.headers:
            # Save header keys we don't interpret (after the caller-provided
            # strip) so they can be propagated on rewrite.
            opaque_header = reader.header.copy(strip=True)
            strip_header(opaque_header)
            self._opaque_metadata.add_header(opaque_header, key=key)
        return reader.hdu.data

    def get_opaque_metadata(self) -> FitsOpaqueMetadata:
        # Docstring inherited.
        return self._opaque_metadata

    def _get_source_reader(
        self, ref: ArrayReferenceModel | TableCellReferenceModel | TableReferenceModel
    ) -> tuple[ExtensionKey, _ExtensionReader]:
        """Get a reader for the extension referenced by a model.

        Parameters
        ----------
        ref
            A Pydantic model with a 'source' field that references a FITS HDU.

        Returns
        -------
        key
            Identifier pair for the HDU (EXTNAME, EXTVER).
        reader
            A reader object for the extension.

        Raises
        ------
        InvalidFitsArchiveError
            Raised if the reference is not a recognized ``fits:`` source or
            its image/table kind does not match the actual extension.
        """
        if not isinstance(ref.source, str):
            raise InvalidFitsArchiveError(f"Reference with source={ref.source!r} is not a string.")
        if not ref.source.startswith("fits:"):
            raise InvalidFitsArchiveError(
                f"Reference with source={ref.source!r} does not start with 'fits:'."
            )
        key = ExtensionKey.from_str(ref.source)
        try:
            reader = self._readers[key]
        except KeyError:
            raise InvalidFitsArchiveError(f"Unrecognized source value {key}.") from None
        # Cross-check the reference's declared kind against the actual HDU.
        if ref.source_is_table and not reader.is_table:
            raise InvalidFitsArchiveError(
                f"Extension with source={key} was expected to be be a binary table, not an image."
            )
        elif not ref.source_is_table and reader.is_table:
            raise InvalidFitsArchiveError(
                f"Extension with source={key} was expected to be be an image, not a binary table."
            )
        return key, reader
326class _ExtensionReader:
327 """A lazy-load reader for a single extension HDU.
329 Parameters
330 ----------
331 hdu_cls
332 The type of the astropy HDU instance to construct.
333 stream
334 The file-like object to read from.
335 """
337 def __init__(self, hdu_cls: type[ExtensionHDU], stream: IO[bytes]):
338 self._hdu_cls = hdu_cls
339 self._stream = stream
341 @classmethod
342 def from_index_row(cls, index_row: np.void, stream: IO[bytes]) -> _ExtensionReader:
343 """Construct from a row of the binary table index HDU.
345 Parameters
346 ----------
347 index_row
348 A record array row from the index HDU.
349 stream
350 The file-like object being used to read the full FITS file.
352 Returns
353 -------
354 reader
355 A reader object for the extension.
356 """
357 match index_row["XTENSION"].strip():
358 case "IMAGE":
359 hdu_cls = astropy.io.fits.ImageHDU
360 case "BINTABLE":
361 if index_row["ZIMAGE"]:
362 hdu_cls = astropy.io.fits.CompImageHDU
363 else:
364 hdu_cls = astropy.io.fits.BinTableHDU
365 case other:
366 raise AssertionError(f"Unsupported HDU type {other!r}.")
367 return _ExtensionReader(
368 hdu_cls,
369 _RangeStreamProxy(
370 stream,
371 start=int(index_row["HDRADDR"]),
372 ),
373 )
375 @classmethod
376 def from_bytes(cls, hdu_cls: type[ExtensionHDU], data: bytes) -> _ExtensionReader:
377 """Construct from already-read `bytes`
379 Parameters
380 ----------
381 hdu_cls
382 The HDU type to instantiate.
383 data
384 Raw data for the HDU.
386 Returns
387 -------
388 reader
389 A reader object for extension.
390 """
391 return _ExtensionReader(hdu_cls, io.BytesIO(data))
393 @property
394 def is_table(self) -> bool:
395 """Whether this is logically a table HDU.
397 This is `False` for compressed image HDUs, even though they are
398 represented in FITS as a binary table.
399 """
400 return issubclass(self._hdu_cls, astropy.io.fits.BinTableHDU) and not issubclass(
401 self._hdu_cls, astropy.io.fits.CompImageHDU
402 )
404 @cached_property
405 def hdu(self) -> ExtensionHDU:
406 """The Astropy HDU object."""
407 self._stream.seek(0)
408 if self._hdu_cls is astropy.io.fits.CompImageHDU:
409 # CompImageHDU.readfrom doesn't work; we need to make a minimal
410 # example and report it upstream. Happily this workaround does
411 # work.
412 bintable_hdu = astropy.io.fits.BinTableHDU.readfrom(self._stream, memmap=False, cache=False)
413 return self._hdu_cls(bintable=bintable_hdu)
414 else:
415 return self._hdu_cls.readfrom(self._stream, memmap=False, cache=False, uint=True)
417 @property
418 def header(self) -> astropy.io.fits.Header:
419 """The header of the HDU."""
420 return self.hdu.header
422 @property
423 def data(self) -> np.ndarray:
424 """The data for the HDU."""
425 return self.hdu.data
427 @property
428 def section(self) -> astropy.io.fits.Section | astropy.io.fits.CompImageSection:
429 """An Astropy expression object that reads a subset of the data when
430 sliced.
431 """
432 return self.hdu.section
435class _RangeStreamProxy(IO[bytes]):
436 """A readable IO proxy object that makes the beginning of the file appear
437 at a custom position.
439 Parameters
440 ----------
441 base
442 Underlying readable, seekable buffer to proxy.
443 start
444 Offset into the base stream that will be considered the start of the
445 proxy stream.
447 Notes
448 -----
449 This class exists because Astropy doesn't seem to provide a way to read a
450 single HDU that starts at the current seek position of a file-like object.
451 It does provide ``readfrom`` methods on its HDU objects that take a
452 file-like object, but these assume (possibly unintentionally; it only
453 happens when Astropy is trying to see whether the file was opened for
454 appending) that ``seek(0)`` will set the file-like object to the start of
455 the HDU.
456 """
458 def __init__(self, base: IO[bytes], start: int):
459 self._base = base
460 self._start = start
462 @property
463 def mode(self) -> str:
464 return "rb"
466 def __enter__(self) -> Self:
467 raise AssertionError("This proxy should not be used as a context manager.")
469 def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
470 raise AssertionError("This proxy should not be used as a context manager.")
472 def __iter__(self) -> Iterator[bytes]:
473 return self._base.__iter__()
475 def __next__(self) -> bytes:
476 return self._base.__next__()
478 def close(self) -> None:
479 raise AssertionError("This proxy should not ever be closed.")
481 @property
482 def closed(self) -> bool:
483 return False
485 def fileno(self) -> int:
486 raise OSError()
488 def flush(self) -> None:
489 pass
491 def isatty(self) -> bool:
492 return False
494 def read(self, n: int = -1, /) -> bytes:
495 result = self._base.read(n)
496 return result
498 def readable(self) -> bool:
499 return True
501 def readline(self, limit: int = -1, /) -> bytes:
502 return self._base.readline(limit)
504 def readlines(self, hint: int = -1, /) -> list[bytes]:
505 return self._base.readlines(hint)
507 def seek(self, offset: int, whence: int = 0) -> int:
508 match whence:
509 case os.SEEK_SET:
510 return self._base.seek(offset + self._start, os.SEEK_SET) - self._start
511 case os.SEEK_CUR:
512 return self._base.seek(offset, os.SEEK_CUR) - self._start
513 case os.SEEK_END:
514 return self._base.seek(offset, os.SEEK_END) - self._start
515 raise TypeError(f"Invalid value for 'whence': {whence}.")
517 def seekable(self) -> bool:
518 return True
520 def tell(self) -> int:
521 return self._base.tell() - self._start
523 def truncate(self, size: int | None = None, /) -> int:
524 raise OSError()
526 def writable(self) -> bool:
527 return False
529 def write(self, arg: Any, /) -> int:
530 raise OSError()
532 def writelines(self, arg: Any, /) -> None:
533 raise OSError()