Coverage for tests / test_MeasureSources.py: 11%

204 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:53 +0000

1# This file is part of meas_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import math 

23import unittest 

24 

25import numpy as np 

26 

27import lsst.pex.exceptions 

28import lsst.daf.base as dafBase 

29import lsst.geom 

30import lsst.afw.detection as afwDetection 

31import lsst.afw.math as afwMath 

32import lsst.afw.geom as afwGeom 

33import lsst.afw.table as afwTable 

34import lsst.afw.image as afwImage 

35import lsst.meas.base as measBase 

36import lsst.utils.tests 

37 

38FwhmPerSigma = 2*math.sqrt(2*math.log(2)) # FWHM for an N(0, 1) Gaussian 

39 

40 

41def makePluginAndCat(alg, name, control, metadata=False, centroid=None): 

42 schema = afwTable.SourceTable.makeMinimalSchema() 

43 if centroid: 

44 schema.addField(centroid + "_x", type=np.float64) 

45 schema.addField(centroid + "_y", type=np.float64) 

46 schema.addField(centroid + "_flag", type='Flag') 

47 schema.getAliasMap().set("slot_Centroid", centroid) 

48 if metadata: 

49 plugin = alg(control, name, schema, dafBase.PropertySet()) 

50 else: 

51 plugin = alg(control, name, schema) 

52 cat = afwTable.SourceCatalog(schema) 

53 return plugin, cat 

54 

55 

56class MeasureSourcesTestCase(lsst.utils.tests.TestCase): 

57 

58 def testCircularApertureMeasure(self): 

59 mi = afwImage.MaskedImageF(lsst.geom.ExtentI(100, 200)) 

60 mi.set(10) 

61 # 

62 # Create our measuring engine 

63 # 

64 

65 radii = (1.0, 5.0, 10.0) # radii to use 

66 

67 control = measBase.ApertureFluxControl() 

68 control.radii = radii 

69 

70 exp = afwImage.makeExposure(mi) 

71 x0, y0 = 1234, 5678 

72 exp.setXY0(lsst.geom.Point2I(x0, y0)) 

73 

74 plugin, cat = makePluginAndCat(measBase.CircularApertureFluxAlgorithm, 

75 "test", control, True, centroid="centroid") 

76 source = cat.makeRecord() 

77 source.set("centroid_x", 30+x0) 

78 source.set("centroid_y", 50+y0) 

79 plugin.measure(source, exp) 

80 

81 for r in radii: 

82 currentFlux = source.get("%s_instFlux" % 

83 measBase.CircularApertureFluxAlgorithm.makeFieldPrefix("test", r)) 

84 self.assertAlmostEqual(10.0*math.pi*r*r/currentFlux, 1.0, places=4) 

85 

86 def testPeakLikelihoodFlux(self): 

87 """Test measurement with PeakLikelihoodFlux. 

88 

89 Notes 

90 ----- 

91 This test makes and measures a series of exposures containing just one 

92 star, approximately centered. 

93 """ 

94 

95 bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(100, 101)) 

96 kernelWidth = 35 

97 var = 100 

98 fwhm = 3.0 

99 sigma = fwhm/FwhmPerSigma 

100 convolutionControl = afwMath.ConvolutionControl() 

101 psf = afwDetection.GaussianPsf(kernelWidth, kernelWidth, sigma) 

102 psfKernel = psf.getLocalKernel(psf.getAveragePosition()) 

103 psfImage = psf.computeKernelImage(psf.getAveragePosition()) 

104 sumPsfSq = np.sum(psfImage.array**2) 

105 psfSqArr = psfImage.array**2 

106 

107 for instFlux in (1000, 10000): 

108 ctrInd = lsst.geom.Point2I(50, 51) 

109 ctrPos = lsst.geom.Point2D(ctrInd) 

110 

111 kernelBBox = psfImage.getBBox() 

112 kernelBBox.shift(lsst.geom.Extent2I(ctrInd)) 

113 

114 # compute predicted instFlux error 

115 unshMImage = makeFakeImage(bbox, [ctrPos], [instFlux], fwhm, var) 

116 

117 # filter image by PSF 

118 unshFiltMImage = afwImage.MaskedImageF(unshMImage.getBBox()) 

