Coverage for python / lsst / images / fits / _output_archive.py: 22%
203 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:16 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("FitsOutputArchive", "write")
16import dataclasses
17from collections import Counter
18from collections.abc import Callable, Hashable, Iterator, Mapping
19from contextlib import contextmanager
20from typing import Any, Self
22import astropy.io.fits
23import astropy.table
24import numpy as np
25import pydantic
27from .._transforms import FrameSet
28from ..serialization import (
29 ArchiveTree,
30 ArrayReferenceModel,
31 ButlerInfo,
32 ColumnDefinitionModel,
33 MetadataValue,
34 NestedOutputArchive,
35 NumberType,
36 OutputArchive,
37 TableCellReferenceModel,
38 TableReferenceModel,
39 no_header_updates,
40)
41from ._common import ExtensionHDU, ExtensionKey, FitsCompressionOptions, FitsOpaqueMetadata
def write(
    obj: Any,
    filename: str,
    compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
    update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    compression_seed: int | None = None,
    metadata: dict[str, MetadataValue] | None = None,
    butler_info: ButlerInfo | None = None,
) -> ArchiveTree:
    """Write an object with a ``serialize`` method to a FITS file.

    Parameters
    ----------
    obj
        The object to write.  It must have a ``serialize`` method that
        accepts an output archive.  If present, its ``_opaque_metadata``
        attribute is forwarded to the archive, and its
        ``_archive_default_name`` attribute is used to serialize it under a
        named nested archive.
    filename
        Name of the file to write to. Must not already exist.
    compression_options
        Options for how to compress the FITS file, keyed by the name of
        the attribute (with JSON pointer ``/`` separators for nested
        attributes).
    update_header
        A callback that will be given the primary HDU FITS header and an
        opportunity to modify it.
    compression_seed
        A FITS tile compression seed to use whenever the configured
        compression seed is `None` or (for backwards compatibility) ``0``.
        This value is then incremented every time it is used.
    metadata
        Additional metadata to save with the object. This will override any
        flexible metadata carried by the object itself with the same keys.
    butler_info
        Butler information to store in the file.

    Returns
    -------
    `.serialization.ArchiveTree`
        The serialized representation of the object.
    """
    # Both attributes are optional on 'obj'; fall back to None when absent.
    opaque_metadata = getattr(obj, "_opaque_metadata", None)
    name = getattr(obj, "_archive_default_name", None)
    with FitsOutputArchive.open(
        filename,
        compression_options=compression_options,
        opaque_metadata=opaque_metadata,
        update_header=update_header,
        compression_seed=compression_seed,
    ) as archive:
        # A named object is serialized via a nested archive; otherwise the
        # object serializes itself directly against the root archive.
        tree = archive.serialize_direct(name, obj.serialize) if name is not None else obj.serialize(archive)
        if metadata is not None:
            # Explicit metadata wins over keys already present on the tree.
            tree.metadata.update(metadata)
        if butler_info is not None:
            tree.butler_info = butler_info
        # 'add_tree' must be called before the 'open' context exits.
        archive.add_tree(tree)
    return tree
