Coverage for tests / test_dipoleFitter.py: 19%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:08 +0000

1# 

2# LSST Data Management System 

3# Copyright 2008-2017 AURA/LSST. 

4# 

5# This product includes software developed by the 

6# LSST Project (http://www.lsst.org/). 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21 

22"""Tests of the DipoleFitAlgorithm and its related tasks and plugins. 

23 

24Each test generates a fake image with two synthetic dipoles as input data. 

25""" 

26import unittest 

27 

28import numpy as np 

29 

30import lsst.utils.tests 

31import lsst.afw.table as afwTable 

32from lsst.ip.diffim.dipoleFitTask import (DipoleFitAlgorithm, DipoleFitTask) 

33import utils 

34 

35 

36class DipoleTestImage: 

37 """Create a test dipole image and store the parameters used to make it, 

38 for comparison with the fitted results. 

39 

40 Parameters 

41 ---------- 

42 xc, yc : `list` [`float`] 

43 x, y coordinate (pixels) of center(s) of input dipole(s). 

44 flux: `list` [`float`] 

45 Flux(es) of input dipole(s). 

46 gradientParams : `tuple` 

47 Tuple with three parameters for linear background gradient. 

48 offsets : `list` [`float`] 

49 Pixel coordinates between lobes of dipoles. 

50 """ 

51 

52 def __init__(self, xc=None, yc=None, flux=None, offsets=None, gradientParams=None, edgeWidth=None): 

53 self.xc = xc if xc is not None else [65.3, 24.2] 

54 self.yc = yc if yc is not None else [38.6, 78.5] 

55 self.offsets = offsets if offsets is not None else np.array([-2., 2.]) 

56 self.flux = flux if flux is not None else [2500., 2345.] 

57 self.gradientParams = gradientParams if gradientParams is not None else [10., 3., 5.] 

58 self.edgeWidth = edgeWidth if edgeWidth is not None else 8 

59 

60 # The default tolerance for comparisons of fitted parameters with input values. 

61 # Given the noise in the input images (default noise value of 2.), this is a 

62 # useful test of algorithm robustness, and will guard against future regressions. 

63 self.rtol = 0.01 

64 

65 self.generateTestImage() 

66 

67 def generateTestImage(self): 

68 self.testImage = utils.DipoleTestImage( 

69 w=100, h=100, 

70 xcenPos=self.xc + self.offsets, 

71 ycenPos=self.yc + self.offsets, 

72 xcenNeg=self.xc - self.offsets, 

73 ycenNeg=self.yc - self.offsets, 

74 flux=self.flux, fluxNeg=self.flux, 

75 noise=2., # Note the input noise - this affects the relative tolerances used. 

76 gradientParams=self.gradientParams, 

77 edgeWidth=self.edgeWidth) 

78 

79 

80class DipoleFitTest(lsst.utils.tests.TestCase): 

81 """A test case for separately testing the dipole fit algorithm 

82 directly, and the single frame measurement. 

83 

84 In each test, create a simulated diffim with two dipoles, noise, 

85 and a linear background gradient in the pre-sub images then 

86 compare the input fluxes/centroids with the fitted results. 

87 """ 

88 

89 def testDipoleAlgorithm(self): 

90 """Test the dipole fitting algorithm directly (fitDipole()). 

91 

92 Test that the resulting fluxes/centroids are very close to the 

93 input values for both dipoles in the image. 

94 """ 

95 # Display (plot) the output dipole thumbnails with matplotlib. 

96 display = False 

97 # Be verbose during fitting, including the lmfit internal details. 

98 verbose = False 

99 

100 dipoleTestImage = DipoleTestImage() 

101 catalog = dipoleTestImage.testImage.detectDipoleSources(minBinSize=32) 

102 

103 for s in catalog: 

104 fp = s.getFootprint() 

105 self.assertTrue(len(fp.getPeaks()) == 2) 

106 

107 rtol = dipoleTestImage.rtol 

108 offsets = dipoleTestImage.offsets 

109 testImage = dipoleTestImage.testImage 

110 for i, s in enumerate(catalog): 

