Coverage for python / lsst / images / fits / _output_archive.py: 21%
210 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("FitsOutputArchive", "write")
16import dataclasses
17from collections import Counter
18from collections.abc import Callable, Hashable, Iterator, Mapping
19from contextlib import contextmanager
20from typing import Any, Self
22import astropy.io.fits
23import astropy.table
24import numpy as np
25import pydantic
27from .._transforms import FrameSet
28from ..serialization import (
29 ArchiveTree,
30 ArrayReferenceModel,
31 ButlerInfo,
32 MetadataValue,
33 NestedOutputArchive,
34 NumberType,
35 OutputArchive,
36 TableColumnModel,
37 TableModel,
38 no_header_updates,
39)
40from ._common import (
41 JSON_COLUMN,
42 JSON_EXTNAME,
43 ExtensionHDU,
44 ExtensionKey,
45 FitsCompressionOptions,
46 FitsOpaqueMetadata,
47 PointerModel,
48)
def write(
    obj: Any,
    path: str,
    compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
    update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    compression_seed: int | None = None,
    metadata: dict[str, MetadataValue] | None = None,
    butler_info: ButlerInfo | None = None,
) -> Any:
    """Write an object with a ``serialize`` method to a FITS file.

    Parameters
    ----------
    obj
        Object to save.  Must have a ``serialize`` method that accepts an
        output archive.  Its optional ``_opaque_metadata`` attribute (if
        present) is forwarded to the archive, and its optional
        ``_archive_default_name`` attribute is used as the name under which
        it is serialized.
    path
        Name of the file to write to. Must not already exist.
    compression_options
        Options for how to compress the FITS file, keyed by the name of
        the attribute (with JSON pointer ``/`` separators for nested
        attributes).
    update_header
        A callback that will be given the primary HDU FITS header and an
        opportunity to modify it.
    compression_seed
        A FITS tile compression seed to use whenever the configured
        compression seed is `None` or (for backwards compatibility) ``0``.
        This value is then incremented every time it is used.
    metadata
        Additional metadata to save with the object. This will override any
        flexible metadata carried by the object itself with the same keys.
    butler_info
        Butler information to store in the file.

    Returns
    -------
    `.serialization.ArchiveTree`
        The serialized representation of the object.
    """
    opaque_metadata = getattr(obj, "_opaque_metadata", None)
    name = getattr(obj, "_archive_default_name", None)
    with FitsOutputArchive.open(
        path,
        compression_options=compression_options,
        opaque_metadata=opaque_metadata,
        update_header=update_header,
        compression_seed=compression_seed,
    ) as archive:
        # Serialize under the object's default name when it declares one;
        # otherwise let the object serialize itself at the archive root.
        if name is not None:
            tree = archive.serialize_direct(name, obj.serialize)
        else:
            tree = obj.serialize(archive)
        if metadata is not None:
            # Explicitly-passed metadata wins over keys carried by the object.
            tree.metadata.update(metadata)
        if butler_info is not None:
            tree.butler_info = butler_info
        # The archive's open() context requires exactly one add_tree call
        # before it exits.
        archive.add_tree(tree)
    return tree
