Coverage for tests / test_build_camera.py: 14%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:49 +0000

1# This file is part of drp_tasks 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21import os.path 

22import re 

23import unittest 

24 

25import astropy.units as u 

26import numpy as np 

27import yaml 

28from scipy.optimize import minimize 

29 

30import lsst.drp.tasks 

31import lsst.drp.tasks.gbdesAstrometricFit 

32import lsst.utils 

33from lsst.afw.cameraGeom.testUtils import FIELD_ANGLE, PIXELS, CameraWrapper 

34from lsst.afw.table import ExposureCatalog 

35from lsst.drp.tasks.build_camera import ( 

36 BuildCameraFromAstrometryConfig, 

37 BuildCameraFromAstrometryTask, 

38 _z_function, 

39) 

40 

41TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

42 

43 

44class TestBuildCameraFromAstrometry(lsst.utils.tests.TestCase): 

45 

46 def setUp(self): 

47 

48 # The tests will be done with data simulated to look like HSC data. 

49 self.camera = CameraWrapper(isLsstLike=False).camera 

50 self.detectorList = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] 

51 self.visitList = np.arange(10) 

52 self.rotationAngle = (270 * u.degree).to(u.radian).value 

53 self.task = BuildCameraFromAstrometryTask() 

54 

55 datadir = os.path.join(TESTDIR, "data") 

56 with open(os.path.join(datadir, "sample_wcs.yaml"), "r") as f: 

57 modelParams = yaml.load(f, Loader=yaml.Loader) 

58 self.mapParams = {} 

59 for key, value in modelParams.items(): 

60 if isinstance(key, str) and re.fullmatch(".+/poly", key): 

61 if value["Type"] == "Identity": 

62 continue 

63 polyCoefficients = np.concatenate( 

64 [value["XPoly"]["Coefficients"], value["YPoly"]["Coefficients"]] 

65 ) 

66 self.mapParams[key] = polyCoefficients 

67 

68 visitSummary = ExposureCatalog.readFits(os.path.join(datadir, "visitSummary_1176.fits")) 

69 gbdesTask = lsst.drp.tasks.gbdesAstrometricFit.GbdesAstrometricFitTask() 

70 _, self.mapTemplate = gbdesTask.make_yaml(visitSummary) 

71 

72 _, tangentPlaneX, tangentPlaneY = self.task._normalize_model( 

73 self.mapParams, self.mapTemplate, self.detectorList, self.visitList 

74 ) 

75 # There is a rotation and flip between the gbdes camera model and the 

76 # afw camera model, which is applied by-hand here. 

77 self.originalFieldAngleX = (-tangentPlaneY * u.degree).to(u.radian).value 

78 self.originalFieldAngleY = (-tangentPlaneX * u.degree).to(u.radian).value 

79 

80 bbox = self.camera[self.detectorList[0]].getBBox() 

81 self.xPix = (self.task.x + 1) * bbox.endX / 2 

82 self.yPix = (self.task.y + 1) * bbox.endY / 2 

83 

84 def test_normalize_model(self): 

85 

86 deviceParams = [] 

87 for element in self.mapTemplate["BAND/DEVICE"]["Elements"]: 

88 for detector in self.detectorList: 

89 detectorTemplate = element.replace("DEVICE", str(detector)) 

90 detectorTemplate = detectorTemplate.replace("BAND", ".+") 

91 for k, params in self.mapParams.items(): 

92 if re.fullmatch(detectorTemplate, k): 

93 deviceParams.append(params) 

94 deviceArray = np.vstack(deviceParams) 

95 

96 visitParams = [] 

97 for element in self.mapTemplate["EXPOSURE"]["Elements"]: 

98 for visit in self.visitList: 

99 visitTemplate = element.replace("EXPOSURE", str(visit)) 

100 for k, params in self.mapParams.items(): 

101 if re.fullmatch(visitTemplate, k): 

102 visitParams.append(params) 

103 identityVisitParams = np.array([0, 1, 0, 0, 0, 1]) 

104 visitParams.append(identityVisitParams) 

105 expoArray = np.vstack(visitParams) 

106 

107 # Get the tangent plane positions from the original device maps: 

108 origTpX = [] 

109 origTpY = [] 

110 for deviceMap in deviceArray: 

111 nCoeffsDev = len(deviceMap) // 2 

112 deviceDegree = lsst.drp.tasks.gbdesAstrometricFit._degreeFromNCoeffs(nCoeffsDev) 

113 intX = _z_function(deviceMap[:nCoeffsDev], self.task.x, self.task.y, order=deviceDegree) 

114 intY = _z_function(deviceMap[nCoeffsDev:], self.task.x, self.task.y, order=deviceDegree) 

115 origTpX.append(intX) 

116 origTpY.append(intY) 

117 

118 origTpX = np.array(origTpX).ravel() 

119 origTpY = np.array(origTpY).ravel() 

120 