111 alg = DipoleFitAlgorithm(testImage.diffim, testImage.posImage, testImage.negImage) 

112 result, _ = alg.fitDipole( 

113 s, rel_weight=0.5, separateNegParams=False, 

114 verbose=verbose, display=display) 

115 

116 self.assertFloatsAlmostEqual((result.posFlux.instFlux + abs(result.negFlux.instFlux))/2., 

117 dipoleTestImage.flux[i], rtol=rtol) 

118 self.assertFloatsAlmostEqual(result.posCentroid.x, dipoleTestImage.xc[i] + offsets[i], rtol=rtol) 

119 self.assertFloatsAlmostEqual(result.posCentroid.y, dipoleTestImage.yc[i] + offsets[i], rtol=rtol) 

120 self.assertFloatsAlmostEqual(result.negCentroid.x, dipoleTestImage.xc[i] - offsets[i], rtol=rtol) 

121 self.assertFloatsAlmostEqual(result.negCentroid.y, dipoleTestImage.yc[i] - offsets[i], rtol=rtol) 

122 

123 def _runDetection(self, dipoleTestImage, maxFootprintArea=None): 

124 """Run 'diaSource' detection on the diffim, including merging of 

125 positive and negative sources. 

126 

127 Then run DipoleFitTask on the image and return the resulting catalog. 

128 """ 

129 # Create the various tasks and schema -- avoid code reuse. 

130 testImage = dipoleTestImage.testImage 

131 detectTask, schema = testImage.detectDipoleSources(doMerge=False, minBinSize=32) 

132 

133 config = DipoleFitTask.ConfigClass() 

134 # Also run the older C++ DipoleFlux algorithm for comparison purposes. 

135 config.plugins.names |= ["ip_diffim_PsfDipoleFlux"] 

136 if maxFootprintArea: 

137 config.plugins["ip_diffim_DipoleFit"].maxFootprintArea = maxFootprintArea 

138 measureTask = DipoleFitTask(schema=schema, config=config) 

139 

140 table = afwTable.SourceTable.make(schema) 

141 detectResult = detectTask.run(table, testImage.diffim) 

142 fpSet = detectResult.positive 

143 fpSet.merge(detectResult.negative, 2, 2, False) 

144 sources = afwTable.SourceCatalog(table) 

145 fpSet.makeSources(sources) 

146 

147 measureTask.run(sources, testImage.diffim, testImage.posImage, testImage.negImage) 

148 return sources 

149 

150 def _checkTaskOutput(self, dipoleTestImage, sources, noPreImages=False): 

151 """Compare the fluxes/centroids in `sources` are entered 

152 into the correct slots of the catalog, and have values that 

153 are very close to the input values for both dipoles in the 

154 image. 

155 

156 Also test that the resulting fluxes are close to those 

157 generated by the existing ip_diffim_DipoleMeasurement task 

158 (PsfDipoleFit). 

159 

160 Parameters 

161 ---------- 

162 dipoleTestImage : `DipoleTestImage` 

163 Image that was generated for the test. 

164 sources : `lsst.afw.table.SourceCatalog` 

165 Source catalog measured on the test image. 

166 noPreImages : bool, optional 

167 Set to True if there were no positive/negative images generated 

168 for the test, to use much looser uncertainty tolerances. 

169 """ 

170 rtol = dipoleTestImage.rtol 

171 offsets = dipoleTestImage.offsets 

172 for i, record in enumerate(sources): 

173 self.assertFloatsAlmostEqual((record['ip_diffim_DipoleFit_pos_instFlux'] 

174 + abs(record['ip_diffim_DipoleFit_neg_instFlux']))/2., 

175 dipoleTestImage.flux[i], rtol=rtol) 

176 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_x'], 

177 dipoleTestImage.xc[i] + offsets[i], rtol=rtol) 

178 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_y'], 

179 dipoleTestImage.yc[i] + offsets[i], rtol=rtol) 

180 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_x'], 

181 dipoleTestImage.xc[i] - offsets[i], rtol=rtol) 

182 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_y'], 

183 dipoleTestImage.yc[i] - offsets[i], rtol=rtol) 

184 

185 # check uncertainties 

