Coverage for python / lsst / images / json / _output_archive.py: 29%
59 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:48 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = ("JsonOutputArchive", "write")
16from collections.abc import Callable, Hashable, Iterator, Mapping
17from typing import TYPE_CHECKING, Any
19import astropy.table
20import numpy as np
21import pydantic
23from lsst.resources import ResourcePath, ResourcePathExpression
25from .._transforms import FrameSet
26from ..serialization import (
27 ArchiveTree,
28 ButlerInfo,
29 InlineArrayModel,
30 JsonRef,
31 MetadataValue,
32 NestedOutputArchive,
33 NumberType,
34 OutputArchive,
35 TableColumnModel,
36 TableModel,
37 no_header_updates,
38)
40if TYPE_CHECKING:
41 import astropy.io.fits
def write(
    obj: Any,
    path: ResourcePathExpression | None = None,
    metadata: dict[str, MetadataValue] | None = None,
    butler_info: ButlerInfo | None = None,
) -> ArchiveTree:
    """Serialize an object that has a ``serialize`` method to JSON.

    Parameters
    ----------
    obj
        Object to serialize.  It must provide a ``serialize`` method that
        accepts an output archive.  If it has an ``_archive_default_name``
        attribute that is not `None`, serialization is routed through
        `JsonOutputArchive.serialize_direct` using that name.
    path
        Name of the file to write to (may be a URI).  If not provided, a
        serializable model is returned but not written to disk.
    metadata
        Additional metadata to save with the object.  This will override any
        flexible metadata carried by the object itself with the same keys.
    butler_info
        Butler information to store in the file.

    Returns
    -------
    `.serialization.ArchiveTree`
        The serialized representation of the object.
    """
    output = JsonOutputArchive()
    default_name = getattr(obj, "_archive_default_name", None)
    if default_name is None:
        tree = obj.serialize(output)
    else:
        tree = output.serialize_direct(default_name, obj.serialize)

    # Caller-supplied values are applied after the object's own
    # serialization so that they take precedence.
    if metadata is not None:
        tree.metadata.update(metadata)
    if butler_info is not None:
        tree.butler_info = butler_info
    output.finish(tree)

    if path is None:
        return tree
    destination = ResourcePath(path)
    destination.write(tree.model_dump_json().encode())
    return tree
81class JsonOutputArchive(OutputArchive[JsonRef]):
82 """An implementation of the `.serialization.OutputArchive` interface that
83 writes to JSON files.
85 This archive type is designed for pure-JSON objects and cases where any
86 images or tables are tiny. It will be *extremely* inefficient for large
87 images or tables, if it works at all.
88 """
90 def __init__(self) -> None:
91 self._pointers: dict[Hashable, JsonRef] = {}
92 self._indirect: list[Any] = []
93 self._frame_sets: list[tuple[FrameSet, JsonRef]] = []
95 def serialize_direct[T: pydantic.BaseModel](
96 self, name: str, serializer: Callable[[OutputArchive[JsonRef]], T]
97 ) -> T:
98 nested = NestedOutputArchive[JsonRef](name, self)
99 return serializer(nested)
101 def serialize_pointer[T: ArchiveTree](
102 self, name: str, serializer: Callable[[OutputArchive[JsonRef]], T], key: Hashable
103 ) -> JsonRef:
104 if (pointer := self._pointers.get(key)) is not None:
105 return pointer
106 pointer = JsonRef.model_construct(ref=f"#/indirect/{len(self._indirect)}")
107 self._indirect.append(self.serialize_direct(name, serializer).model_dump())
108 self._pointers[key] = pointer
109 return pointer
111 def serialize_frame_set[T: ArchiveTree](
112 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable
113 ) -> JsonRef:
114 pointer = self.serialize_pointer(name, serializer, key)
115 self._frame_sets.append((frame_set, pointer))
116 return pointer
118 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, JsonRef]]:
119 return iter(self._frame_sets)
121 def add_array(
122 self,
123 array: np.ndarray,
124 *,
125 name: str | None = None,
126 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
127 ) -> InlineArrayModel:
128 return InlineArrayModel(
129 data=array.tolist(),
130 datatype=NumberType.from_numpy(array.dtype),
131 )
133 def add_table(
134 self,
135 table: astropy.table.Table,
136 *,
137 name: str | None = None,
138 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
139 ) -> TableModel:
140 columns = TableColumnModel.from_table(table, inline=True)
141 return TableModel(columns=columns, meta=table.meta)
143 def add_structured_array(
144 self,
145 array: np.ndarray,
146 *,
147 name: str | None = None,
148 units: Mapping[str, astropy.units.Unit] | None = None,
149 descriptions: Mapping[str, str] | None = None,
150 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates,
151 ) -> TableModel:
152 columns = TableColumnModel.from_record_array(array, inline=True)
153 for c in columns:
154 if units and (unit := units.get(c.name)):
155 c.unit = unit
156 if descriptions and (description := descriptions.get(c.name)):
157 c.description = description
158 return TableModel(columns=columns)
160 def finish[T: ArchiveTree](self, tree: T) -> T:
161 """Finish serialization.
163 Parameters
164 ----------
165 tree
166 Serialized archive tree to write, which is modified in place
167 (the ``indirect`` attribute is overwritten) and then returned.
168 """
169 tree.indirect = self._indirect
170 return tree