class FitsOutputArchive(OutputArchive[PointerModel]):
    """An implementation of the `.serialization.OutputArchive` interface that
    writes to FITS files.

    Instances of this class should only be constructed via the `open`
    context manager.
    """

    def __init__(
        self,
        hdu_list: astropy.io.fits.HDUList,
        compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
        opaque_metadata: Any = None,
        compression_seed: int | None = None,
    ):
        # JSON blobs for objects we've saved as pointers:
        self._pointer_targets: list[bytes] = []
        # Mapping from user provided key (e.g. id(some object)) to a table
        # pointer to where we actually saved it:
        self._pointers_by_key: dict[Hashable, PointerModel] = {}
        self._hdu_list = hdu_list
        self._primary_hdu = astropy.io.fits.PrimaryHDU()
        # TODO: add subformat description and version to primary HDU.
        # The next four keywords are placeholders; `open` rewrites them with
        # the real byte offsets after the whole file has been flushed.
        self._primary_hdu.header.set("INDXADDR", 0, "Offset in bytes to the HDU index.")
        self._primary_hdu.header.set("INDXSIZE", 0, "Size of the HDU index.")
        self._primary_hdu.header.set("JSONADDR", 0, "Offset in bytes to the JSON tree HDU.")
        self._primary_hdu.header.set("JSONSIZE", 0, "Size of the JSON tree HDU.")
        # Count of HDUs added per EXTNAME, used by _add_hdu to assign EXTVER.
        self._hdus_by_name = Counter[str]()
        self._compression_options = dict(compression_options) if compression_options is not None else {}
        self._compression_seed = compression_seed
        # Only opaque metadata that actually came from a FITS archive is
        # honored; anything else is replaced by an empty FitsOpaqueMetadata.
        self._opaque_metadata = (
            opaque_metadata if isinstance(opaque_metadata, FitsOpaqueMetadata) else FitsOpaqueMetadata()
        )
        # A default-constructed ExtensionKey looks up opaque header cards for
        # the primary HDU carried over from an input archive, if any.
        if (opaque_primary_header := self._opaque_metadata.headers.get(ExtensionKey())) is not None:
            self._primary_hdu.header.extend(opaque_primary_header)
        self._hdu_list.append(self._primary_hdu)
        # Guard flag: `open` raises if the context exits before add_tree ran.
        self._json_hdu_added: bool = False
        # (FrameSet, pointer) pairs recorded by serialize_frame_set, exposed
        # via iter_frame_sets.
        self._frame_sets: list[tuple[FrameSet, PointerModel]] = []

    @classmethod
    @contextmanager
    def open(
        cls,
        filename: str,
        compression_options: Mapping[str, FitsCompressionOptions | None] | None = None,
        opaque_metadata: Any = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
        compression_seed: int | None = None,
    ) -> Iterator[Self]:
        """Create an output archive that writes to the given file.

        Parameters
        ----------
        filename
            Name of the file to write to. Must not already exist.
        compression_options
            Options for how to compress the FITS file, keyed by the name of
            the attribute (with JSON pointer ``/`` separators for nested
            attributes).
        opaque_metadata
            Metadata read from an input archive along with the object being
            written now. Ignored if the metadata is not from a FITS archive.
        update_header
            A callback that will be given the primary HDU FITS header and an
            opportunity to modify it.
        compression_seed
            A FITS tile compression seed to use whenever the configured
            compression seed is `None` or (for backwards compatibility) ``0``.
            This value is then incremented every time it is used.

        Returns
        -------
        `contextlib.AbstractContextManager` [`FitsOutputArchive`]
            A context manager that returns a `FitsOutputArchive` when entered.
        """
        with astropy.io.fits.open(filename, mode="append") as hdu_list:
            # Append mode creates the file; a non-empty HDU list means the
            # file already existed with content.
            if hdu_list:
                raise OSError(f"File {filename!r} already exists.")
            archive = cls(hdu_list, compression_options, opaque_metadata, compression_seed=compression_seed)
            update_header(hdu_list[0].header)
            yield archive
            if not archive._json_hdu_added:
                raise RuntimeError("Write context exited without 'add_tree' being called.")
            hdu_list.flush()
        # This multi-open dance is necessary to get Astropy to tell us the
        # byte addresses of the HDUs. Hopefully we can get an upstream change
        # make this unnecessary at some point.
        with astropy.io.fits.open(filename, mode="readonly", disable_image_compression=True) as hdu_list:
            index_hdu = cls._make_index_table(hdu_list)
        with astropy.io.fits.open(filename, mode="append") as hdu_list:
            hdu_list.append(index_hdu)
        # The last row of the index describes the JSON tree HDU, which was
        # the final HDU written before the index itself was appended.
        json_bytes = _HDUBytes.from_index_row(index_hdu.data[-1])
        index_bytes = _HDUBytes.from_write_hdu(index_hdu)
        # Update the primary HDU with the address and size of the index and
        # JSON HDUs, and rewrite just that. We do this write manually, since
        # astropy's docs on its 'update' mode are scarce and it's not obvious
        # whether we can guarantee it won't rewrite the whole file if we edit
        # the primary header.
        archive._primary_hdu.header["INDXADDR"] = index_bytes.header_address
        archive._primary_hdu.header["INDXSIZE"] = index_bytes.size
        archive._primary_hdu.header["JSONADDR"] = json_bytes.header_address
        archive._primary_hdu.header["JSONSIZE"] = json_bytes.size
        # Overwrite the primary header in place at offset 0; only the keyword
        # values changed, so the header size is unchanged.
        with open(filename, "r+b") as stream:
            stream.write(archive._primary_hdu.header.tostring().encode())

    def serialize_direct[T: pydantic.BaseModel](
        self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T]
    ) -> T:
        """Serialize an object inline by invoking ``serializer`` with a view
        of this archive nested under ``name``.
        """
        nested = NestedOutputArchive[PointerModel](name, self)
        return serializer(nested)

    def serialize_pointer[T: ArchiveTree](
        self, name: str, serializer: Callable[[OutputArchive[PointerModel]], T], key: Hashable
    ) -> PointerModel:
        """Serialize an object at most once (per ``key``) into the shared
        JSON table HDU, returning a pointer to its row.
        """
        # If we've already serialized an object with this key, reuse it.
        if (pointer := self._pointers_by_key.get(key)) is not None:
            return pointer
        # Row 0 of the JSON table holds the main tree (see add_tree), so
        # pointer targets start at row 1.
        # NOTE(review): the row is computed *before* the serializer runs; if a
        # serializer ever recursively calls serialize_pointer, new targets
        # would be appended first and this row would be stale — confirm that
        # reentrancy is not expected here.
        pointer = PointerModel(
            column=TableColumnModel(
                name=JSON_COLUMN,
                data=ArrayReferenceModel(source=f"fits:{JSON_EXTNAME}[1]", datatype=NumberType.uint8),
            ),
            row=len(self._pointer_targets) + 1,
        )
        # NOTE(review): ``name`` is not used here; the target is serialized
        # under an empty name — confirm that is intentional.
        model = self.serialize_direct("", serializer)
        self._pointer_targets.append(model.model_dump_json().encode())
        self._pointers_by_key[key] = pointer
        return pointer

    def serialize_frame_set[T: ArchiveTree](
        self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
    ) -> PointerModel:
        # Docstring inherited.
        pointer = self.serialize_pointer(name, serializer, key)
        # Remember the frame set so iter_frame_sets can report it later.
        self._frame_sets.append((frame_set, pointer))
        return pointer

    def iter_frame_sets(self) -> Iterator[tuple[FrameSet, PointerModel]]:
        """Iterate over the frame sets recorded by `serialize_frame_set`,
        paired with pointers to their serialized forms.
        """
        return iter(self._frame_sets)

    def add_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ArrayReferenceModel:
        """Save a numpy array as an image extension HDU and return a
        reference model that points at it.
        """
        if name is None:
            raise RuntimeError("Cannot save array with name=None unless it is nested.")
        extname = name.upper()
        # Reuse a precompressed HDU carried over from the input archive when
        # one exists for this EXTNAME — presumably to avoid recompressing
        # unchanged data; confirm against FitsOpaqueMetadata's contract.
        hdu = self._opaque_metadata.maybe_use_precompressed(extname)
        if hdu is None:
            if (compression_options := self._get_compression_options(name)) is not None:
                hdu = compression_options.make_hdu(array, name=extname)
            else:
                hdu = astropy.io.fits.ImageHDU(array, name=extname)
        key = self._add_hdu(hdu, update_header)
        return ArrayReferenceModel(
            source=str(key),
            shape=list(array.shape),
            datatype=NumberType.from_numpy(array.dtype),
        )

    def add_table(
        self,
        table: astropy.table.Table,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableModel:
        """Save an astropy table as a binary table HDU and return a model
        describing its columns.
        """
        if name is None:
            raise RuntimeError("Cannot save table with name=None unless it is nested.")
        extname = name.upper()
        hdu: astropy.io.fits.BinTableHDU = astropy.io.fits.table_to_hdu(table, name=extname)
        # Extract column information directly from the input array, not the
        # data in the binary table HDU, because we want to assume as little as
        # possible about where Astropy does uint -> TZERO stuff.
        columns = TableColumnModel.from_table(table)
        key = self._add_hdu(hdu, update_header)
        # Point each column at its (1-based, FITS-convention) column index in
        # the new HDU.
        for n, c in enumerate(columns, start=1):
            assert isinstance(c.data, ArrayReferenceModel)
            c.data.source = f"{key}[{n}]"
        return TableModel(columns=columns, meta=table.meta)

    def add_structured_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        units: Mapping[str, astropy.units.Unit] | None = None,
        descriptions: Mapping[str, str] | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableModel:
        """Save a structured numpy array as a binary table HDU and return a
        model describing its columns.

        NOTE(review): ``astropy.units`` is not imported at module scope; the
        annotation is lazy under ``from __future__ import annotations``, but
        tools that evaluate annotations at runtime may fail — confirm.
        """
        if name is None:
            raise RuntimeError("Cannot save structured array with name=None unless it is nested.")
        extname = name.upper()
        # Extract column information directly from the input array, not the
        # data in the binary table HDU, because we want to assume as little as
        # possible about where Astropy does uint -> TZERO stuff.
        columns = TableColumnModel.from_record_dtype(array.dtype)
        hdu = astropy.io.fits.BinTableHDU(array, name=extname)
        # NOTE(review): missing units default to None but missing descriptions
        # default to "" — confirm the asymmetry is intentional.
        if units is not None:
            for c in columns:
                c.unit = units.get(c.name)
        if descriptions is not None:
            for c in columns:
                c.description = descriptions.get(c.name, "")
        key = self._add_hdu(hdu, update_header)
        # Point each column at its (1-based, FITS-convention) column index in
        # the new HDU.
        for n, c in enumerate(columns, start=1):
            assert isinstance(c.data, ArrayReferenceModel)
            c.data.source = f"{key}[{n}]"
        return TableModel(columns=columns)

    def _add_hdu(
        self,
        hdu: astropy.io.fits.ImageHDU | astropy.io.fits.CompImageHDU | astropy.io.fits.BinTableHDU,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ExtensionKey:
        """Append an extension HDU to the file, assigning an EXTVER on name
        collisions, applying the header callback and any opaque headers, and
        returning the HDU's key.
        """
        n_hdus = self._hdus_by_name.get(hdu.name, 0)
        key = ExtensionKey(hdu.name, n_hdus + 1)
        key.check()
        # Only write EXTVER explicitly when this EXTNAME has been used before.
        if n_hdus:
            hdu.header["EXTVER"] = key.ver
        self._hdus_by_name[hdu.name] += 1
        update_header(hdu.header)
        if (opaque_headers := self._opaque_metadata.headers.get(key)) is not None:
            hdu.header.extend(opaque_headers)
        self._hdu_list.append(hdu)
        return key

    def add_tree(self, tree: ArchiveTree) -> None:
        """Write the JSON tree to the archive.

        This method must be called exactly once, just before the `open` context
        is exited.

        Parameters
        ----------
        tree
            Pydantic model that represents the tree.
        """
        # One variable-length byte column: row 0 is the main tree, rows 1..N
        # are the pointer targets accumulated by serialize_pointer.
        json_hdu = astropy.io.fits.BinTableHDU.from_columns(
            [astropy.io.fits.Column(JSON_COLUMN, "PB")],
            nrows=len(self._pointer_targets) + 1,
            name=JSON_EXTNAME,
        )
        json_hdu.data[0][JSON_COLUMN] = np.frombuffer(tree.model_dump_json().encode(), dtype=np.byte)
        for n, json_target_data in enumerate(self._pointer_targets):
            json_hdu.data[n + 1][JSON_COLUMN] = np.frombuffer(json_target_data, dtype=np.byte)
        self._hdu_list.append(json_hdu)
        self._json_hdu_added = True

    def _get_compression_options(self, name: str) -> FitsCompressionOptions | None:
        """Return the compression options for the named attribute, filling in
        this archive's compression seed when the configured seed is unset or
        (for backwards compatibility) ``0``.
        """
        result = self._compression_options.get(name, FitsCompressionOptions.DEFAULT)
        if result is None or result.quantization is None:
            return result
        if self._compression_seed is not None and not result.quantization.seed:
            # Copy-on-write: don't mutate the stored options object.
            result = result.model_copy(
                update={
                    "quantization": result.quantization.model_copy(update={"seed": self._compression_seed})
                }
            )
            self._compression_seed += 1
            # Wrap back to 1 past 10000 — presumably a valid-range constraint
            # on FITS quantization seeds; TODO confirm.
            if self._compression_seed > 10000:
                self._compression_seed = 1
        # MyPy can tell that result.quantization is not None in the 'if', but
        # forgets that by this 'else':
        elif result.quantization.seed is None:  # type: ignore[union-attr]
            raise RuntimeError("No quantization seed provided.")
        return result

    @staticmethod
    def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
        """Build the INDEX binary table HDU with one row per HDU in the file,
        recording each HDU's name, version, type, and byte offsets.
        """
        # We use a fixed-length string for the EXTNAME column; it might be
        # better to use a variable-length array, but I have not been able to
        # figure out how to get astropy to accept a string for the
        # character (TFORM='A') variant of that. And that's only better if the
        # EXTNAMEs get super long, which is not likely (but maybe something to
        # guard against).
        max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
        index_hdu = astropy.io.fits.BinTableHDU.from_columns(
            [
                astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
                astropy.io.fits.Column("EXTVER", "J"),
                astropy.io.fits.Column("XTENSION", "A8"),
                astropy.io.fits.Column("ZIMAGE", "L"),
            ]
            + _HDUBytes.get_index_hdu_columns(),
            nrows=len(hdu_list),
            name="INDEX",
        )
        hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
        for n, hdu in enumerate(hdu_list):
            index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
            index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
            index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
            index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
            # NOTE(review): 'bytes' shadows the builtin; harmless here, but
            # worth renaming.
            bytes = _HDUBytes.from_read_hdu(hdu)
            bytes.update_index_row(index_hdu.data[n])
        return index_hdu
376 @staticmethod
377 def _make_index_table(hdu_list: astropy.io.fits.HDUList) -> astropy.io.fits.BinTableHDU:
378 # We use a fixed-length string for the EXTNAME column; it might be
379 # better to use a variable-length array, but I have not been able to
380 # figure out how to get astropy to accept a string for the the
381 # character (TFORM='A') variant of that. And that's only better if the
382 # EXTNAMEs get super long, which is not likely (but maybe something to
383 # guard against).
384 max_name_size = max(len(hdu.header.get("EXTNAME", "")) for hdu in hdu_list)
385 index_hdu = astropy.io.fits.BinTableHDU.from_columns(
386 [
387 astropy.io.fits.Column("EXTNAME", f"A{max_name_size}"),
388 astropy.io.fits.Column("EXTVER", "J"),
389 astropy.io.fits.Column("XTENSION", "A8"),
390 astropy.io.fits.Column("ZIMAGE", "L"),
391 ]
392 + _HDUBytes.get_index_hdu_columns(),
393 nrows=len(hdu_list),
394 name="INDEX",
395 )
396 hdu: ExtensionHDU | astropy.io.fits.PrimaryHDU
397 for n, hdu in enumerate(hdu_list):
398 index_hdu.data[n]["EXTNAME"] = hdu.header.get("EXTNAME", "")
399 index_hdu.data[n]["EXTVER"] = hdu.header.get("EXTVER", 1)
400 index_hdu.data[n]["XTENSION"] = hdu.header.get("XTENSION", "IMAGE")
401 index_hdu.data[n]["ZIMAGE"] = hdu.header.get("ZIMAGE", False)
402 bytes = _HDUBytes.from_read_hdu(hdu)
403 bytes.update_index_row(index_hdu.data[n])
404 return index_hdu
@dataclasses.dataclass
class _HDUBytes:
    """A struct that records the byte offsets into a FITS HDU."""

    # Column names in the INDEX HDU that correspond (in order) to the three
    # dataclass fields below.
    _INDEX_COLUMNS = ("HDRADDR", "DATADDR", "DATSIZE")

    header_address: int
    """Offset from the start of the file to this HDU's header, in bytes."""

    data_address: int
    """Offset from the start of the file to this HDU's data section, in
    bytes.
    """

    data_size: int
    """Size of the data section in bytes."""

    @classmethod
    def from_write_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
        """Construct from an Astropy HDU instance that has just been written.

        Parameters
        ----------
        hdu
            An Astropy HDU object.

        Returns
        -------
        hdu_bytes
            Struct with byte offsets.

        Raises
        ------
        RuntimeError
            Raised if any of the private Astropy offset attributes is absent
            or `None`.

        Notes
        -----
        This method relies on internal Astropy attributes and does not work on
        CompImageHDU objects.
        """
        # This is implemented by accessing private Astropy attributes because
        # it turns out that's much more reliable than the public fileinfo()
        # method, which seems to always return a dict with `None` entries or
        # raise; it looks buggy, but docs are scarce enough that it's not clear
        # what the right behavior is supposed to be.
        offsets: list[int] = []
        for attr in ("_header_offset", "_data_offset", "_data_size"):
            value = getattr(hdu, attr, None)
            if value is None:
                raise RuntimeError(f"Failed to get Astropy's {attr}.")
            offsets.append(value)
        return cls(*offsets)

    @classmethod
    def from_read_hdu(cls, hdu: astropy.io.fits.PrimaryHDU | ExtensionHDU) -> Self:
        """Construct from an Astropy HDU instance that has just been read.

        Parameters
        ----------
        hdu
            An Astropy HDU object.

        Returns
        -------
        hdu_bytes
            Struct with byte offsets.
        """
        file_info = hdu.fileinfo()
        return cls(
            header_address=file_info["hdrLoc"],
            data_address=file_info["datLoc"],
            data_size=file_info["datSpan"],
        )

    @classmethod
    def from_index_row(cls, row: np.void) -> Self:
        """Construct from a row of the index HDU.

        Parameters
        ----------
        row
            A Numpy struct-like scalar.

        Returns
        -------
        hdu_bytes
            Struct with byte offsets.
        """
        return cls(*(int(row[column]) for column in cls._INDEX_COLUMNS))

    @staticmethod
    def get_index_hdu_columns() -> list[astropy.io.fits.Column]:
        """Return the definitions of the columns this class gets and sets
        from the index HDU.

        Returns
        -------
        columns
            A `list` of `astropy.io.fits.Column` objects that represent the
            header address, data address, and data size.
        """
        # All three are 64-bit integer ('K') columns.
        return [astropy.io.fits.Column(label, "K") for label in _HDUBytes._INDEX_COLUMNS]

    @property
    def header_size(self) -> int:
        """Size of the header in bytes."""
        return self.data_address - self.header_address

    @property
    def end_address(self) -> int:
        """Offset in bytes from the start of the file to the end of the HDU."""
        return self.data_address + self.data_size

    @property
    def size(self) -> int:
        """Total size of this HDU (header plus data) in bytes."""
        return self.end_address - self.header_address

    def update_index_row(self, row: np.void) -> None:
        """Set the values of a row of the index HDU from this struct.

        Parameters
        ----------
        row
            A Numpy struct-like scalar to modify in place.
        """
        for column, value in zip(
            self._INDEX_COLUMNS,
            (self.header_address, self.data_address, self.data_size),
        ):
            row[column] = value