Coverage for tests / test_transforms.py: 24%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:36 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import dataclasses 

15import functools 

16import os 

17import unittest 

18from typing import Any 

19 

20import numpy as np 

21import pydantic 

22 

23from lsst.images import ( 

24 Box, 

25 CameraFrameSet, 

26 CameraFrameSetSerializationModel, 

27 DetectorFrame, 

28 FocalPlaneFrame, 

29 Projection, 

30 Transform, 

31 TransformSerializationModel, 

32) 

33from lsst.images.fits import PointerModel 

34from lsst.images.serialization import ArchiveTree, InputArchive, JsonRef, OutputArchive 

35from lsst.images.tests import ( 

36 DP2_VISIT_DETECTOR_DATA_ID, 

37 RoundtripFits, 

38 RoundtripJson, 

39 check_transform, 

40 compare_projection_to_legacy_wcs, 

41 legacy_points_to_xy_array, 

42) 

43 

44DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

45 

46 

47class TransformTestCase(unittest.TestCase): 

48 """Tests for the Transform, Projection, and FrameSet classes.""" 

49 

50 def test_identity(self) -> None: 

51 """Test an identity transform.""" 

52 frame = DetectorFrame(**DP2_VISIT_DETECTOR_DATA_ID, bbox=Box.factory[:5, :4]) 

53 xy = frame.bbox.meshgrid().map(np.ravel) 

54 identity = Transform.identity(frame) 

55 check_transform(self, identity, xy, xy, frame, frame) 

56 self.assertEqual(identity.decompose(), []) 

57 with RoundtripJson(self, identity) as roundtrip: 

58 pass 

59 check_transform(self, roundtrip.result, xy, xy, frame, frame) 

60 

61 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

62 def test_camera(self) -> None: 

63 """Test that we can: 

64 

65 - make a CameraFrameSet from the AST representation returned by afw; 

66 - transform points and get the same result as afw; 

67 - round-trip the CameraFrameSet through FITS serialization and still 

68 do all of that; 

69 - also roundtrip a Transform that can be obtained from the 

70 CameraFrameSet, by referencing the mappings in the frame set. 

71 

72 This test is skipped if legacy modules cannot be imported. 

73 

74 This test provides coverage for the archive system's pointer and 

75 frame-set reference machinery. 

76 """ 

77 try: 

78 from lsst.afw.cameraGeom import Camera 

79 except ImportError: 

80 raise unittest.SkipTest("'lsst.afw.cameraGeom' could not be imported.") from None 

81 assert DATA_DIR is not None, "Guaranteed by decorator." 

82 filename = os.path.join(DATA_DIR, "dp2", "legacy", "camera.fits") 

83 legacy_camera = Camera.readFits(filename) 

84 frame_set = CameraFrameSet.from_legacy(legacy_camera) 

85 detector_id: int = DP2_VISIT_DETECTOR_DATA_ID["detector"] 

86 self.compare_to_legacy_camera(legacy_camera, frame_set) 

87 test_holder = FrameSetTestHolder( 

88 frames=frame_set, 

89 pixels_to_fp=frame_set[frame_set.detector(detector_id), frame_set.focal_plane()], 

90 ) 

91 with RoundtripFits(self, test_holder) as roundtrip1: 

92 self.assertEqual(len(roundtrip1.serialized.pixels_to_fp.frames), 2) 

93 self.assertEqual(len(roundtrip1.serialized.pixels_to_fp.bounds), 2) 

94 self.assertEqual(len(roundtrip1.serialized.pixels_to_fp.mappings), 1) 

95 # Instead of storing the AST mapping directly, we should have 

96 # stored a reference to the frame set: 

97 self.assertIsInstance(roundtrip1.serialized.pixels_to_fp.mappings[0], PointerModel) 

98 self.compare_to_legacy_camera(legacy_camera, roundtrip1.result.frames) 

99 self.assertEqual(roundtrip1.result.pixels_to_fp.in_frame, frame_set.detector(detector_id)) 

100 self.assertEqual(roundtrip1.result.pixels_to_fp.out_frame, frame_set.focal_plane()) 

101 self.assertEqual( 

102 roundtrip1.result.pixels_to_fp._ast_mapping.simplified().show(), 

103 test_holder.pixels_to_fp._ast_mapping.simplified().show(), 

104 ) 

105 with RoundtripJson(self, test_holder) as roundtrip2: 

106 self.assertEqual(len(roundtrip2.serialized.pixels_to_fp.frames), 2) 

107 self.assertEqual(len(roundtrip2.serialized.pixels_to_fp.bounds), 2) 

108 self.assertEqual(len(roundtrip2.serialized.pixels_to_fp.mappings), 1) 

