Coverage for tests / test_dipoleFitter.py: 19%
118 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-25 08:36 +0000
1#
2# LSST Data Management System
3# Copyright 2008-2017 AURA/LSST.
4#
5# This product includes software developed by the
6# LSST Project (http://www.lsst.org/).
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
22"""Tests of the DipoleFitAlgorithm and its related tasks and plugins.
24Each test generates a fake image with two synthetic dipoles as input data.
25"""
26import unittest
28import numpy as np
30import lsst.utils.tests
31import lsst.afw.table as afwTable
32from lsst.ip.diffim.dipoleFitTask import (DipoleFitAlgorithm, DipoleFitTask)
33import utils
36class DipoleTestImage:
37 """Create a test dipole image and store the parameters used to make it,
38 for comparison with the fitted results.
40 Parameters
41 ----------
42 xc, yc : `list` [`float`]
43 x, y coordinate (pixels) of center(s) of input dipole(s).
44 flux: `list` [`float`]
45 Flux(es) of input dipole(s).
46 gradientParams : `tuple`
47 Tuple with three parameters for linear background gradient.
48 offsets : `list` [`float`]
49 Pixel coordinates between lobes of dipoles.
50 """
52 def __init__(self, xc=None, yc=None, flux=None, offsets=None, gradientParams=None, edgeWidth=None):
53 self.xc = xc if xc is not None else [65.3, 24.2]
54 self.yc = yc if yc is not None else [38.6, 78.5]
55 self.offsets = offsets if offsets is not None else np.array([-2., 2.])
56 self.flux = flux if flux is not None else [2500., 2345.]
57 self.gradientParams = gradientParams if gradientParams is not None else [10., 3., 5.]
58 self.edgeWidth = edgeWidth if edgeWidth is not None else 8
60 # The default tolerance for comparisons of fitted parameters with input values.
61 # Given the noise in the input images (default noise value of 2.), this is a
62 # useful test of algorithm robustness, and will guard against future regressions.
63 self.rtol = 0.01
65 self.generateTestImage()
67 def generateTestImage(self):
68 self.testImage = utils.DipoleTestImage(
69 w=100, h=100,
70 xcenPos=self.xc + self.offsets,
71 ycenPos=self.yc + self.offsets,
72 xcenNeg=self.xc - self.offsets,
73 ycenNeg=self.yc - self.offsets,
74 flux=self.flux, fluxNeg=self.flux,
75 noise=2., # Note the input noise - this affects the relative tolerances used.
76 gradientParams=self.gradientParams,
77 edgeWidth=self.edgeWidth)
80class DipoleFitTest(lsst.utils.tests.TestCase):
81 """A test case for separately testing the dipole fit algorithm
82 directly, and the single frame measurement.
84 In each test, create a simulated diffim with two dipoles, noise,
85 and a linear background gradient in the pre-sub images then
86 compare the input fluxes/centroids with the fitted results.
87 """
89 def testDipoleAlgorithm(self):
90 """Test the dipole fitting algorithm directly (fitDipole()).
92 Test that the resulting fluxes/centroids are very close to the
93 input values for both dipoles in the image.
94 """
95 # Display (plot) the output dipole thumbnails with matplotlib.
96 display = False
97 # Be verbose during fitting, including the lmfit internal details.
98 verbose = False
100 dipoleTestImage = DipoleTestImage()
101 catalog = dipoleTestImage.testImage.detectDipoleSources(minBinSize=32)
103 for s in catalog:
104 fp = s.getFootprint()
105 self.assertTrue(len(fp.getPeaks()) == 2)
107 rtol = dipoleTestImage.rtol
108 offsets = dipoleTestImage.offsets
109 testImage = dipoleTestImage.testImage
110 for i, s in enumerate(catalog):
111 alg = DipoleFitAlgorithm(testImage.diffim, testImage.posImage, testImage.negImage)
112 result, _ = alg.fitDipole(
113 s, rel_weight=0.5, separateNegParams=False,
114 verbose=verbose, display=display)
116 self.assertFloatsAlmostEqual((result.posFlux.instFlux + abs(result.negFlux.instFlux))/2.,
117 dipoleTestImage.flux[i], rtol=rtol)
118 self.assertFloatsAlmostEqual(result.posCentroid.x, dipoleTestImage.xc[i] + offsets[i], rtol=rtol)
119 self.assertFloatsAlmostEqual(result.posCentroid.y, dipoleTestImage.yc[i] + offsets[i], rtol=rtol)
120 self.assertFloatsAlmostEqual(result.negCentroid.x, dipoleTestImage.xc[i] - offsets[i], rtol=rtol)
121 self.assertFloatsAlmostEqual(result.negCentroid.y, dipoleTestImage.yc[i] - offsets[i], rtol=rtol)
123 def _runDetection(self, dipoleTestImage, maxFootprintArea=None):
124 """Run 'diaSource' detection on the diffim, including merging of
125 positive and negative sources.
127 Then run DipoleFitTask on the image and return the resulting catalog.
128 """
129 # Create the various tasks and schema -- avoid code reuse.
130 testImage = dipoleTestImage.testImage
131 detectTask, schema = testImage.detectDipoleSources(doMerge=False, minBinSize=32)
133 config = DipoleFitTask.ConfigClass()
134 # Also run the older C++ DipoleFlux algorithm for comparison purposes.
135 config.plugins.names |= ["ip_diffim_PsfDipoleFlux"]
136 if maxFootprintArea:
137 config.plugins["ip_diffim_DipoleFit"].maxFootprintArea = maxFootprintArea
138 measureTask = DipoleFitTask(schema=schema, config=config)
140 table = afwTable.SourceTable.make(schema)
141 detectResult = detectTask.run(table, testImage.diffim)
142 fpSet = detectResult.positive
143 fpSet.merge(detectResult.negative, 2, 2, False)
144 sources = afwTable.SourceCatalog(table)
145 fpSet.makeSources(sources)
147 measureTask.run(sources, testImage.diffim, testImage.posImage, testImage.negImage)
148 return sources
150 def _checkTaskOutput(self, dipoleTestImage, sources, noPreImages=False):
151 """Compare the fluxes/centroids in `sources` are entered
152 into the correct slots of the catalog, and have values that
153 are very close to the input values for both dipoles in the
154 image.
156 Also test that the resulting fluxes are close to those
157 generated by the existing ip_diffim_DipoleMeasurement task
158 (PsfDipoleFit).
160 Parameters
161 ----------
162 dipoleTestImage : `DipoleTestImage`
163 Image that was generated for the test.
164 sources : `lsst.afw.table.SourceCatalog`
165 Source catalog measured on the test image.
166 noPreImages : bool, optional
167 Set to True if there were no positive/negative images generated
168 for the test, to use much looser uncertainty tolerances.
169 """
170 rtol = dipoleTestImage.rtol
171 offsets = dipoleTestImage.offsets
172 for i, record in enumerate(sources):
173 self.assertFloatsAlmostEqual((record['ip_diffim_DipoleFit_pos_instFlux']
174 + abs(record['ip_diffim_DipoleFit_neg_instFlux']))/2.,
175 dipoleTestImage.flux[i], rtol=rtol)
176 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_x'],
177 dipoleTestImage.xc[i] + offsets[i], rtol=rtol)
178 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_y'],
179 dipoleTestImage.yc[i] + offsets[i], rtol=rtol)
180 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_x'],
181 dipoleTestImage.xc[i] - offsets[i], rtol=rtol)
182 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_y'],
183 dipoleTestImage.yc[i] - offsets[i], rtol=rtol)
185 # check uncertainties
186 # TODO: how to compute the correct uncertainty to test here?
187 # self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_instFluxErr'],
188 # ???, rtol=rtol)
189 # emperical uncertainty values: not clear how to compute proper expected values.
190 with self.subTest(i=repr(i), type="pos_xErr"):
191 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_xErr'],
192 .53*record["base_SdssCentroid_xErr"],
193 rtol=0.1 if not noPreImages else 0.5)
194 with self.subTest(i=repr(i), type="pos_yErr"):
195 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_yErr'],
196 .53*record["base_SdssCentroid_yErr"],
197 rtol=0.1 if not noPreImages else 0.5)
198 with self.subTest(i=repr(i), type="neg_xErr"):
199 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_xErr'],
200 .53*record["base_SdssCentroid_xErr"],
201 rtol=0.1 if not noPreImages else 0.5)
202 with self.subTest(i=repr(i), type="neg_yErr"):
203 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_yErr'],
204 .53*record["base_SdssCentroid_yErr"],
205 rtol=0.1 if not noPreImages else 0.5)
206 # NOTE: these are smaller than the positive/negative uncertainties,
207 # because the of those covariance is negative!
208 with self.subTest(i=repr(i), type="xErr"):
209 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_xErr'],
210 .34*record["base_SdssCentroid_xErr"],
211 rtol=0.06 if not noPreImages else 0.5)
212 with self.subTest(i=repr(i), type="yErr"):
213 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_yErr'],
214 .35*record["base_SdssCentroid_yErr"],
215 rtol=0.06 if not noPreImages else 0.5)
217 # Note this is dependent on the noise (variance) being realistic in the image.
218 # otherwise it throws off the chi2 estimate, which is used for classification:
219 self.assertTrue(record['ip_diffim_DipoleFit_classification'])
221 # compare to the original ip_diffim_PsfDipoleFlux measurements
222 # result2 = record.extract("ip_diffim_PsfDipoleFlux*")
223 self.assertFloatsAlmostEqual((record['ip_diffim_DipoleFit_pos_instFlux']
224 + abs(record['ip_diffim_DipoleFit_neg_instFlux']))/2.,
225 (record['ip_diffim_PsfDipoleFlux_pos_instFlux']
226 + abs(record['ip_diffim_PsfDipoleFlux_neg_instFlux']))/2.,
227 rtol=rtol)
228 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_x'],
229 record['ip_diffim_PsfDipoleFlux_pos_centroid_x'],
230 rtol=rtol)
231 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_y'],
232 record['ip_diffim_PsfDipoleFlux_pos_centroid_y'],
233 rtol=rtol)
234 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_x'],
235 record['ip_diffim_PsfDipoleFlux_neg_centroid_x'],
236 rtol=rtol)
237 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_y'],
238 record['ip_diffim_PsfDipoleFlux_neg_centroid_y'],
239 rtol=rtol)
241 def testDipoleTask(self):
242 """Test the dipole fitting singleFramePlugin.
244 Test that the resulting fluxes/centroids are entered into the
245 correct slots of the catalog, and have values that are very
246 close to the input values for both dipoles in the image.
248 Also test that the resulting fluxes are close to those
249 generated by the existing ip_diffim_DipoleMeasurement task
250 (PsfDipoleFit).
251 """
252 dipoleTestImage = DipoleTestImage()
253 sources = self._runDetection(dipoleTestImage)
254 self._checkTaskOutput(dipoleTestImage, sources)
256 def testDipoleTaskNoPosImage(self):
257 """Test the dipole fitting singleFramePlugin in the case where no
258 `posImage` is provided. It should be the same as above because
259 `posImage` can be constructed from `diffim+negImage`.
261 Test that the resulting fluxes/centroids are entered into the
262 correct slots of the catalog, and have values that are very
263 close to the input values for both dipoles in the image.
265 Also test that the resulting fluxes are close to those
266 generated by the existing ip_diffim_DipoleMeasurement task
267 (PsfDipoleFit).
268 """
269 dipoleTestImage = DipoleTestImage()
270 dipoleTestImage.testImage.posImage = None
271 sources = self._runDetection(dipoleTestImage)
272 self._checkTaskOutput(dipoleTestImage, sources)
274 def testDipoleTaskNoNegImage(self):
275 """Test the dipole fitting singleFramePlugin in the case where no
276 `negImage` is provided. It should be the same as above because
277 `negImage` can be constructed from `posImage-diffim`.
279 Test that the resulting fluxes/centroids are entered into the
280 correct slots of the catalog, and have values that are very
281 close to the input values for both dipoles in the image.
283 Also test that the resulting fluxes are close to those
284 generated by the existing ip_diffim_DipoleMeasurement task
285 (PsfDipoleFit).
286 """
287 dipoleTestImage = DipoleTestImage()
288 dipoleTestImage.testImage.negImage = None
289 sources = self._runDetection(dipoleTestImage)
290 self._checkTaskOutput(dipoleTestImage, sources)
292 def testDipoleTaskNoPreSubImages(self):
293 """Test the dipole fitting singleFramePlugin in the case where no
294 pre-subtraction data (`posImage` or `negImage`) are provided.
295 In this case it just fits a dipole model to the diffim
296 (dipole) image alone. Note that this test will only pass for
297 widely-separated dipoles.
299 Test that the resulting fluxes/centroids are entered into the
300 correct slots of the catalog, and have values that are very
301 close to the input values for both dipoles in the image.
303 Also test that the resulting fluxes are close to those
304 generated by the existing ip_diffim_DipoleMeasurement task
305 (PsfDipoleFit).
306 """
307 dipoleTestImage = DipoleTestImage()
308 dipoleTestImage.testImage.posImage = dipoleTestImage.testImage.negImage = None
309 sources = self._runDetection(dipoleTestImage)
310 self._checkTaskOutput(dipoleTestImage, sources, noPreImages=True)
312 def testDipoleEdge(self):
313 """Test the too-close-to-image-edge scenario for dipole fitting
314 singleFramePlugin.
316 Test that the dipoles which are too close to the edge are
317 not detected.
318 """
320 # with no edge, and masks growing due to convolution,
321 # we should not detect any dipole sources
322 # (If we were to keep the original mask instead, as in DM-45318 that
323 # was subsequently canceled in DM-47385, we will detect 2 sources)
324 dipoleTestImage = DipoleTestImage(xc=[5.3, 4.8], yc=[4.6, 86.5], flux=[200, 210], edgeWidth=0)
325 sources = self._runDetection(dipoleTestImage)
326 self.assertEqual(len(sources), 0)
328 # with a wide edge we should not detect any sources
329 dipoleTestImage = DipoleTestImage(xc=[5.3, 4.8], yc=[4.6, 86.5], flux=[200, 210], edgeWidth=20)
330 sources = self._runDetection(dipoleTestImage)
331 self.assertEqual(len(sources), 0)
333 def testDipoleFootprintTooLarge(self):
334 """Test that the footprint area cut flags sources."""
336 dipoleTestImage = DipoleTestImage()
337 # This area is smaller than the area of the test sources (~750).
338 sources = self._runDetection(dipoleTestImage, maxFootprintArea=500)
340 self.assertTrue(np.all(sources["ip_diffim_DipoleFit_flag"]))
343class TestMemory(lsst.utils.tests.MemoryTestCase):
344 pass
347def setup_module(module):
348 lsst.utils.tests.init()
351if __name__ == "__main__": 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true
352 lsst.utils.tests.init()
353 unittest.main()