186 # TODO: how to compute the correct uncertainty to test here? 

187 # self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_instFluxErr'], 

188 # ???, rtol=rtol) 

189 # emperical uncertainty values: not clear how to compute proper expected values. 

190 with self.subTest(i=repr(i), type="pos_xErr"): 

191 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_xErr'], 

192 .53*record["base_SdssCentroid_xErr"], 

193 rtol=0.1 if not noPreImages else 0.5) 

194 with self.subTest(i=repr(i), type="pos_yErr"): 

195 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_yErr'], 

196 .53*record["base_SdssCentroid_yErr"], 

197 rtol=0.1 if not noPreImages else 0.5) 

198 with self.subTest(i=repr(i), type="neg_xErr"): 

199 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_xErr'], 

200 .53*record["base_SdssCentroid_xErr"], 

201 rtol=0.1 if not noPreImages else 0.5) 

202 with self.subTest(i=repr(i), type="neg_yErr"): 

203 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_yErr'], 

204 .53*record["base_SdssCentroid_yErr"], 

205 rtol=0.1 if not noPreImages else 0.5) 

206 # NOTE: these are smaller than the positive/negative uncertainties, 

207 # because the of those covariance is negative! 

208 with self.subTest(i=repr(i), type="xErr"): 

209 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_xErr'], 

210 .34*record["base_SdssCentroid_xErr"], 

211 rtol=0.06 if not noPreImages else 0.5) 

212 with self.subTest(i=repr(i), type="yErr"): 

213 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_yErr'], 

214 .35*record["base_SdssCentroid_yErr"], 

215 rtol=0.06 if not noPreImages else 0.5) 

216 

217 # Note this is dependent on the noise (variance) being realistic in the image. 

218 # otherwise it throws off the chi2 estimate, which is used for classification: 

219 self.assertTrue(record['ip_diffim_DipoleFit_classification']) 

220 

221 # compare to the original ip_diffim_PsfDipoleFlux measurements 

222 # result2 = record.extract("ip_diffim_PsfDipoleFlux*") 

223 self.assertFloatsAlmostEqual((record['ip_diffim_DipoleFit_pos_instFlux'] 

224 + abs(record['ip_diffim_DipoleFit_neg_instFlux']))/2., 

225 (record['ip_diffim_PsfDipoleFlux_pos_instFlux'] 

226 + abs(record['ip_diffim_PsfDipoleFlux_neg_instFlux']))/2., 

227 rtol=rtol) 

228 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_x'], 

229 record['ip_diffim_PsfDipoleFlux_pos_centroid_x'], 

230 rtol=rtol) 

231 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_pos_y'], 

232 record['ip_diffim_PsfDipoleFlux_pos_centroid_y'], 

233 rtol=rtol) 

234 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_x'], 

235 record['ip_diffim_PsfDipoleFlux_neg_centroid_x'], 

236 rtol=rtol) 

237 self.assertFloatsAlmostEqual(record['ip_diffim_DipoleFit_neg_y'], 

238 record['ip_diffim_PsfDipoleFlux_neg_centroid_y'], 

239 rtol=rtol) 

240 

241 def testDipoleTask(self): 

242 """Test the dipole fitting singleFramePlugin. 

243 

244 Test that the resulting fluxes/centroids are entered into the 

245 correct slots of the catalog, and have values that are very 

246 close to the input values for both dipoles in the image. 

247 

248 Also test that the resulting fluxes are close to those 

249 generated by the existing ip_diffim_DipoleMeasurement task 

250 (PsfDipoleFit). 

251 """ 

252 dipoleTestImage = DipoleTestImage() 

253 sources = self._runDetection(dipoleTestImage) 

254 self._checkTaskOutput(dipoleTestImage, sources) 

255 

256 def testDipoleTaskNoPosImage(self): 

257 """Test the dipole fitting singleFramePlugin in the case where no 

258 `posImage` is provided. It should be the same as above because 

259 `posImage` can be constructed from `diffim+negImage`. 

260 

261 Test that the resulting fluxes/centroids are entered into the 

262 correct slots of the catalog, and have values that are very 

263 close to the input values for both dipoles in the image. 

264 

265 Also test that the resulting fluxes are close to those 

266 generated by the existing ip_diffim_DipoleMeasurement task 

267 (PsfDipoleFit). 

268 """ 

