Coverage for python / lsst / images / serialization / _output_archive.py: 80%
49 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:16 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "NestedOutputArchive",
16 "OutputArchive",
17)
19from abc import ABC, abstractmethod
20from collections.abc import Callable, Hashable, Iterator, Mapping
21from typing import TYPE_CHECKING, TypeVar
23import astropy.io.fits
24import astropy.table
25import astropy.units
26import numpy as np
27import pydantic
29from ._asdf_utils import ArrayReferenceModel
30from ._common import ArchiveTree, no_header_updates
31from ._tables import TableReferenceModel
33if TYPE_CHECKING:
34 from .._transforms import FrameSet
36# This pre-python-3.12 declaration is needed by Sphinx (probably the
37# autodoc-typehints plugin).
38P = TypeVar("P", bound=pydantic.BaseModel)
41class OutputArchive[P](ABC):
42 """Abstract interface for writing to a file format.
44 Notes
45 -----
46 An output archive instance is assumed to be paired with a Pydantic model
47 that represents a JSON tree, with the archive used to serialize data that
48 is not natively JSON into data that is (which may just be a reference to
49 binary data stored elsewhere in the file). The archive doesn't actually
50 hold that model instance because we don't want to assume it can be built
51 via default-initialization and assignment, and because we'd prefer to avoid
52 making the output archive generic over the model type. It is expected that
53 most concrete archive implementations will accept the paired model in some
54 sort of finalization method in order to write it into the file, but this is
55 not part of the base class interface.
56 """
58 @abstractmethod
59 def serialize_direct[T: pydantic.BaseModel](
60 self, name: str, serializer: Callable[[OutputArchive], T]
61 ) -> T:
62 """Use a serializer function to save a nested object.
64 Parameters
65 ----------
66 name
67 Attribute of the paired Pydantic model that will be assigned the
68 result of this call. If it will not be assigned to a direct
69 attribute, it may be a JSON Pointer path (relative to the paired
70 Pydantic model) to the location where it will be added.
71 serializer
72 Callable that takes an `~lsst.serialization.OutputArchive` and
73 returns a Pydantic model. This will be passed a new
74 `~lsst.serialization.OutputArchive` that automatically prepends
75 ``{name}/`` (and any root path added by this archive) to names
76 passed to it, so the ``serializer`` does not need to know where it
77 appears in the overall tree.
79 Returns
80 -------
81 T
82 Result of the call to the serializer.
83 """
84 raise NotImplementedError()
86 @abstractmethod
87 def serialize_pointer[T: ArchiveTree](
88 self, name: str, serializer: Callable[[OutputArchive], T], key: Hashable
89 ) -> T | P:
90 """Use a serializer function to save a nested object that may be
91 referenced in multiple locations in the same archive.
93 Parameters
94 ----------
95 name
96 Attribute of the paired Pydantic model that will be assigned the
97 result of this call. If it will not be assigned to a direct
98 attribute, it may be a JSON Pointer path (relative to the paired
99 Pydantic model) to the location where it will be added.
100 serializer
101 Callable that takes an `~lsst.serialization.OutputArchive` and
102 returns a Pydantic model. This will be passed a new
103 `~lsst.serialization.OutputArchive` that automatically prepends
104 ``{name}/`` (and any root path added by this archive) to names
105 passed to it, so the ``serializer`` does not need to know where it
106 appears in the overall tree.
107 key
108 A unique identifier for the in-memory object the serializer saves,
109 e.g. a call to the built-in `id` function.
111 Returns
112 -------
113 T | P
114 Either the result of the call to the serializer, or a Pydantic
115 model that can be considered a reference to it and added to a
116 larger model in its place.
117 """
118 # Since Pydantic doesn't provide us a good way to "dereference" a JSON
119 # Pointer (i.e. traversing the tree to extract the original model), it
120 # is probably easier to implement an `InputArchive` for the case where
121 # the `~lsst.serialization.OutputArchive` opts to stuff all pointer
122 # serializations into a standard location outside the user-controlled
123 # Pydantic model tree, and always returned a JSON pointer to that
124 # standard location from this function.
125 raise NotImplementedError()
127 @abstractmethod
128 def serialize_frame_set[T: ArchiveTree](
129 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
130 ) -> T | P:
131 """Serialize a frame set and make it available to objects saved later.
133 Parameters
134 ----------
135 name
136 Attribute of the paired Pydantic model that will be assigned the
137 result of this call. If it will not be assigned to a direct
138 attribute, it may be a JSON Pointer path (relative to the paired
139 Pydantic model) to the location where it will be added.
140 frame_set
141 The frame set being saved. This will be returned in later calls
142 to `iter_frame_sets`, along with the returned reference object.
143 serializer
144 Callable that takes an `~lsst.serialization.OutputArchive` and
145 returns a Pydantic model. This will be passed a new
146 `~lsst.serialization.OutputArchive` that automatically prepends
147 ``{name}/`` (and any root path added by this archive) to names
148 passed to it, so the ``serializer`` does not need to know where it
149 appears in the overall tree.
150 key
151 A unique identifier for the in-memory object the serializer saves,
152 e.g. a call to the built-in `id` function.
154 Returns
155 -------
156 T | P
157 Either the result of the call to the serializer, or a Pydantic
158 model that can be considered a reference to it and added to a
159 larger model in its place.
160 """
161 raise NotImplementedError()
163 @abstractmethod
164 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]:
165 """Iterate over the frame sets already serialized to this archive.
167 Yields
168 ------
169 frame_set
170 A frame set that has already been written to this archive.
171 reference
172 An implementation-specific reference model that points to the
173 frame set.
174 """
175 raise NotImplementedError()
177 @abstractmethod
178 def add_array(
179 self,
180 array: np.ndarray,
181 *,
182 name: str | None = None,
183 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
184 ) -> ArrayReferenceModel:
185 """Add an array to the archive.
187 Parameters
188 ----------
189 array
190 Array to save.
191 name
192 Name of the array. This should generally be the name of the
193 Pydantic model attribute to which the result will be assigned. It
194 may be left `None` if there is only one [structured] array or
195 table in a nested object that is being saved.
196 update_header
197 A callback that will be given the FITS header for the HDU
198 containing this array in order to add keys to it. This callback
199 may be provided but will not be called if the output format is not
200 FITS.
202 Returns
203 -------
204 ArrayReferenceModel
205 A Pydantic model that references the stored array.
206 """
207 raise NotImplementedError()
209 @abstractmethod
210 def add_table(
211 self,
212 table: astropy.table.Table,
213 *,
214 name: str | None = None,
215 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
216 ) -> TableReferenceModel:
217 """Add a table to the archive.
219 Parameters
220 ----------
221 table
222 Table to save.
223 name
224 Name of the table. This should generally be the name of the
225 Pydantic model attribute to which the result will be assigned. It
226 may be left `None` if there is only one [structured] array or
227 table in a nested object that is being saved.
228 update_header
229 A callback that will be given the FITS header for the HDU
230 containing this table in order to add keys to it. This callback
231 may be provided but will not be called if the output format is not
232 FITS.
234 Returns
235 -------
236 TableReferenceModel
237 A Pydantic model that represents the table. Column definitions
238 are included directly in the model while the actual data is
239 stored elsewhere and referenced by the model.
240 """
241 # TODO: ASDF has schemas for tables and columns that we should probably
242 # adopt [a subset of]. While that can reference external per-column
243 # data (which would Just Work for a true ASDF archive), I'm not sure
244 # there's a way to reference external data in a FITS binary table
245 # column. We could of course invent one, and since ASDF-in-FITS isn't
246 # even referenced on the ASDF standard page our existing approach for
247 # referencing FITS data in an image extension may be something only
248 # we'll be using, too.
249 raise NotImplementedError()
251 @abstractmethod
252 def add_structured_array(
253 self,
254 array: np.ndarray,
255 *,
256 name: str | None = None,
257 units: Mapping[str, astropy.units.Unit] | None = None,
258 descriptions: Mapping[str, str] | None = None,
259 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
260 ) -> TableReferenceModel:
261 """Add a table to the archive.
263 Parameters
264 ----------
265 name
266 Attribute of the paired Pydantic model that will be assigned the
267 result of this call. If it will not be assigned to a direct
268 attribute, it may be a JSON Pointer path (relative to the paired
269 Pydantic model) to the location where it will be added.
270 array
271 A structured numpy array.
272 name
273 Name of the array. This should generally be the name of the
274 Pydantic model attribute to which the result will be assigned. It
275 may be left `None` if there is only one [structured] array or
276 table in a nested object that is being saved.
277 units
278 A mapping of units for columns. Need not be complete.
279 descriptions
280 A mapping of descriptions for columns. Need not be complete.
281 update_header
282 A callback that will be given the FITS header for the HDU
283 containing this table in order to add keys to it. This callback
284 may be provided but will not be called if the output format is not
285 FITS.
287 Returns
288 -------
289 TableReferenceModel
290 A Pydantic model that represents the table. Column definitions
291 are included directly in the model while the actual data is
292 stored elsewhere and referenced by the model.
293 """
294 # TODO: ASDF has schemas for tables and columns that we should probably
295 # adopt [a subset of]. While that can reference external per-column
296 # data (which would Just Work for a true ASDF archive), I'm not sure
297 # there's a way to reference external data in a FITS binary table
298 # column. We could of course invent one, and since ASDF-in-FITS isn't
299 # even referenced on the ASDF standard page our existing approach for
300 # referencing FITS data in an image extension may be something only
301 # we'll be using, too.
302 raise NotImplementedError()
305class NestedOutputArchive[P: pydantic.BaseModel](OutputArchive[P]):
306 """A proxy output archive that joins a root path into all names before
307 delegating back to its parent archive.
309 This is intended to be used in the implementation of most
310 `~lsst.serialization.OutputArchive.serialize_direct` and
311 `~lsst.serialization.OutputArchive.serialize_pointer` implementations.
313 Parameters
314 ----------
315 root
316 Root of all JSON Pointer paths. Should include a leading slash (as we
317 always use absolute JSON Pointers) but no trailing slash.
318 parent
319 Parent output archive to delegate to.
320 """
322 def __init__(self, root: str, parent: OutputArchive):
323 self._root = root
324 self._parent = parent
326 def serialize_direct[T: pydantic.BaseModel](
327 self, name: str, serializer: Callable[[OutputArchive[P]], T]
328 ) -> T:
329 return self._parent.serialize_direct(self._join_path(name), serializer)
331 def serialize_pointer[T: ArchiveTree](
332 self, name: str, serializer: Callable[[OutputArchive[P]], T], key: Hashable
333 ) -> T | P:
334 return self._parent.serialize_pointer(self._join_path(name), serializer, key)
336 def serialize_frame_set[T: ArchiveTree](
337 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
338 ) -> T | P:
339 return self._parent.serialize_frame_set(self._join_path(name), frame_set, serializer, key)
341 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, P]]:
342 return self._parent.iter_frame_sets()
344 def add_array(
345 self,
346 array: np.ndarray,
347 *,
348 name: str | None = None,
349 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
350 ) -> ArrayReferenceModel:
351 return self._parent.add_array(array, name=self._join_path(name), update_header=update_header)
353 def add_table(
354 self,
355 table: astropy.table.Table,
356 *,
357 name: str | None = None,
358 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
359 ) -> TableReferenceModel:
360 return self._parent.add_table(table, name=self._join_path(name), update_header=update_header)
362 def add_structured_array(
363 self,
364 array: np.ndarray,
365 *,
366 name: str | None = None,
367 units: Mapping[str, astropy.units.Unit] | None = None,
368 descriptions: Mapping[str, str] | None = None,
369 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
370 ) -> TableReferenceModel:
371 return self._parent.add_structured_array(
372 array,
373 name=self._join_path(name),
374 units=units,
375 descriptions=descriptions,
376 update_header=update_header,
377 )
379 def _join_path(self, name: str | None) -> str:
380 return f"{self._root}/{name}" if name is not None else self._root