99class FitsOutputArchive(OutputArchive[TableCellReferenceModel]):
100 """An implementation of the `.serialization.OutputArchive` interface that
101 writes to FITS files.
103 Instances of this class should only be constructed via the `open`
104 context manager.
105 """
107 def __init__(
108 self,
109 hdu_list: astropy.io.fits.HDUList,
110 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
111 opaque_metadata: Any = None,
112 compression_seed: int | None = None,
113 ):
114 # JSON blobs for objects we've saved as pointers:
115 self._pointer_targets: list[bytes] = []
116 # Mapping from user provided key (e.g. id(some object)) to a table
117 # pointer to where we actually saved it:
118 self._pointers_by_key: dict[Hashable, TableCellReferenceModel] = {}
119 self._hdu_list = hdu_list
120 self._primary_hdu = astropy.io.fits.PrimaryHDU()
121 # TODO: add subformat description and version to primary HDU.
122 self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.")
123 self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.")
124 self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.")
125 self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.")
126 self._hdus_by_name = Counter[str]()
127 self._compression_options = dict(compression_options) if compression_options is not None else {}
128 self._compression_seed = compression_seed
129 self._opaque_metadata = (
130 opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata()
131 )
132 if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None:
133 self._primary_hdu.header.extend(opaque_primary_header)
134 self._hdu_list.append(self._primary_hdu)
135 self._json_hdu_added: bool = False
136 self._frame_sets: list[tuple[FrameSet, TableCellReferenceModel]] = []
138 @classmethod
139 @contextmanager
140 def open(
141 cls,
142 filename: str,
143 compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
144 opaque_metadata: Any = None,
145 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
146 compression_seed: int | None = None,
147 ) -> Iterator[Self]:
148 """Create an output archive that writes to the given file.
150 Parameters
151 ----------
152 filename
153 Name of the file to write to. Must not already exist.
154 compression_options
155 Options for how to compress the FITS file, keyed by the name of
156 the attribute (with JSON pointer ``/`` separators for nested
157 attributes).
158 opaque_metadata
159 Metadata read from an input archive along with the object being
160 written now. Ignored if the metadata is not from a FITS archive.
161 update_header
162 A callback that will be given the primary HDU FITS header and an
163 opportunity to modify it.
164 compression_seed
165 A FITS tile compression seed to use whenever the configured
166 compression seed is `None` or (for backwards compatibility) ``0``.
167 This value is then incremented every time it is used.
169 Returns
170 -------
171 `contextlib.AbstractContextManager` [`FitsOutputArchive`]
172 A context manager that returns a `FitsOutputArchive` when entered.
173 """
174 with astropy.io.fits.open(filename, mode="append") as hdu_list:
175 if hdu_list:
176 raise OSError(f"File {filename!r} already exists.")
177 archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed)
178 update_header(hdu_list[0].header)
179 yield archive
180 if not archive._json_hdu_added:
181 raise RuntimeError("Write context exited without 'add_tree' being called.")
182 hdu_list.flush()
183 # This multi-open dance is necessary to get Astropy to tell us the
184 # byte addresses of the HDUs. Hopefully we can get an upstream change
185 # make this unnecessary at some point.
186 with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list:
187 index_hdu = cls._make_index_table(hdu_list)
188 with astropy.io.fits.open(filename, mode="append") as hdu_list:
189 hdu_list.append(index_hdu)
190 json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1])
191 index_bytes = _HDUBytes.from_write_hdu(index_hdu)
192 # Update the primary HDU with the address and size of the index and
193 # JSON HDUs, and rewrite just that. We do this write manually, since
194 # astropy's docs on its 'update' mode are scarce and it's not obvious
195 # whether we can guarantee it won't rewrite the whole file if we edit
196 # the primary header.
197 archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address
198 archive._primary_hdu.header["INDXSIZE"] = index_bytes.size
199 archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address
200 archive._primary_hdu.header["JSONSIZE"] = json_bytes.size
201 with open(filename, "r+b") as stream:
202 stream.write(archive._primary_hdu.header.tostring().encode())
204 def serialize_direct[T: pydantic.BaseModel](
205 self, name: str, serializer: Callable[[OutputArchive[TableCellReferenceModel]], T]
206 ) -> T:
207 nested = NestedOutputArchive[TableCellReferenceModel](name, self)
208 return serializer(nested)
210 def serialize_pointer[T: ArchiveTree](
211 self, name: str, serializer: Callable[[OutputArchive[TableCellReferenceModel]], T], key: Hashable
212 ) -> TableCellReferenceModel:
213 if (pointer := self._pointers_by_key.get(key)) is not None:
214 return pointer
215 pointer = TableCellReferenceModel(
216 source="fits:JSON", column="JSON", row=len(self._pointer_targets) + 1
217 )
218 model = self.serialize_direct("", serializer)
219 self._pointer_targets.append(model.model_dump_json().encode())
220 self._pointers_by_key[key] = pointer
221 return pointer
223 def serialize_frame_set[T: ArchiveTree](
224 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
225 ) -> TableCellReferenceModel:
226 # Docstring inherited.
227 pointer = self.serialize_pointer(name, serializer, key)
228 self._frame_sets.append((frame_set, pointer))
229 return pointer
231 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, TableCellReferenceModel]]:
232 return iter(self._frame_sets)
234 def add_array(
235 self,
236 array: np.ndarray,
237 *,
238 name: str | None = None,
239 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
240 ) -> ArrayReferenceModel:
241 if name is None:
242 raise RuntimeError("Cannot save array with name=None unless it is nested.")
243 extname = name.upper()
244 hdu = self._opaque_metadata.maybe_use_precompressed(extname)
245 if hdu is None:
246 if (compression_options := self._get_compression_options(name)) is not None:
247 hdu = compression_options.make_hdu(array, name=extname)
248 else:
249 hdu = astropy.io.fits.ImageHDU(array, name=extname)
250 key = self._add_hdu(hdu, update_header)
251 return ArrayReferenceModel(
252 source=str(key),
253 shape=list(array.shape),
254 datatype=NumberType.from_numpy(array.dtype),
255 )
257 def add_table(
258 self,
259 table: astropy.table.Table,
260 *,
261 name: str | None = None,
262 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
263 ) -> TableReferenceModel:
264 if name is None:
265 raise RuntimeError("Cannot save table with name=None unless it is nested.")
266 extname = name.upper()
267 hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname)
268 # Extract column information directly from the input array, not the
269 # data in the binary table HDU, because we want to assume as little as
270 # possible about where Astropy does uint -> TZERO stuff.
271 columns = ColumnDefinitionModel.from_table(table)
272 key = self._add_hdu(hdu, update_header)
273 return TableReferenceModel(source=str(key), columns=columns)
275 def add_structured_array(
276 self,
277 array: np.ndarray,
278 *,
279 name: str | None = None,
280 units: Mapping[str, astropy.units.Unit] | None = None,
281 descriptions: Mapping[str, str] | None = None,
282 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
283 ) -> TableReferenceModel:
284 if name is None:
285 raise RuntimeError("Cannot save structured array with name=None unless it is nested.")
286 extname = name.upper()
287 # Extract column information directly from the input array, not the
288 # data in the binary table HDU, because we want to assume as little as
289 # possible about where Astropy does uint -> TZERO stuff.
290 columns = ColumnDefinitionModel.from_record_dtype(array.dtype)
291 hdu = astropy.io.fits.BinTableHDU(array, name=extname)
292 if units is not None:
293 for c in columns:
294 c.unit = units.get(c.name)
295 if descriptions is not None:
296 for c in columns:
297 c.description = descriptions.get(c.name, "")
298 key = self._add_hdu(hdu, update_header)
299 return TableReferenceModel(source=str(key), columns=columns)
301 def _add_hdu(
302 self,
303 hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
304 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
305 ) -> ExtensionKey:
306 n_hdus = self._hdus_by_name.get(hdu.name, 0)
307 key = ExtensionKey(hdu.name, n_hdus + 1)
308 if n_hdus:
309 hdu.header["EXTVER"] = key.ver
310 self._hdus_by_name[hdu.name] += 1
311 update_header(hdu.header)
312 if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None:
313 hdu.header.extend(opaque_headers)
314 self._hdu_list.append(hdu)
315 return key
317 def add_tree(self, tree: ArchiveTree) -> None:
318 """Write the JSON tree to the archive.
320 This method must be called exactly once, just before the `open` context
321 is exited.
323 Parameters
324 ----------
325 tree
326 Pydantic model that represents the tree.
327 """
328 json_hdu = astropy.io.fits.BinTableHDU.from_columns(
329 [astropy.io.fits.Column("JSON", "PB")],
330 nrows=len(self._pointer_targets) + 1,
331 name="JSON",
332 )
333 json_hdu.data[0]["JSON"] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte)
334 for n, json_target_data in enumerate(self._pointer_targets):
335 json_hdu.data[n + 1]["JSON"] = np.frombuffer(json_target_data, dtype=np.byte)
336 self._hdu_list.append(json_hdu)
337 self._json_hdu_added = True
339 def _get_compression_options(self, name: str) -> FitsCompressionOptions | None:
340 result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT)
341 if result is None or result.quantization is None:
342 return result
343 if self._compression_seed is not None and not result.quantization.seed:
344 result = result.model_copy(
345 update={
346 "quantization": result.quantization.model_copy(update={"seed": self._compression_seed})
347 }
348 )
349 self._compression_seed += 1
350 if self._compression_seed > 10000:
351 self._compression_seed = 1
352 # MyPy can tell that result.quantization is not None in the 'if', but
353 # forgets that by this 'else':
354 elif result.quantization.seed is None: # type: ignore[union-attr]
355 raise RuntimeError("No quantization seed provided.")
356 return result
358 @staticmethod
359 def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
360 # We use a fixed-length string for the EXTNAME column; it might be
361 # better to use a variable-length array, but I have not been able to
362 # figure out how to get astropy to accept a string for the the
363 # character (TFORM='A') variant of that. And that's only better if the
364 # EXTNAMEs get super long, which is not likely (but maybe something to
365 # guard against).
366 max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
367 index_hdu = astropy.io.fits.BinTableHDU.from_columns(
368 [
369 astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
370 astropy.io.fits.Column("EXTVER", "J"),
371 astropy.io.fits.Column("XTENSION", "A8"),
372 astropy.io.fits.Column("ZIMAGE", "L"),
373 ]
374 + _HDUBytes.get_index_hdu_columns(),
375 nrows=len(hdu_list),
376 name="INDEX",
377 )
378 hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
379 for n, hdu in enumerate(hdu_list):
380 index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
381 index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
382 index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
383 index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
384 bytes = _HDUBytes.from_read_hdu(hdu)
385 bytes.update_index_row(index_hdu.data[n])
386 return index_hdu
389@dataclasses.dataclass
390class _HDUBytes:
391 """A struct that records the byte offsets into a FITS HDU."""
393 @classmethod
394 def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
395 """Construct from an Astropy HDU instance that has just been written.
397 Parameters
398 ----------
399 hdu
400 An Astropy HDU object.
402 Returns
403 -------
404 hdu_bytes
405 Struct with byte offsets.
407 Notes
408 -----
409 This method relies on internal Astropy attributes and does not work on
410 CompImageHDU objects.
411 """
412 # This is implemented by accessing private Astropy attributes because
413 # it turns out that's much more reliable than the public fileinfo()
414 # method, which seems to always return a dict with `None` entries or
415 # raise; it looks buggy, but docs are scarce enough that it's not clear
416 # what the right behavior is supposed to be.
417 if (header_address := getattr(hdu, "_header_offset", None)) is None:
418 raise RuntimeError("Failed to get Astropy's _header_offset.")
419 if (data_address := getattr(hdu, "_data_offset", None)) is None:
420 raise RuntimeError("Failed to get Astropy's _data_offset.")
421 if (data_size := getattr(hdu, "_data_size", None)) is None:
422 raise RuntimeError("Failed to get Astropy's _data_size.")
423 return cls(header_address, data_address, data_size)
425 @classmethod
426 def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
427 """Construct from an Astropy HDU instance that has just been read.
429 Parameters
430 ----------
431 hdu
432 An Astropy HDU object.
434 Returns
435 -------
436 hdu_bytes
437 Struct with byte offsets.
438 """
439 info = hdu.fileinfo()
440 header_address = info["hdrLoc"]
441 data_address = info["datLoc"]
442 data_size = info["datSpan"]
443 return cls(header_address, data_address, data_size)
445 @classmethod
446 def from_index_row(cls, row: np.void) -> Self:
447 """Construct from a row of the index HDU.
449 Parameters
450 ----------
451 row
452 A Numpy struct-like scalar.
454 Returns
455 -------
456 hdu_bytes
457 Struct with byte offsets.
458 """
459 return cls(
460 header_address=int(row["HDRADDR"]),
461 data_address=int(row["DATADDR"]),
462 data_size=int(row["DATSIZE"]),
463 )
465 @staticmethod
466 def get_index_hdu_columns() -> list[astropy.io.fits.Column]:
467 """Return the definitions of the columns this class gets and sets
468 from the index HDU.
470 Returns
471 -------
472 columns
473 A `list` of `astropy.io.fits.Column` objects that represent the
474 header address, data address, and data size.
475 """
476 return [
477 astropy.io.fits.Column("HDRADDR", "K"),
478 astropy.io.fits.Column("DATADDR", "K"),
479 astropy.io.fits.Column("DATSIZE", "K"),
480 ]
482 header_address: int
483 """Offset from the beginning of the start of the file to the header of this
484 HDU, in bytes.
485 """
487 data_address: int
488 """Offset from the beginning of the start of the file to the data section
489 of this HDU, in bytes.
490 """
492 data_size: int
493 """Size of the data section in bytes."""
495 @property
496 def header_size(self) -> int:
497 """Size of the header in bytes."""
498 return self.data_address - self.header_address
500 @property
501 def end_address(self) -> int:
502 """Offset in bytes from the start of the file to the end of the HDU."""
503 return self.data_address + self.data_size
505 @property
506 def size(self) -> int:
507 """Total size of this HDU in bytes."""
508 return self.data_size + self.data_address - self.header_address
510 def update_index_row(self, row: np.void) -> None:
511 """Set the values of a row of the index HDU from this strut.
513 Parameters
514 ----------
515 row
516 A Numpy struct-like scalar to modify in place.
517 """
518 row["HDRADDR"] = self.header_address
519 row["DATADDR"] = self.data_address
520 row["DATSIZE"] = self.data_size