Coverage for tests / test_flatFitGradient.py: 10%
207 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:54 +0000
1#
2# LSST Data Management System
3#
4# Copyright 2008-2017 AURA/LSST.
5#
6# This product includes software developed by the
7# LSST Project (http://www.lsst.org/).
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the LSST License Statement and
20# the GNU General Public License along with this program. If not,
21# see <https://www.lsstcorp.org/LegalNotices/>.
22#
23"""Test cases for cp_pipe flat gradient code."""
25import unittest
26import numpy as np
27from scipy.interpolate import Akima1DInterpolator
29import lsst.utils.tests
31import lsst.afw.cameraGeom
32from lsst.afw.image import ExposureF
33from lsst.cp.pipe import CpFlatFitGradientsTask, CpFlatApplyGradientsTask
34from lsst.ip.isr import IsrMockLSST, Defects, FlatGradient
35from lsst.pipe.base import InMemoryDatasetHandle
38class FlatFitGradientTestCase(lsst.utils.tests.TestCase):
39 def setUp(self):
40 mock = IsrMockLSST()
41 initial_camera = mock.getCamera()
42 camera_builder = initial_camera.rebuild()
43 for counter, detector in enumerate(camera_builder):
44 detector.setType(lsst.afw.cameraGeom.DetectorType.SCIENCE)
45 if counter < 2:
46 detector.setPhysicalType("pseudoITL")
47 else:
48 detector.setPhysicalType("pseudoE2V")
49 self.camera = camera_builder.finish()
51 self.filter_label = lsst.afw.image.FilterLabel(band="i", physical="i_0")
53 def _get_flat_handle_dict(
54 self,
55 radial_nodes,
56 radial_values,
57 normalization,
58 itl_ratio=1.0,
59 delta_x=0.0,
60 delta_y=0.0,
61 gradient_x=0.0,
62 gradient_y=0.0,
63 ):
64 spl = Akima1DInterpolator(radial_nodes, radial_values, method="akima")
66 flat_handle_dict = {}
67 for detector in self.camera:
68 flat = ExposureF(detector.getBBox())
70 flat.setDetector(detector)
71 flat.setFilter(self.filter_label)
73 xx = np.arange(flat.image.array.shape[1], dtype=np.float64)
74 yy = np.arange(flat.image.array.shape[0], dtype=np.float64)
75 x, y = np.meshgrid(xx, yy)
76 x = x.ravel()
77 y = y.ravel()
79 transform = detector.getTransform(lsst.afw.cameraGeom.PIXELS, lsst.afw.cameraGeom.FOCAL_PLANE)
80 xy = np.vstack((x, y))
81 xf, yf = np.vsplit(transform.getMapping().applyForward(xy), 2)
82 xf = xf.ravel()
83 yf = yf.ravel()
85 centroid_x = 0.0 + delta_x
86 centroid_y = 0.0 + delta_y
87 radius = np.sqrt((xf - centroid_x)**2. + (yf - centroid_y)**2.)
88 value = spl(np.clip(radius, radial_nodes[0], radial_nodes[-1]))
90 gradient = 1 + gradient_x*(xf - 0.0) + gradient_y*(yf - 0.0)
91 value /= gradient
93 flat.image.array[:, :] = value.reshape(flat.image.array.shape) * normalization
95 if "ITL" in detector.getPhysicalType():
96 flat.image.array[:, :] *= itl_ratio
98 data_id = {
99 "detector": detector.getId(),
100 "physical_filter": None,
101 }
102 flat_handle_dict[detector.getId()] = InMemoryDatasetHandle(flat, dataId=data_id)
104 return flat_handle_dict
106 def _get_defect_handle_dict(self):
107 defect_handle_dict = {}
108 for detector in self.camera:
109 defects = Defects()
110 defects.append(lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Point2I(7, 7)))
112 data_id = {
113 "detector": detector.getId(),
114 }
115 defect_handle_dict[detector.getId()] = InMemoryDatasetHandle(defects, dataId=data_id)
117 return defect_handle_dict
119 def test_radial_only(self):
120 radial_nodes = np.array([0, 1, 2, 3, 4, 4.5, 5.2], dtype=np.float64)
121 radial_values = np.array([1.0, 1.0, 1.0, 0.9, 0.5, 0.3, 0.0], dtype=np.float64)
122 normalization = 1.1
124 flat_handle_dict = self._get_flat_handle_dict(radial_nodes, radial_values, normalization)
125 defect_handle_dict = self._get_defect_handle_dict()
127 config = CpFlatFitGradientsTask.ConfigClass()
128 config.bin_factor = 4 # Small detectors for the test.
129 config.normalize_center_radius = 1.0
130 config.radial_spline_nodes_initial = radial_nodes.tolist()
131 config.radial_spline_nodes = radial_nodes.tolist()
132 config.detector_boundary = 5
133 config.do_constrain_zero = True
134 config.do_normalize_center = True
135 config.do_fit_centroid = False
136 config.do_fit_gradient = False
137 config.do_normalize_center = True
139 task = CpFlatFitGradientsTask(config=config)
140 gradient = task.run(
141 camera=self.camera,
142 input_flat_handle_dict=flat_handle_dict,
143 input_defect_handle_dict=defect_handle_dict,
144 ).output_gradient
146 self.assertFloatsAlmostEqual(gradient.normalizationFactor, normalization, rtol=1e-7)
147 self.assertFloatsAlmostEqual(gradient.radialSplineNodes, radial_nodes)
148 self.assertFloatsAlmostEqual(gradient.radialSplineValues, radial_values, atol=1e-3)
150 def test_radial_only_nozero(self):
151 radial_nodes = np.array([0, 1, 2, 3, 4, 4.5, 5.2], dtype=np.float64)
152 radial_values = np.array([1.0, 1.0, 1.0, 0.9, 0.5, 0.3, 0.0], dtype=np.float64)
153 normalization = 1.1
155 flat_handle_dict = self._get_flat_handle_dict(radial_nodes, radial_values, normalization)
156 defect_handle_dict = self._get_defect_handle_dict()
158 config = CpFlatFitGradientsTask.ConfigClass()
159 config.bin_factor = 4 # Small detectors for the test.
160 config.normalize_center_radius = 1.0
161 config.radial_spline_nodes_initial = radial_nodes.tolist()
162 config.radial_spline_nodes = radial_nodes.tolist()
163 config.detector_boundary = 5
164 config.do_constrain_zero = False
165 config.do_normalize_center = True
166 config.do_fit_centroid = False
167 config.do_fit_gradient = False
168 config.do_normalize_center = True
170 task = CpFlatFitGradientsTask(config=config)
171 gradient = task.run(
172 camera=self.camera,
173 input_flat_handle_dict=flat_handle_dict,
174 input_defect_handle_dict=defect_handle_dict,
175 ).output_gradient
177 self.assertFloatsAlmostEqual(gradient.normalizationFactor, normalization, rtol=1e-7)
178 self.assertFloatsAlmostEqual(gradient.radialSplineNodes, radial_nodes)
179 self.assertFloatsAlmostEqual(gradient.radialSplineValues[: -1], radial_values[: -1], atol=1e-3)
180 self.assertFloatsAlmostEqual(gradient.radialSplineValues[-1], radial_values[-1], atol=0.20)
182 def test_radial_centroid(self):
183 radial_nodes = np.array([0, 1, 2, 3, 4, 4.5, 5.2], dtype=np.float64)
184 radial_values = np.array([1.0, 1.0, 1.0, 0.9, 0.5, 0.3, 0.0], dtype=np.float64)
185 normalization = 1.1
187 delta_x = 0.01
188 delta_y = -0.01
190 flat_handle_dict = self._get_flat_handle_dict(
191 radial_nodes,
192 radial_values,
193 normalization,
194 delta_x=delta_x,
195 delta_y=delta_y,
196 )
197 defect_handle_dict = self._get_defect_handle_dict()
199 config = CpFlatFitGradientsTask.ConfigClass()
200 config.bin_factor = 4 # Small detectors for the test.
201 config.normalize_center_radius = 1.0
202 config.radial_spline_nodes_initial = radial_nodes.tolist()
203 config.radial_spline_nodes = radial_nodes.tolist()
204 config.detector_boundary = 5
205 config.do_constrain_zero = True
206 config.do_normalize_center = True
207 config.do_fit_centroid = True
208 config.do_fit_gradient = False
209 config.do_normalize_center = True
211 task = CpFlatFitGradientsTask(config=config)
212 gradient = task.run(
213 camera=self.camera,
214 input_flat_handle_dict=flat_handle_dict,
215 input_defect_handle_dict=defect_handle_dict,
216 ).output_gradient
218 self.assertFloatsAlmostEqual(gradient.normalizationFactor, normalization, rtol=1e-7)
219 self.assertFloatsAlmostEqual(gradient.radialSplineNodes, radial_nodes)
220 self.assertFloatsAlmostEqual(gradient.radialSplineValues, radial_values, atol=1e-3)
221 self.assertFloatsAlmostEqual(gradient.centroidDeltaX, delta_x, atol=6e-3)
222 self.assertFloatsAlmostEqual(gradient.centroidDeltaY, delta_y, atol=6e-3)
224 def test_radial_plane(self):
225 radial_nodes = np.array([0, 1, 2, 3, 4, 4.5, 5.2], dtype=np.float64)
226 radial_values = np.array([1.0, 1.0, 1.0, 0.9, 0.5, 0.3, 0.0], dtype=np.float64)
227 normalization = 1.1
229 gradient_x = 0.01
230 gradient_y = -0.01
232 flat_handle_dict = self._get_flat_handle_dict(
233 radial_nodes,
234 radial_values,
235 normalization,
236 gradient_x=gradient_x,
237 gradient_y=gradient_y,
238 )
239 defect_handle_dict = self._get_defect_handle_dict()
241 config = CpFlatFitGradientsTask.ConfigClass()
242 config.bin_factor = 4 # Small detectors for the test.
243 config.normalize_center_radius = 1.0
244 config.radial_spline_nodes_initial = radial_nodes.tolist()
245 config.radial_spline_nodes = radial_nodes.tolist()
246 config.detector_boundary = 5
247 config.do_constrain_zero = True
248 config.do_normalize_center = True
249 config.do_fit_centroid = False
250 config.do_fit_gradient = True
251 config.do_normalize_center = True
253 task = CpFlatFitGradientsTask(config=config)
254 gradient = task.run(
255 camera=self.camera,
256 input_flat_handle_dict=flat_handle_dict,
257 input_defect_handle_dict=defect_handle_dict,
258 ).output_gradient
260 self.assertFloatsAlmostEqual(gradient.normalizationFactor, normalization, rtol=1e-2)
261 self.assertFloatsAlmostEqual(gradient.radialSplineNodes, radial_nodes)
262 self.assertFloatsAlmostEqual(gradient.radialSplineValues, radial_values, atol=2e-3)
263 self.assertFloatsAlmostEqual(gradient.gradientX, gradient_x, atol=1e-4)
264 self.assertFloatsAlmostEqual(gradient.gradientY, gradient_y, atol=1e-4)
266 def test_apply(self):
267 # This will create source and target; no fitting.
269 # Create the "sky flat" target vignetting.
270 radial_nodes_sky = np.array([0, 1, 2, 3, 4, 4.5, 5.2], dtype=np.float64)
271 radial_values_sky = np.array([1.0, 1.0, 1.0, 1.0, 0.8, 0.5, 0.0], dtype=np.float64)
272 normalization_sky = 1.1
274 sky_gradient = FlatGradient()
275 sky_gradient.setParameters(
276 radialSplineNodes=radial_nodes_sky,
277 radialSplineValues=radial_values_sky,
278 normalizationFactor=normalization_sky,
279 )
281 # Create the "dome flat" source vignetting.
282 radial_nodes_dome = np.array([0, 1, 2, 3, 4, 4.5, 5.2], dtype=np.float64)
283 radial_values_dome = np.array([1.0, 1.0, 1.0, 0.9, 0.5, 0.3, 0.0], dtype=np.float64)
284 normalization_dome = 1.15
285 itl_ratio = 0.9
286 gradient_x = 0.01
287 gradient_y = -0.01
288 delta_x = 0.01
289 delta_y = -0.01
291 dome_gradient = FlatGradient()
292 dome_gradient.setParameters(
293 radialSplineNodes=radial_nodes_dome,
294 radialSplineValues=radial_values_dome,
295 normalizationFactor=normalization_dome,
296 itlRatio=itl_ratio,
297 gradientX=gradient_x,
298 gradientY=gradient_y,
299 centroidDeltaX=delta_x,
300 centroidDeltaY=delta_y,
301 )
303 dome_flat_handles = self._get_flat_handle_dict(
304 radial_nodes_dome,
305 radial_values_dome,
306 normalization_dome,
307 itl_ratio=itl_ratio,
308 delta_x=delta_x,
309 delta_y=delta_y,
310 gradient_x=gradient_x,
311 gradient_y=gradient_y,
312 )
314 config = CpFlatApplyGradientsTask.ConfigClass()
315 task = CpFlatApplyGradientsTask(config=config)
317 corrected_dome_flat_handles = {}
318 for key, handle in dome_flat_handles.items():
319 struct = task.run(
320 camera=self.camera,
321 input_flat=handle.get(),
322 reference_gradient=sky_gradient,
323 gradient=dome_gradient,
324 )
325 corrected_dome_flat_handles[key] = InMemoryDatasetHandle(struct.output_flat, dataId=handle.dataId)
327 # Now if we fit the corrected handles they should have the
328 # same radial structure as the target, with no centroid or
329 # gradient, and normalization 1.0.
330 defect_handle_dict = self._get_defect_handle_dict()
332 config = CpFlatFitGradientsTask.ConfigClass()
333 config.bin_factor = 4 # Small detectors for the test.
334 config.normalize_center_radius = 1.0
335 config.radial_spline_nodes_initial = radial_nodes_sky[radial_nodes_sky <= 4.0].tolist()
336 config.radial_spline_nodes = radial_nodes_sky.tolist()
337 config.detector_boundary = 5
338 config.do_constrain_zero = True
339 config.do_normalize_center = True
340 config.do_fit_centroid = True
341 config.do_fit_gradient = True
342 config.do_normalize_center = True
344 task = CpFlatFitGradientsTask(config=config)
345 gradient = task.run(
346 camera=self.camera,
347 input_flat_handle_dict=corrected_dome_flat_handles,
348 input_defect_handle_dict=defect_handle_dict,
349 ).output_gradient
351 self.assertFloatsAlmostEqual(gradient.normalizationFactor, 1.0, rtol=1e-3)
352 self.assertFloatsAlmostEqual(gradient.radialSplineNodes, radial_nodes_sky)
353 self.assertFloatsAlmostEqual(gradient.radialSplineValues, radial_values_sky, atol=1e-2)
354 self.assertFloatsAlmostEqual(gradient.itlRatio, itl_ratio, atol=2e-4)
355 self.assertFloatsAlmostEqual(gradient.centroidDeltaX, 0.0, atol=6e-3)
356 self.assertFloatsAlmostEqual(gradient.centroidDeltaY, 0.0, atol=6e-3)
357 self.assertFloatsAlmostEqual(gradient.gradientX, 0.0, atol=1e-7)
358 self.assertFloatsAlmostEqual(gradient.gradientY, 0.0, atol=1e-7)
361class TestMemory(lsst.utils.tests.MemoryTestCase):
362 pass
365def setup_module(module):
366 lsst.utils.tests.init()
369if __name__ == "__main__": 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true
370 lsst.utils.tests.init()
371 unittest.main()