Coverage for python / lsst / images / serialization / _output_archive.py: 80%
49 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:07 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "NestedOutputArchive",
16 "OutputArchive",
17)
19from abc import ABC, abstractmethod
20from collections.abc import Callable, Hashable, Iterator, Mapping
21from typing import TYPE_CHECKING, TypeVar
23import astropy.io.fits
24import astropy.table
25import astropy.units
26import numpy as np
27import pydantic
29from ._asdf_utils import ArrayReferenceModel, InlineArrayModel
30from ._common import ArchiveTree, no_header_updates
31from ._tables import TableModel
33if TYPE_CHECKING:
34 from .._transforms import FrameSet
# This pre-python-3.12 declaration is needed by Sphinx (probably the
# autodoc-typehints plugin).
38P = TypeVar("P", bound=pydantic.BaseModel)
class OutputArchive[P](ABC):
    """Abstract interface for writing to a file format.

    Notes
    -----
    An output archive instance is assumed to be paired with a Pydantic model
    that represents a JSON tree, with the archive used to serialize data that
    is not natively JSON into data that is (which may just be a reference to
    binary data stored elsewhere in the file). The archive doesn't actually
    hold that model instance because we don't want to assume it can be built
    via default-initialization and assignment, and because we'd prefer to avoid
    making the output archive generic over the model type. It is expected that
    most concrete archive implementations will accept the paired model in some
    sort of finalization method in order to write it into the file, but this is
    not part of the base class interface.
    """

    @abstractmethod
    def serialize_direct[T: pydantic.BaseModel](
        self, name: str, serializer: Callable[[OutputArchive], T]
    ) -> T:
        """Use a serializer function to save a nested object.

        Parameters
        ----------
        name
            Attribute of the paired Pydantic model that will be assigned the
            result of this call. If it will not be assigned to a direct
            attribute, it may be a JSON Pointer path (relative to the paired
            Pydantic model) to the location where it will be added.
        serializer
            Callable that takes an `~lsst.images.serialization.OutputArchive`
            and returns a Pydantic model. This will be passed a new
            `~lsst.images.serialization.OutputArchive` that automatically
            prepends ``{name}/`` (and any root path added by this archive) to
            names passed to it, so the ``serializer`` does not need to know
            where it appears in the overall tree.

        Returns
        -------
        T
            Result of the call to the serializer.
        """
        raise NotImplementedError()

    @abstractmethod
    def serialize_pointer[T: ArchiveTree](
        self, name: str, serializer: Callable[[OutputArchive], T], key: Hashable
    ) -> T | P:
        """Use a serializer function to save a nested object that may be
        referenced in multiple locations in the same archive.

        Parameters
        ----------
        name
            Attribute of the paired Pydantic model that will be assigned the
            result of this call. If it will not be assigned to a direct
            attribute, it may be a JSON Pointer path (relative to the paired
            Pydantic model) to the location where it will be added.
        serializer
            Callable that takes an `~lsst.images.serialization.OutputArchive`
            and returns a Pydantic model. This will be passed a new
            `~lsst.images.serialization.OutputArchive` that automatically
            prepends ``{name}/`` (and any root path added by this archive) to
            names passed to it, so the ``serializer`` does not need to know
            where it appears in the overall tree.
        key
            A unique identifier for the in-memory object the serializer saves,
            e.g. a call to the built-in `id` function.

        Returns
        -------
        T | P
            Either the result of the call to the serializer, or a Pydantic
            model that can be considered a reference to it and added to a
            larger model in its place.
        """
        # Since Pydantic doesn't provide us a good way to "dereference" a JSON
        # Pointer (i.e. traversing the tree to extract the original model), it
        # is probably easier to implement an `InputArchive` for the case where
        # the `~lsst.images.serialization.OutputArchive` opts to stuff all
        # pointer serializations into a standard location outside the
        # user-controlled Pydantic model tree, and always return a JSON
        # pointer to that standard location from this function.
        raise NotImplementedError()

    @abstractmethod
    def serialize_frame_set[T: ArchiveTree](
        self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
    ) -> T | P:
        """Serialize a frame set and make it available to objects saved later.

        Parameters
        ----------
        name
            Attribute of the paired Pydantic model that will be assigned the
            result of this call. If it will not be assigned to a direct
            attribute, it may be a JSON Pointer path (relative to the paired
            Pydantic model) to the location where it will be added.
        frame_set
            The frame set being saved. This will be returned in later calls
            to `iter_frame_sets`, along with the returned reference object.
        serializer
            Callable that takes an `~lsst.images.serialization.OutputArchive`
            and returns a Pydantic model. This will be passed a new
            `~lsst.images.serialization.OutputArchive` that automatically
            prepends ``{name}/`` (and any root path added by this archive) to
            names passed to it, so the ``serializer`` does not need to know
            where it appears in the overall tree.
        key
            A unique identifier for the in-memory object the serializer saves,
            e.g. a call to the built-in `id` function.

        Returns
        -------
        T | P
            Either the result of the call to the serializer, or a Pydantic
            model that can be considered a reference to it and added to a
            larger model in its place.
        """
        raise NotImplementedError()

    @abstractmethod
    def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]:
        """Iterate over the frame sets already serialized to this archive.

        Yields
        ------
        frame_set
            A frame set that has already been written to this archive.
        reference
            An implementation-specific reference model that points to the
            frame set.
        """
        raise NotImplementedError()

    @abstractmethod
    def add_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> ArrayReferenceModel | InlineArrayModel:
        """Add an array to the archive.

        Parameters
        ----------
        array
            Array to save.
        name
            Name of the array. This should generally be the name of the
            Pydantic model attribute to which the result will be assigned. It
            may be left `None` if there is only one [structured] array or
            table in a nested object that is being saved.
        update_header
            A callback that will be given the FITS header for the HDU
            containing this array in order to add keys to it. This callback
            may be provided but will not be called if the output format is not
            FITS.

        Returns
        -------
        `~lsst.images.serialization.ArrayReferenceModel` |\
            `~lsst.images.serialization.InlineArrayModel`
            A Pydantic model that references or holds the stored array.
        """
        raise NotImplementedError()

    @abstractmethod
    def add_table(
        self,
        table: astropy.table.Table,
        *,
        name: str | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableModel:
        """Add a table to the archive.

        Parameters
        ----------
        table
            Table to save.
        name
            Name of the table. This should generally be the name of the
            Pydantic model attribute to which the result will be assigned. It
            may be left `None` if there is only one [structured] array or
            table in a nested object that is being saved.
        update_header
            A callback that will be given the FITS header for the HDU
            containing this table in order to add keys to it. This callback
            may be provided but will not be called if the output format is not
            FITS.

        Returns
        -------
        TableModel
            A Pydantic model that represents the table.
        """
        raise NotImplementedError()

    @abstractmethod
    def add_structured_array(
        self,
        array: np.ndarray,
        *,
        name: str | None = None,
        units: Mapping[str, astropy.units.Unit] | None = None,
        descriptions: Mapping[str, str] | None = None,
        update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
    ) -> TableModel:
        """Add a structured array to the archive as a table.

        Parameters
        ----------
        array
            A structured numpy array.
        name
            Name of the array. This should generally be the name of the
            Pydantic model attribute to which the result will be assigned. It
            may be left `None` if there is only one [structured] array or
            table in a nested object that is being saved.
        units
            A mapping of units for columns. Need not be complete.
        descriptions
            A mapping of descriptions for columns. Need not be complete.
        update_header
            A callback that will be given the FITS header for the HDU
            containing this table in order to add keys to it. This callback
            may be provided but will not be called if the output format is not
            FITS.

        Returns
        -------
        TableModel
            A Pydantic model that represents the table.
        """
        raise NotImplementedError()
286class NestedOutputArchive[P: pydantic.BaseModel](OutputArchive[P]):
287 """A proxy output archive that joins a root path into all names before
288 delegating back to its parent archive.
290 This is intended to be used in the implementation of most
291 `~lsst.serialization.OutputArchive.serialize_direct` and
292 `~lsst.serialization.OutputArchive.serialize_pointer` implementations.
294 Parameters
295 ----------
296 root
297 Root of all JSON Pointer paths. Should include a leading slash (as we
298 always use absolute JSON Pointers) but no trailing slash.
299 parent
300 Parent output archive to delegate to.
301 """
303 def __init__(self, root: str, parent: OutputArchive):
304 self._root = root
305 self._parent = parent
307 def serialize_direct[T: pydantic.BaseModel](
308 self, name: str, serializer: Callable[[OutputArchive[P]], T]
309 ) -> T:
310 return self._parent.serialize_direct(self._join_path(name), serializer)
312 def serialize_pointer[T: ArchiveTree](
313 self, name: str, serializer: Callable[[OutputArchive[P]], T], key: Hashable
314 ) -> T | P:
315 return self._parent.serialize_pointer(self._join_path(name), serializer, key)
317 def serialize_frame_set[T: ArchiveTree](
318 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
319 ) -> T | P:
320 return self._parent.serialize_frame_set(self._join_path(name), frame_set, serializer, key)
322 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]:
323 return self._parent.iter_frame_sets()
325 def add_array(
326 self,
327 array: np.ndarray,
328 *,
329 name: str | None = None,
330 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
331 ) -> ArrayReferenceModel | InlineArrayModel:
332 return self._parent.add_array(array, name=self._join_path(name), update_header=update_header)
334 def add_table(
335 self,
336 table: astropy.table.Table,
337 *,
338 name: str | None = None,
339 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
340 ) -> TableModel:
341 return self._parent.add_table(table, name=self._join_path(name), update_header=update_header)
343 def add_structured_array(
344 self,
345 array: np.ndarray,
346 *,
347 name: str | None = None,
348 units: Mapping[str, astropy.units.Unit] | None = None,
349 descriptions: Mapping[str, str] | None = None,
350 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
351 ) -> TableModel:
352 return self._parent.add_structured_array(
353 array,
354 name=self._join_path(name),
355 units=units,
356 descriptions=descriptions,
357 update_header=update_header,
358 )
360 def _join_path(self, name: str | None) -> str:
361 return f"{self._root}/{name}" if name is not None else self._root