119 afwMath.convolve(unshFiltMImage, unshMImage, psfKernel, convolutionControl) 

120 

121 # compute predicted instFlux = value of image at peak / sum(PSF^2) 

122 # this is a sanity check of the algorithm, as much as anything 

123 predFlux = unshFiltMImage.image[ctrInd, afwImage.LOCAL] / sumPsfSq 

124 self.assertLess(abs(instFlux - predFlux), instFlux * 0.01) 

125 

126 # compute predicted instFlux error based on filtered pixels 

127 # = sqrt(value of filtered variance at peak / sum(PSF^2)^2) 

128 predFluxErr = math.sqrt(unshFiltMImage.variance[ctrInd, afwImage.LOCAL]) / sumPsfSq 

129 

130 # compute predicted instFlux error based on unfiltered pixels 

131 # = sqrt(sum(unfiltered variance * PSF^2)) / sum(PSF^2) 

132 # and compare to that derived from filtered pixels; 

133 # again, this is a test of the algorithm 

134 varView = afwImage.ImageF(unshMImage.variance, kernelBBox) 

135 varArr = varView.array 

136 unfiltPredFluxErr = math.sqrt(np.sum(varArr*psfSqArr)) / sumPsfSq 

137 self.assertLess(abs(unfiltPredFluxErr - predFluxErr), predFluxErr * 0.01) 

138 

139 for fracOffset in (lsst.geom.Extent2D(0, 0), lsst.geom.Extent2D(0.2, -0.3)): 

140 adjCenter = ctrPos + fracOffset 

141 if fracOffset == lsst.geom.Extent2D(0, 0): 

142 maskedImage = unshMImage 

143 filteredImage = unshFiltMImage 

144 else: 

145 maskedImage = makeFakeImage(bbox, [adjCenter], [instFlux], fwhm, var) 

146 # filter image by PSF 

147 filteredImage = afwImage.MaskedImageF(maskedImage.getBBox()) 

148 afwMath.convolve(filteredImage, maskedImage, psfKernel, convolutionControl) 

149 

150 exp = afwImage.makeExposure(filteredImage) 

151 exp.setPsf(psf) 

152 control = measBase.PeakLikelihoodFluxControl() 

153 plugin, cat = makePluginAndCat(measBase.PeakLikelihoodFluxAlgorithm, "test", 

154 control, centroid="centroid") 

155 source = cat.makeRecord() 

156 source.set("centroid_x", adjCenter.getX()) 

157 source.set("centroid_y", adjCenter.getY()) 

158 plugin.measure(source, exp) 

159 measFlux = source.get("test_instFlux") 

160 measFluxErr = source.get("test_instFluxErr") 

161 self.assertLess(abs(measFlux - instFlux), instFlux * 0.003) 

162 

163 self.assertLess(abs(measFluxErr - predFluxErr), predFluxErr * 0.2) 

164 

165 # try nearby points and verify that the instFlux is smaller; 

166 # this checks that the sub-pixel shift is performed in the 

167 # correct direction 

168 for dx in (-0.2, 0, 0.2): 

169 for dy in (-0.2, 0, 0.2): 

170 if dx == dy == 0: 

171 continue 

172 offsetCtr = lsst.geom.Point2D(adjCenter[0] + dx, adjCenter[1] + dy) 

173 source = cat.makeRecord() 

174 source.set("centroid_x", offsetCtr.getX()) 

175 source.set("centroid_y", offsetCtr.getY()) 

176 plugin.measure(source, exp) 

177 self.assertLess(source.get("test_instFlux"), measFlux) 

178 

179 # source so near edge of image that PSF does not overlap exposure 

180 # should result in failure 

181 for edgePos in ( 

182 (1, 50), 

183 (50, 1), 

184 (50, bbox.getHeight() - 1), 

185 (bbox.getWidth() - 1, 50), 

186 ): 

187 source = cat.makeRecord() 

188 source.set("centroid_x", edgePos[0]) 

189 source.set("centroid_y", edgePos[1]) 

190 with self.assertRaises(lsst.pex.exceptions.RangeError): 

191 plugin.measure(source, exp) 

192 

193 # no PSF should result in failure: flags set 

194 noPsfExposure = afwImage.ExposureF(filteredImage) 

195 source = cat.makeRecord() 