269 dipoleTestImage = DipoleTestImage() 

270 dipoleTestImage.testImage.posImage = None 

271 sources = self._runDetection(dipoleTestImage) 

272 self._checkTaskOutput(dipoleTestImage, sources) 

273 

274 def testDipoleTaskNoNegImage(self): 

275 """Test the dipole fitting singleFramePlugin in the case where no 

276 `negImage` is provided. It should be the same as above because 

277 `negImage` can be constructed from `posImage-diffim`. 

278 

279 Test that the resulting fluxes/centroids are entered into the 

280 correct slots of the catalog, and have values that are very 

281 close to the input values for both dipoles in the image. 

282 

283 Also test that the resulting fluxes are close to those 

284 generated by the existing ip_diffim_DipoleMeasurement task 

285 (PsfDipoleFit). 

286 """ 

287 dipoleTestImage = DipoleTestImage() 

288 dipoleTestImage.testImage.negImage = None 

289 sources = self._runDetection(dipoleTestImage) 

290 self._checkTaskOutput(dipoleTestImage, sources) 

291 

292 def testDipoleTaskNoPreSubImages(self): 

293 """Test the dipole fitting singleFramePlugin in the case where no 

294 pre-subtraction data (`posImage` or `negImage`) are provided. 

295 In this case it just fits a dipole model to the diffim 

296 (dipole) image alone. Note that this test will only pass for 

297 widely-separated dipoles. 

298 

299 Test that the resulting fluxes/centroids are entered into the 

300 correct slots of the catalog, and have values that are very 

301 close to the input values for both dipoles in the image. 

302 

303 Also test that the resulting fluxes are close to those 

304 generated by the existing ip_diffim_DipoleMeasurement task 

305 (PsfDipoleFit). 

306 """ 

307 dipoleTestImage = DipoleTestImage() 

308 dipoleTestImage.testImage.posImage = dipoleTestImage.testImage.negImage = None 

309 sources = self._runDetection(dipoleTestImage) 

310 self._checkTaskOutput(dipoleTestImage, sources, noPreImages=True) 

311 

312 def testDipoleEdge(self): 

313 """Test the too-close-to-image-edge scenario for dipole fitting 

314 singleFramePlugin. 

315 

316 Test that the dipoles which are too close to the edge are 

317 not detected. 

318 """ 

319 

320 # with no edge, and masks growing due to convolution, 

321 # we should not detect any dipole sources 

322 # (If we were to keep the original mask instead, as in DM-45318 that 

323 # was subsequently canceled in DM-47385, we will detect 2 sources) 

324 dipoleTestImage = DipoleTestImage(xc=[5.3, 4.8], yc=[4.6, 86.5], flux=[200, 210], edgeWidth=0) 

325 sources = self._runDetection(dipoleTestImage) 

326 self.assertEqual(len(sources), 0) 

327 

328 # with a wide edge we should not detect any sources 

329 dipoleTestImage = DipoleTestImage(xc=[5.3, 4.8], yc=[4.6, 86.5], flux=[200, 210], edgeWidth=20) 

330 sources = self._runDetection(dipoleTestImage) 

331 self.assertEqual(len(sources), 0) 

332 

333 def testDipoleFootprintTooLarge(self): 

334 """Test that the footprint area cut flags sources.""" 

335 

336 dipoleTestImage = DipoleTestImage() 

337 # This area is smaller than the area of the test sources (~750). 

338 sources = self._runDetection(dipoleTestImage, maxFootprintArea=500) 

339 

340 self.assertTrue(np.all(sources["ip_diffim_DipoleFit_flag"])) 

341 

342 

343class TestMemory(lsst.utils.tests.MemoryTestCase): 

344 pass 

345 

346 

347def setup_module(module): 

348 lsst.utils.tests.init() 

349 

350 

351if __name__ == "__main__": 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true

352 lsst.utils.tests.init() 

353 unittest.main()