109 # Instead of storing the AST mapping directly, we should have 

110 # stored a reference to the frame set: 

111 self.assertIsInstance(roundtrip2.serialized.pixels_to_fp.mappings[0], JsonRef) 

112 raw_data = roundtrip2.inspect() 

113 self.assertEqual(len(raw_data["indirect"]), 1) 

114 self.assertEqual(raw_data["frames"], {"$ref": "#/indirect/0"}) 

115 self.compare_to_legacy_camera(legacy_camera, roundtrip2.result.frames) 

116 self.assertEqual(roundtrip2.result.pixels_to_fp.in_frame, frame_set.detector(detector_id)) 

117 self.assertEqual(roundtrip2.result.pixels_to_fp.out_frame, frame_set.focal_plane()) 

118 self.assertEqual( 

119 roundtrip2.result.pixels_to_fp._ast_mapping.simplified().show(), 

120 test_holder.pixels_to_fp._ast_mapping.simplified().show(), 

121 ) 

122 

123 def compare_to_legacy_camera(self, legacy_camera: Any, frame_set: CameraFrameSet) -> None: 

124 """Test the transforms extracted from a CameraFrameSet against the 

125 legacy lsst.afw.cameraGeom implementations. 

126 """ 

127 from lsst.afw.cameraGeom import FIELD_ANGLE, FOCAL_PLANE, PIXELS 

128 from lsst.geom import Point2D 

129 

130 legacy_detector = legacy_camera[16] 

131 pixel_legacy_points = [Point2D(50.0, 60.0), Point2D(801.2, 322.8), Point2D(33.5, 22.1)] 

132 fp_legacy_points = [legacy_detector.transform(p, PIXELS, FOCAL_PLANE) for p in pixel_legacy_points] 

133 fa_legacy_points = [legacy_detector.transform(p, PIXELS, FIELD_ANGLE) for p in pixel_legacy_points] 

134 pixel_xy_array = legacy_points_to_xy_array(pixel_legacy_points) 

135 fp_xy_array = legacy_points_to_xy_array(fp_legacy_points) 

136 fa_xy_array = legacy_points_to_xy_array(fa_legacy_points) 

137 # Test transforms extracted directly from the frame set. 

138 pixel_to_fp = frame_set[frame_set.detector(16), frame_set.focal_plane()] 

139 check_transform( 

140 self, pixel_to_fp, pixel_xy_array, fp_xy_array, frame_set.detector(16), frame_set.focal_plane() 

141 ) 

142 pixel_to_fa = frame_set[frame_set.detector(16), frame_set.field_angle()] 

143 check_transform( 

144 self, pixel_to_fa, pixel_xy_array, fa_xy_array, frame_set.detector(16), frame_set.field_angle() 

145 ) 

146 fp_to_fa = frame_set[frame_set.focal_plane(), frame_set.field_angle()] 

147 check_transform( 

148 self, fp_to_fa, fp_xy_array, fa_xy_array, frame_set.focal_plane(), frame_set.field_angle() 

149 ) 

150 # Test a composition. 

151 pixel_to_fa_indirect = pixel_to_fp.then(fp_to_fa) 

152 check_transform( 

153 self, 

154 pixel_to_fa_indirect, 

155 pixel_xy_array, 

156 fa_xy_array, 

157 frame_set.detector(16), 

158 frame_set.field_angle(), 

159 ) 

160 pixel_to_fp_d, fp_to_fa_d = pixel_to_fa_indirect.decompose() 

161 check_transform( 

162 self, pixel_to_fp_d, pixel_xy_array, fp_xy_array, frame_set.detector(16), frame_set.focal_plane() 

163 ) 

164 check_transform( 

165 self, fp_to_fa_d, fp_xy_array, fa_xy_array, frame_set.focal_plane(), frame_set.field_angle() 

166 ) 

167 fa_to_fp_d, fp_to_pixel_d = pixel_to_fa_indirect.inverted().decompose() 

168 check_transform( 

169 self, fa_to_fp_d, fa_xy_array, fp_xy_array, frame_set.field_angle(), frame_set.focal_plane() 

170 ) 

171 check_transform( 

172 self, fp_to_pixel_d, fp_xy_array, pixel_xy_array, frame_set.focal_plane(), frame_set.detector(16) 

173 ) 

174 

175 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

176 def test_detector_wcs(self) -> None: 

177 """Test the Transform/Projection representation of a detector WCS.""" 

178 try: 

179 from lsst.afw.image import ExposureFitsReader 

180 except ImportError: 

181 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

182 assert DATA_DIR is not None, "Guaranteed by decorator." 

183 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

184 reader = ExposureFitsReader(filename) 