196 source.set("centroid_x", edgePos[0]) 

197 source.set("centroid_y", edgePos[1]) 

198 with self.assertRaises(lsst.pex.exceptions.InvalidParameterError): 

199 plugin.measure(source, noPsfExposure) 

200 

201 def testPixelFlags(self): 

202 width, height = 100, 100 

203 mi = afwImage.MaskedImageF(width, height) 

204 exp = afwImage.makeExposure(mi) 

205 mi.image.set(0) 

206 mask = mi.mask 

207 sat = mask.getPlaneBitMask('SAT') 

208 interp = mask.getPlaneBitMask('INTRP') 

209 edge = mask.getPlaneBitMask('EDGE') 

210 bad = mask.getPlaneBitMask('BAD') 

211 nodata = mask.getPlaneBitMask('NO_DATA') 

212 mask.addMaskPlane('CLIPPED') 

213 clipped = mask.getPlaneBitMask('CLIPPED') 

214 mask.set(0) 

215 mask[20, 20, afwImage.LOCAL] = sat 

216 mask[60, 60, afwImage.LOCAL] = interp 

217 mask[40, 20, afwImage.LOCAL] = bad 

218 mask[20, 80, afwImage.LOCAL] = nodata 

219 mask[30, 30, afwImage.LOCAL] = clipped 

220 mask.Factory(mask, lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(3, height))).set(edge) 

221 x0, y0 = 1234, 5678 

222 exp.setXY0(lsst.geom.Point2I(x0, y0)) 

223 control = measBase.PixelFlagsControl() 

224 # Change the configuration of control to test for clipped mask 

225 control.masksFpAnywhere = ['CLIPPED'] 

226 plugin, cat = makePluginAndCat(measBase.PixelFlagsAlgorithm, "test", control, centroid="centroid") 

227 allFlags = [ 

228 "", 

229 "edge", 

230 "interpolated", 

231 "interpolatedCenter", 

232 "saturated", 

233 "saturatedCenter", 

234 "cr", 

235 "crCenter", 

236 "bad", 

237 "clipped", 

238 ] 

239 for x, y, setFlags in [(1, 50, ['edge']), 

240 (40, 20, ['bad']), 

241 (20, 20, ['saturatedCenter', 

242 'saturated']), 

243 (20, 22, ['saturated']), 

244 (60, 60, ['interpolatedCenter', 

245 'interpolated']), 

246 (60, 62, ['interpolated']), 

247 (20, 80, ['nodata']), 

248 (30, 30, ['clipped']), 

249 ]: 

250 spans = afwGeom.SpanSet.fromShape(5).shiftedBy(x + x0, 

251 y + y0) 

252 foot = afwDetection.Footprint(spans) 

253 source = cat.makeRecord() 

254 source.setFootprint(foot) 

255 source.set("centroid_x", x+x0) 

256 source.set("centroid_y", y+y0) 

257 plugin.measure(source, exp) 

258 for flag in allFlags[1:]: 

259 value = source.get("test_flag_" + flag) 

260 if flag in setFlags: 

261 self.assertTrue(value, "Flag %s should be set for %f,%f" % (flag, x, y)) 

262 else: 

263 self.assertFalse(value, "Flag %s should not be set for %f,%f" % (flag, x, y)) 

264 

265 # the new code which grabs the center of a record throws when a NaN is 

266 # set in the centroid slot and the algorithm attempts to get the 

267 # default center position 

268 source = cat.makeRecord() 

269 source.set("centroid_x", float("NAN")) 

270 source.set("centroid_y", 40) 

271 source.set("centroid_flag", True) 

272 tmpSpanSet = afwGeom.SpanSet.fromShape(5).shiftedBy(x + x0, 

273 y + y0) 

274 source.setFootprint(afwDetection.Footprint(tmpSpanSet)) 

275 with self.assertRaises(lsst.pex.exceptions.RuntimeError): 

276 plugin.measure(source, exp) 

277 

278 # Test that if there is no center and centroider that the object 

279 # should look at the footprint 

280 plugin, cat = makePluginAndCat(measBase.PixelFlagsAlgorithm, "test", control) 

281 # The first test should raise exception because there is no footprint 

282 source = cat.makeRecord() 

283 with self.assertRaises(lsst.pex.exceptions.RuntimeError): 