121 # Get the interim positions with the new device maps: 

122 _, newIntX, newIntY = self.task._normalize_model( 

123 self.mapParams, self.mapTemplate, self.detectorList, self.visitList 

124 ) 

125 

126 # Now fit the per-visit parameters with the constraint that the new 

127 # tangent plane positions match the old tangent plane positions, and 

128 # then check they are sufficiently close to the old values. 

129 newExpoArrayEmp = np.zeros(expoArray.shape) 

130 for e, expo in enumerate(expoArray): 

131 origMapX = expo[0] + origTpX * expo[1] + origTpY * expo[2] 

132 origMapY = expo[3] + origTpX * expo[4] + origTpY * expo[5] 

133 

134 def min_function(params): 

135 tpX = params[0] + params[1] * newIntX + params[2] * newIntY 

136 tpY = params[3] + params[4] * newIntX + params[5] * newIntY 

137 

138 diff = ((origMapX - tpX)) ** 2 + ((origMapY - tpY)) ** 2 

139 return diff.sum() 

140 

141 def jac(params): 

142 tpX = params[0] + params[1] * newIntX + params[2] * newIntY 

143 tpY = params[3] + params[4] * newIntX + params[5] * newIntY 

144 dX = 2 * (origMapX - tpX) 

145 dY = 2 * (origMapY - tpY) 

146 jacobian = [ 

147 -dX.sum(), 

148 -(dX * newIntX).sum(), 

149 -(dX * newIntY).sum(), 

150 -dY.sum(), 

151 -(dY * newIntX).sum(), 

152 -(dY * newIntY).sum(), 

153 ] 

154 return jacobian 

155 

156 res = minimize(min_function, expo, method="Newton-CG", jac=jac, options={"xtol": 1e-7}) 

157 newExpoArrayEmp[e] = res.x 

158 

159 newMapX = ( 

160 newExpoArrayEmp[e][0] + newIntX * newExpoArrayEmp[e][1] + newIntY * newExpoArrayEmp[e][2] 

161 ) 

162 newMapY = ( 

163 newExpoArrayEmp[e][3] + newIntX * newExpoArrayEmp[e][4] + newIntY * newExpoArrayEmp[e][5] 

164 ) 

165 

166 self.assertFloatsAlmostEqual(origMapX, newMapX, atol=1e-12) 

167 self.assertFloatsAlmostEqual(origMapY, newMapY, atol=1e-12) 

168 

169 def test_run_with_basic_model(self): 

170 

171 config = BuildCameraFromAstrometryConfig() 

172 config.modelSplitting = "basic" 

173 task = BuildCameraFromAstrometryTask(config=config) 

174 

175 camera = task.run( 

176 self.mapParams, 

177 self.mapTemplate, 

178 self.detectorList, 

179 self.visitList, 

180 self.camera, 

181 self.rotationAngle, 

182 ) 

183 testX, testY = [], [] 

184 for dev in camera: 

185 faX, faY = ( 

186 dev.getTransform(PIXELS, FIELD_ANGLE) 

187 .getMapping() 

188 .applyForward(np.array([self.xPix, self.yPix])) 

189 ) 

190 testX.append(faX) 

191 testY.append(faY) 

192 testX = np.concatenate(testX) 

193 testY = np.concatenate(testY) 

194 

195 self.assertFloatsAlmostEqual(self.originalFieldAngleX, testX, atol=1e-12) 

196 self.assertFloatsAlmostEqual(self.originalFieldAngleY, testY, atol=1e-12) 

197 

198 def test_run_with_splitModel(self): 

199 

200 config = BuildCameraFromAstrometryConfig() 

201 config.modelSplitting = "physical" 

202 config.modelSplittingTolerance = 1e-6 

203 task = BuildCameraFromAstrometryTask(config=config) 

204 camera = task.run( 

205 self.mapParams, 

206 self.mapTemplate, 

207 self.detectorList, 

208 self.visitList, 

209 self.camera, 

210 self.rotationAngle, 

211 ) 

212 

213 testX, testY = [], [] 

214 for dev in camera: 

215 faX, faY = ( 

216 dev.getTransform(PIXELS, FIELD_ANGLE) 

217 .getMapping() 

218 .applyForward(np.array([self.xPix, self.yPix])) 

219 ) 

220 testX.append(faX) 

221 testY.append(faY) 

222 testX = np.concatenate(testX) 

223 testY = np.concatenate(testY) 

224 

225 # The "physical" model splitting is not expected to 

226 # reconstruct the input field angles perfectly. 

227 self.assertFloatsAlmostEqual(self.originalFieldAngleX, testX, atol=1e-4) 

228 self.assertFloatsAlmostEqual(self.originalFieldAngleY, testY, atol=1e-4) 

229 

230 

231def setup_module(module): 

232 lsst.utils.tests.init() 

233 

234 

235if __name__ == "__main__": 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 lsst.utils.tests.init() 

237 unittest.main()