185 legacy_wcs = reader.readWcs() 

186 wcs_bbox = Box.from_legacy(reader.readDetector().getBBox()) 

187 subimage_bbox = Box.from_legacy(reader.readBBox()) 

188 detector_frame = DetectorFrame(**DP2_VISIT_DETECTOR_DATA_ID, bbox=wcs_bbox) 

189 projection = Projection.from_legacy(legacy_wcs, detector_frame) 

190 assert projection.fits_approximation is not None 

191 compare_projection_to_legacy_wcs(self, projection, legacy_wcs, detector_frame, subimage_bbox) 

192 # When we convert from a legacy SkyWcs, the internal AST Mapping needs 

193 # to really be an AST FrameSet in order to be able to convert back. 

194 self.assertIn("Begin FrameSet", projection.show()) 

195 compare_projection_to_legacy_wcs( 

196 self, projection, projection.to_legacy(), detector_frame, subimage_bbox 

197 ) 

198 self.assertIn("Begin FrameSet", projection.fits_approximation.show()) 

199 compare_projection_to_legacy_wcs( 

200 self, 

201 projection.fits_approximation, 

202 projection.fits_approximation.to_legacy(), 

203 detector_frame, 

204 subimage_bbox, 

205 is_fits=True, 

206 ) 

207 with RoundtripJson(self, projection, "Projection") as roundtrip: 

208 pass 

209 compare_projection_to_legacy_wcs(self, roundtrip.result, legacy_wcs, detector_frame, subimage_bbox) 

210 # The AST FrameSet-ness needs to propagate through serialization. 

211 self.assertIn("Begin FrameSet", roundtrip.result.show()) 

212 compare_projection_to_legacy_wcs( 

213 self, projection, roundtrip.result.to_legacy(), detector_frame, subimage_bbox 

214 ) 

215 with RoundtripJson(self, projection.fits_approximation, "Projection") as roundtrip: 

216 pass 

217 compare_projection_to_legacy_wcs( 

218 self, 

219 roundtrip.result, 

220 legacy_wcs.getFitsApproximation(), 

221 detector_frame, 

222 subimage_bbox, 

223 is_fits=True, 

224 ) 

225 self.assertIn("Begin FrameSet", roundtrip.result.show()) 

226 compare_projection_to_legacy_wcs( 

227 self, 

228 projection.fits_approximation, 

229 roundtrip.result.to_legacy(), 

230 detector_frame, 

231 subimage_bbox, 

232 is_fits=True, 

233 ) 

234 

235 

236@dataclasses.dataclass 

237class FrameSetTestHolder: 

238 """A top-level object that holds a CameraFrameSet and a transform 

239 extracted from it, for testing archive pointers and frame set references. 

240 """ 

241 

242 frames: CameraFrameSet 

243 pixels_to_fp: Transform[DetectorFrame, FocalPlaneFrame] 

244 

245 def serialize[P: pydantic.BaseModel](self, archive: OutputArchive[P]) -> FrameSetTestHolderModel[P]: 

246 frames_model = archive.serialize_frame_set( 

247 "frames", self.frames, self.frames.serialize, key=id(self.frames) 

248 ) 

249 pixels_to_fp_model = archive.serialize_direct( 

250 "pixels_to_fp", functools.partial(self.pixels_to_fp.serialize, use_frame_sets=True) 

251 ) 

252 return FrameSetTestHolderModel[P](frames=frames_model, pixels_to_fp=pixels_to_fp_model) 

253 

254 @staticmethod 

255 def deserialize(model: FrameSetTestHolderModel[Any], archive: InputArchive[Any]) -> FrameSetTestHolder: 

256 assert not isinstance(model.frames, CameraFrameSetSerializationModel), "Archive pointer expected." 

257 frames = archive.deserialize_pointer( 

258 model.frames, CameraFrameSetSerializationModel, CameraFrameSet.deserialize 

259 ) 

260 pixels_to_fp = Transform.deserialize(model.pixels_to_fp, archive) 

261 return FrameSetTestHolder(frames, pixels_to_fp) 

262 

263 @staticmethod 

264 def _get_archive_tree_type[P: pydantic.BaseModel]( 

265 pointer_type: type[P], 

266 ) -> type[FrameSetTestHolderModel[P]]: 

267 return FrameSetTestHolderModel[pointer_type] # type: ignore 

268 

269 

270class FrameSetTestHolderModel[P: pydantic.BaseModel](ArchiveTree): 

271 """The serialization model for FrameSetTestHolder.""" 

272 

273 frames: CameraFrameSetSerializationModel | P 

274 pixels_to_fp: TransformSerializationModel[P] 

275 

276 

277if __name__ == "__main__": 

278 unittest.main()