284 plugin.measure(source, exp) 

285 # The second test will raise an error because no peaks are present 

286 tmpSpanSet2 = afwGeom.SpanSet.fromShape(5).shiftedBy(x + x0, 

287 y + y0) 

288 source.setFootprint(afwDetection.Footprint(tmpSpanSet2)) 

289 with self.assertRaises(lsst.pex.exceptions.RuntimeError): 

290 plugin.measure(source, exp) 

291 # The final test should pass because it detects a peak, we are reusing 

292 # the location of the clipped bit in the mask plane, so we will check 

293 # first that it is False, then True 

294 source.getFootprint().addPeak(x+x0, y+y0, 100) 

295 self.assertFalse(source.get("test_flag_clipped"), "The clipped flag should be set False") 

296 plugin.measure(source, exp) 

297 self.assertTrue(source.get("test_flag_clipped"), "The clipped flag should be set True") 

298 

299 

300def addStar(image, center, instFlux, fwhm): 

301 """Add a perfect single Gaussian star to an image. 

302 

303 Parameters 

304 ---------- 

305 image : `lsst.afw.image.ImageF` 

306 Image to which the star will be added. 

307 center : `list` or `tuple` of `float`, length 2 

308 Position of the center of the star on the image. 

309 instFlux : `float` 

310 instFlux of the Gaussian star, in counts. 

311 fwhm : `float` 

312 FWHM of the Gaussian star, in pixels. 

313 

314 Notes 

315 ----- 

316 Uses Python to iterate over all pixels (because there is no C++ 

317 function that computes a Gaussian offset by a non-integral amount). 

318 """ 

319 sigma = fwhm/FwhmPerSigma 

320 func = afwMath.GaussianFunction2D(sigma, sigma, 0) 

321 starImage = afwImage.ImageF(image.getBBox()) 

322 # The instFlux in the region of the image will not be exactly the desired 

323 # instFlux because the Gaussian does not extend to infinity, so keep track 

324 # of the actual instFlux and correct for it 

325 actFlux = 0 

326 # No function exists that has a fractional x and y offset, so set the 

327 # image the slow way 

328 for i in range(image.getWidth()): 

329 x = center[0] - i 

330 for j in range(image.getHeight()): 

331 y = center[1] - j 

332 pixVal = instFlux * func(x, y) 

333 actFlux += pixVal 

334 starImage[i, j, afwImage.LOCAL] += pixVal 

335 starImage *= instFlux / actFlux 

336 

337 image += starImage 

338 

339 

340def makeFakeImage(bbox, centerList, instFluxList, fwhm, var): 

341 """Make a fake image containing a set of stars with variance = image + var. 

342 

343 Paramters 

344 --------- 

345 bbox : `lsst.afw.image.Box2I` 

346 Bounding box for image. 

347 centerList : iterable of pairs of `float` 

348 list of positions of center of star on image. 

349 instFluxList : `list` of `float` 

350 instFlux of each star, in counts. 

351 fwhm : `float` 

352 FWHM of Gaussian star, in pixels. 

353 var : `float` 

354 Value of variance plane, in counts. 

355 

356 Returns 

357 ------- 

358 maskedImage : `lsst.afw.image.MaskedImageF` 

359 Resulting fake image. 

360 

361 Notes 

362 ----- 

363 It is trivial to add Poisson noise, which would be more accurate, but 

364 hard to make a unit test that can reliably determine whether such an 

365 image passes a test. 

366 """ 

367 if len(centerList) != len(instFluxList): 

368 raise RuntimeError("len(centerList) != len(instFluxList)") 

369 maskedImage = afwImage.MaskedImageF(bbox) 

370 image = maskedImage.image 

371 for center, instFlux in zip(centerList, instFluxList): 

372 addStar(image, center=center, instFlux=instFlux, fwhm=fwhm) 

373 variance = maskedImage.variance 

374 variance[:] = image 

375 variance += var 

376 return maskedImage 

377 

378 

379class TestMemory(lsst.utils.tests.MemoryTestCase): 

380 pass 

381 

382 

383def setup_module(module): 

384 lsst.utils.tests.init() 

385 

386 

387if __name__ == "__main__": 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true

388 lsst.utils.tests.init() 

389 unittest.main()