Coverage for python/lsst/images/json/_output_archive.py: 29%

59 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:36 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ("JsonOutputArchive", "write") 

15 

16from collections.abc import Callable, Hashable, Iterator, Mapping 

17from typing import TYPE_CHECKING, Any 

18 

19import astropy.table 

20import numpy as np 

21import pydantic 

22 

23from lsst.resources import ResourcePath, ResourcePathExpression 

24 

25from .._transforms import FrameSet 

26from ..serialization import ( 

27 ArchiveTree, 

28 ButlerInfo, 

29 InlineArrayModel, 

30 JsonRef, 

31 MetadataValue, 

32 NestedOutputArchive, 

33 NumberType, 

34 OutputArchive, 

35 TableColumnModel, 

36 TableModel, 

37 no_header_updates, 

38) 

39 

40if TYPE_CHECKING: 

41 import astropy.io.fits 

42 

43 

def write(
    obj: Any,
    path: ResourcePathExpression | None = None,
    metadata: dict[str, MetadataValue] | None = None,
    butler_info: ButlerInfo | None = None,
) -> ArchiveTree:
    """Serialize an object to a JSON archive tree and optionally save it.

    Parameters
    ----------
    obj
        Object to serialize.  It must provide a ``serialize`` method that
        accepts an output archive.  When the object has a
        ``_archive_default_name`` attribute that is not `None`, the object
        is serialized through a nested archive with that name.
    path
        Name of the file to write to (may be a URI).  If not provided, a
        serializable model is returned but not written to disk.
    metadata
        Additional metadata to save with the object.  This will override any
        flexible metadata carried by the object itself with the same keys.
    butler_info
        Butler information to store in the file.

    Returns
    -------
    `.serialization.ArchiveTree`
        The serialized representation of the object.
    """
    output = JsonOutputArchive()
    default_name = getattr(obj, "_archive_default_name", None)
    if default_name is None:
        tree = obj.serialize(output)
    else:
        tree = output.serialize_direct(default_name, obj.serialize)

    # Caller-supplied values are applied after serialization so they win
    # over whatever the object stored itself.
    if metadata is not None:
        tree.metadata.update(metadata)
    if butler_info is not None:
        tree.butler_info = butler_info

    # Attaches the indirectly-serialized entries to the tree (in place).
    output.finish(tree)

    if path is not None:
        payload = tree.model_dump_json().encode()
        ResourcePath(path).write(payload)
    return tree

79 

80 

81class JsonOutputArchive(OutputArchive[JsonRef]): 

82 """An implementation of the `.serialization.OutputArchive` interface that 

83 writes to JSON files. 

84 

85 This archive type is designed for pure-JSON objects and cases where any 

86 images or tables are tiny. It will be *extremely* inefficient for large 

87 images or tables, if it works at all. 

88 """ 

89 

90 def __init__(self) -> None: 

91 self._pointers: dict[Hashable, JsonRef] = {} 

92 self._indirect: list[Any] = [] 

93 self._frame_sets: list[tuple[FrameSet, JsonRef]] = [] 

94 

95 def serialize_direct[T: pydantic.BaseModel]( 

96 self, name: str, serializer: Callable[[OutputArchive[JsonRef]], T] 

97 ) -> T: 

98 nested = NestedOutputArchive[JsonRef](name, self) 

99 return serializer(nested) 

100 

101 def serialize_pointer[T: ArchiveTree]( 

102 self, name: str, serializer: Callable[[OutputArchive[JsonRef]], T], key: Hashable 

103 ) -> JsonRef: 

104 if (pointer := self._pointers.get(key)) is not None: 

105 return pointer 

106 pointer = JsonRef.model_construct(ref=f"#/indirect/{len(self._indirect)}") 

107 self._indirect.append(self.serialize_direct(name, serializer).model_dump()) 

108 self._pointers[key] = pointer 

109 return pointer 

110 

111 def serialize_frame_set[T: ArchiveTree]( 

112 self, name: str, frame_set: FrameSet, serializer: Callable[[OutputArchive], T], key: Hashable 

113 ) -> JsonRef: 

114 pointer = self.serialize_pointer(name, serializer, key) 

115 self._frame_sets.append((frame_set, pointer)) 

116 return pointer 

117 

118 def iter_frame_sets(self) -> Iterator[tuple[FrameSet, JsonRef]]: 

119 return iter(self._frame_sets) 

120 

121 def add_array( 

122 self, 

123 array: np.ndarray, 

124 *, 

125 name: str | None = None, 

126 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

127 ) -> InlineArrayModel: 

128 return InlineArrayModel( 

129 data=array.tolist(), 

130 datatype=NumberType.from_numpy(array.dtype), 

131 ) 

132 

133 def add_table( 

134 self, 

135 table: astropy.table.Table, 

136 *, 

137 name: str | None = None, 

138 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

139 ) -> TableModel: 

140 columns = TableColumnModel.from_table(table, inline=True) 

141 return TableModel(columns=columns, meta=table.meta) 

142 

143 def add_structured_array( 

144 self, 

145 array: np.ndarray, 

146 *, 

147 name: str | None = None, 

148 units: Mapping[str, astropy.units.Unit] | None = None, 

149 descriptions: Mapping[str, str] | None = None, 

150 update_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

151 ) -> TableModel: 

152 columns = TableColumnModel.from_record_array(array, inline=True) 

153 for c in columns: 

154 if units and (unit := units.get(c.name)): 

155 c.unit = unit 

156 if descriptions and (description := descriptions.get(c.name)): 

157 c.description = description 

158 return TableModel(columns=columns) 

159 

160 def finish[T: ArchiveTree](self, tree: T) -> T: 

161 """Finish serialization. 

162 

163 Parameters 

164 ---------- 

165 tree 

166 Serialized archive tree to write, which is modified in place 

167 (the ``indirect`` attribute is overwritten) and then returned. 

168 """ 

169 tree.indirect = self._indirect 

170 return tree