Coverage for tests / test_build_camera.py: 14%
128 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:28 +0000
1# This file is part of drp_tasks
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21import os.path
22import re
23import unittest
25import astropy.units as u
26import numpy as np
27import yaml
28from scipy.optimize import minimize
30import lsst.drp.tasks
31import lsst.drp.tasks.gbdesAstrometricFit
32import lsst.utils
33from lsst.afw.cameraGeom.testUtils import FIELD_ANGLE, PIXELS, CameraWrapper
34from lsst.afw.table import ExposureCatalog
35from lsst.drp.tasks.build_camera import (
36 BuildCameraFromAstrometryConfig,
37 BuildCameraFromAstrometryTask,
38 _z_function,
39)
41TESTDIR = os.path.abspath(os.path.dirname(__file__))
44class TestBuildCameraFromAstrometry(lsst.utils.tests.TestCase):
46 def setUp(self):
48 # The tests will be done with data simulated to look like HSC data.
49 self.camera = CameraWrapper(isLsstLike=False).camera
50 self.detectorList = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
51 self.visitList = np.arange(10)
52 self.rotationAngle = (270 * u.degree).to(u.radian).value
53 self.task = BuildCameraFromAstrometryTask()
55 datadir = os.path.join(TESTDIR, "data")
56 with open(os.path.join(datadir, "sample_wcs.yaml"), "r") as f:
57 modelParams = yaml.load(f, Loader=yaml.Loader)
58 self.mapParams = {}
59 for key, value in modelParams.items():
60 if isinstance(key, str) and re.fullmatch(".+/poly", key):
61 if value["Type"] == "Identity":
62 continue
63 polyCoefficients = np.concatenate(
64 [value["XPoly"]["Coefficients"], value["YPoly"]["Coefficients"]]
65 )
66 self.mapParams[key] = polyCoefficients
68 visitSummary = ExposureCatalog.readFits(os.path.join(datadir, "visitSummary_1176.fits"))
69 gbdesTask = lsst.drp.tasks.gbdesAstrometricFit.GbdesAstrometricFitTask()
70 _, self.mapTemplate = gbdesTask.make_yaml(visitSummary)
72 _, tangentPlaneX, tangentPlaneY = self.task._normalize_model(
73 self.mapParams, self.mapTemplate, self.detectorList, self.visitList
74 )
75 # There is a rotation and flip between the gbdes camera model and the
76 # afw camera model, which is applied by-hand here.
77 self.originalFieldAngleX = (-tangentPlaneY * u.degree).to(u.radian).value
78 self.originalFieldAngleY = (-tangentPlaneX * u.degree).to(u.radian).value
80 bbox = self.camera[self.detectorList[0]].getBBox()
81 self.xPix = (self.task.x + 1) * bbox.endX / 2
82 self.yPix = (self.task.y + 1) * bbox.endY / 2
84 def test_normalize_model(self):
86 deviceParams = []
87 for element in self.mapTemplate["BAND/DEVICE"]["Elements"]:
88 for detector in self.detectorList:
89 detectorTemplate = element.replace("DEVICE", str(detector))
90 detectorTemplate = detectorTemplate.replace("BAND", ".+")
91 for k, params in self.mapParams.items():
92 if re.fullmatch(detectorTemplate, k):
93 deviceParams.append(params)
94 deviceArray = np.vstack(deviceParams)
96 visitParams = []
97 for element in self.mapTemplate["EXPOSURE"]["Elements"]:
98 for visit in self.visitList:
99 visitTemplate = element.replace("EXPOSURE", str(visit))
100 for k, params in self.mapParams.items():
101 if re.fullmatch(visitTemplate, k):
102 visitParams.append(params)
103 identityVisitParams = np.array([0, 1, 0, 0, 0, 1])
104 visitParams.append(identityVisitParams)
105 expoArray = np.vstack(visitParams)
107 # Get the tangent plane positions from the original device maps:
108 origTpX = []
109 origTpY = []
110 for deviceMap in deviceArray:
111 nCoeffsDev = len(deviceMap) // 2
112 deviceDegree = lsst.drp.tasks.gbdesAstrometricFit._degreeFromNCoeffs(nCoeffsDev)
113 intX = _z_function(deviceMap[:nCoeffsDev], self.task.x, self.task.y, order=deviceDegree)
114 intY = _z_function(deviceMap[nCoeffsDev:], self.task.x, self.task.y, order=deviceDegree)
115 origTpX.append(intX)
116 origTpY.append(intY)
118 origTpX = np.array(origTpX).ravel()
119 origTpY = np.array(origTpY).ravel()
121 # Get the interim positions with the new device maps:
122 _, newIntX, newIntY = self.task._normalize_model(
123 self.mapParams, self.mapTemplate, self.detectorList, self.visitList
124 )
126 # Now fit the per-visit parameters with the constraint that the new
127 # tangent plane positions match the old tangent plane positions, and
128 # then check they are sufficiently close to the old values.
129 newExpoArrayEmp = np.zeros(expoArray.shape)
130 for e, expo in enumerate(expoArray):
131 origMapX = expo[0] + origTpX * expo[1] + origTpY * expo[2]
132 origMapY = expo[3] + origTpX * expo[4] + origTpY * expo[5]
134 def min_function(params):
135 tpX = params[0] + params[1] * newIntX + params[2] * newIntY
136 tpY = params[3] + params[4] * newIntX + params[5] * newIntY
138 diff = ((origMapX - tpX)) ** 2 + ((origMapY - tpY)) ** 2
139 return diff.sum()
141 def jac(params):
142 tpX = params[0] + params[1] * newIntX + params[2] * newIntY
143 tpY = params[3] + params[4] * newIntX + params[5] * newIntY
144 dX = 2 * (origMapX - tpX)
145 dY = 2 * (origMapY - tpY)
146 jacobian = [
147 -dX.sum(),
148 -(dX * newIntX).sum(),
149 -(dX * newIntY).sum(),
150 -dY.sum(),
151 -(dY * newIntX).sum(),
152 -(dY * newIntY).sum(),
153 ]
154 return jacobian
156 res = minimize(min_function, expo, method="Newton-CG", jac=jac, options={"xtol": 1e-7})
157 newExpoArrayEmp[e] = res.x
159 newMapX = (
160 newExpoArrayEmp[e][0] + newIntX * newExpoArrayEmp[e][1] + newIntY * newExpoArrayEmp[e][2]
161 )
162 newMapY = (
163 newExpoArrayEmp[e][3] + newIntX * newExpoArrayEmp[e][4] + newIntY * newExpoArrayEmp[e][5]
164 )
166 self.assertFloatsAlmostEqual(origMapX, newMapX, atol=1e-12)
167 self.assertFloatsAlmostEqual(origMapY, newMapY, atol=1e-12)
169 def test_run_with_basic_model(self):
171 config = BuildCameraFromAstrometryConfig()
172 config.modelSplitting = "basic"
173 task = BuildCameraFromAstrometryTask(config=config)
175 camera = task.run(
176 self.mapParams,
177 self.mapTemplate,
178 self.detectorList,
179 self.visitList,
180 self.camera,
181 self.rotationAngle,
182 )
183 testX, testY = [], []
184 for dev in camera:
185 faX, faY = (
186 dev.getTransform(PIXELS, FIELD_ANGLE)
187 .getMapping()
188 .applyForward(np.array([self.xPix, self.yPix]))
189 )
190 testX.append(faX)
191 testY.append(faY)
192 testX = np.concatenate(testX)
193 testY = np.concatenate(testY)
195 self.assertFloatsAlmostEqual(self.originalFieldAngleX, testX, atol=1e-12)
196 self.assertFloatsAlmostEqual(self.originalFieldAngleY, testY, atol=1e-12)
198 def test_run_with_splitModel(self):
200 config = BuildCameraFromAstrometryConfig()
201 config.modelSplitting = "physical"
202 config.modelSplittingTolerance = 1e-6
203 task = BuildCameraFromAstrometryTask(config=config)
204 camera = task.run(
205 self.mapParams,
206 self.mapTemplate,
207 self.detectorList,
208 self.visitList,
209 self.camera,
210 self.rotationAngle,
211 )
213 testX, testY = [], []
214 for dev in camera:
215 faX, faY = (
216 dev.getTransform(PIXELS, FIELD_ANGLE)
217 .getMapping()
218 .applyForward(np.array([self.xPix, self.yPix]))
219 )
220 testX.append(faX)
221 testY.append(faY)
222 testX = np.concatenate(testX)
223 testY = np.concatenate(testY)
225 # The "physical" model splitting is not expected to
226 # reconstruct the input field angles perfectly.
227 self.assertFloatsAlmostEqual(self.originalFieldAngleX, testX, atol=1e-4)
228 self.assertFloatsAlmostEqual(self.originalFieldAngleY, testY, atol=1e-4)
231def setup_module(module):
232 lsst.utils.tests.init()
235if __name__ == "__main__": 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true
236 lsst.utils.tests.init()
237 unittest.main()