Coverage for tests / test_MeasureSources.py: 11%
204 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:01 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:01 +0000
1# This file is part of meas_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import math
23import unittest
25import numpy as np
27import lsst.pex.exceptions
28import lsst.daf.base as dafBase
29import lsst.geom
30import lsst.afw.detection as afwDetection
31import lsst.afw.math as afwMath
32import lsst.afw.geom as afwGeom
33import lsst.afw.table as afwTable
34import lsst.afw.image as afwImage
35import lsst.meas.base as measBase
36import lsst.utils.tests
38FwhmPerSigma = 2*math.sqrt(2*math.log(2)) # FWHM for an N(0, 1) Gaussian
41def makePluginAndCat(alg, name, control, metadata=False, centroid=None):
42 schema = afwTable.SourceTable.makeMinimalSchema()
43 if centroid:
44 schema.addField(centroid + "_x", type=np.float64)
45 schema.addField(centroid + "_y", type=np.float64)
46 schema.addField(centroid + "_flag", type='Flag')
47 schema.getAliasMap().set("slot_Centroid", centroid)
48 if metadata:
49 plugin = alg(control, name, schema, dafBase.PropertySet())
50 else:
51 plugin = alg(control, name, schema)
52 cat = afwTable.SourceCatalog(schema)
53 return plugin, cat
56class MeasureSourcesTestCase(lsst.utils.tests.TestCase):
58 def testCircularApertureMeasure(self):
59 mi = afwImage.MaskedImageF(lsst.geom.ExtentI(100, 200))
60 mi.set(10)
61 #
62 # Create our measuring engine
63 #
65 radii = (1.0, 5.0, 10.0) # radii to use
67 control = measBase.ApertureFluxControl()
68 control.radii = radii
70 exp = afwImage.makeExposure(mi)
71 x0, y0 = 1234, 5678
72 exp.setXY0(lsst.geom.Point2I(x0, y0))
74 plugin, cat = makePluginAndCat(measBase.CircularApertureFluxAlgorithm,
75 "test", control, True, centroid="centroid")
76 source = cat.makeRecord()
77 source.set("centroid_x", 30+x0)
78 source.set("centroid_y", 50+y0)
79 plugin.measure(source, exp)
81 for r in radii:
82 currentFlux = source.get("%s_instFlux" %
83 measBase.CircularApertureFluxAlgorithm.makeFieldPrefix("test", r))
84 self.assertAlmostEqual(10.0*math.pi*r*r/currentFlux, 1.0, places=4)
86 def testPeakLikelihoodFlux(self):
87 """Test measurement with PeakLikelihoodFlux.
89 Notes
90 -----
91 This test makes and measures a series of exposures containing just one
92 star, approximately centered.
93 """
95 bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(100, 101))
96 kernelWidth = 35
97 var = 100
98 fwhm = 3.0
99 sigma = fwhm/FwhmPerSigma
100 convolutionControl = afwMath.ConvolutionControl()
101 psf = afwDetection.GaussianPsf(kernelWidth, kernelWidth, sigma)
102 psfKernel = psf.getLocalKernel(psf.getAveragePosition())
103 psfImage = psf.computeKernelImage(psf.getAveragePosition())
104 sumPsfSq = np.sum(psfImage.array**2)
105 psfSqArr = psfImage.array**2
107 for instFlux in (1000, 10000):
108 ctrInd = lsst.geom.Point2I(50, 51)
109 ctrPos = lsst.geom.Point2D(ctrInd)
111 kernelBBox = psfImage.getBBox()
112 kernelBBox.shift(lsst.geom.Extent2I(ctrInd))
114 # compute predicted instFlux error
115 unshMImage = makeFakeImage(bbox, [ctrPos], [instFlux], fwhm, var)
117 # filter image by PSF
118 unshFiltMImage = afwImage.MaskedImageF(unshMImage.getBBox())
119 afwMath.convolve(unshFiltMImage, unshMImage, psfKernel, convolutionControl)
121 # compute predicted instFlux = value of image at peak / sum(PSF^2)
122 # this is a sanity check of the algorithm, as much as anything
123 predFlux = unshFiltMImage.image[ctrInd, afwImage.LOCAL] / sumPsfSq
124 self.assertLess(abs(instFlux - predFlux), instFlux * 0.01)
126 # compute predicted instFlux error based on filtered pixels
127 # = sqrt(value of filtered variance at peak / sum(PSF^2)^2)
128 predFluxErr = math.sqrt(unshFiltMImage.variance[ctrInd, afwImage.LOCAL]) / sumPsfSq
130 # compute predicted instFlux error based on unfiltered pixels
131 # = sqrt(sum(unfiltered variance * PSF^2)) / sum(PSF^2)
132 # and compare to that derived from filtered pixels;
133 # again, this is a test of the algorithm
134 varView = afwImage.ImageF(unshMImage.variance, kernelBBox)
135 varArr = varView.array
136 unfiltPredFluxErr = math.sqrt(np.sum(varArr*psfSqArr)) / sumPsfSq
137 self.assertLess(abs(unfiltPredFluxErr - predFluxErr), predFluxErr * 0.01)
139 for fracOffset in (lsst.geom.Extent2D(0, 0), lsst.geom.Extent2D(0.2, -0.3)):
140 adjCenter = ctrPos + fracOffset
141 if fracOffset == lsst.geom.Extent2D(0, 0):
142 maskedImage = unshMImage
143 filteredImage = unshFiltMImage
144 else:
145 maskedImage = makeFakeImage(bbox, [adjCenter], [instFlux], fwhm, var)
146 # filter image by PSF
147 filteredImage = afwImage.MaskedImageF(maskedImage.getBBox())
148 afwMath.convolve(filteredImage, maskedImage, psfKernel, convolutionControl)
150 exp = afwImage.makeExposure(filteredImage)
151 exp.setPsf(psf)
152 control = measBase.PeakLikelihoodFluxControl()
153 plugin, cat = makePluginAndCat(measBase.PeakLikelihoodFluxAlgorithm, "test",
154 control, centroid="centroid")
155 source = cat.makeRecord()
156 source.set("centroid_x", adjCenter.getX())
157 source.set("centroid_y", adjCenter.getY())
158 plugin.measure(source, exp)
159 measFlux = source.get("test_instFlux")
160 measFluxErr = source.get("test_instFluxErr")
161 self.assertLess(abs(measFlux - instFlux), instFlux * 0.003)
163 self.assertLess(abs(measFluxErr - predFluxErr), predFluxErr * 0.2)
165 # try nearby points and verify that the instFlux is smaller;
166 # this checks that the sub-pixel shift is performed in the
167 # correct direction
168 for dx in (-0.2, 0, 0.2):
169 for dy in (-0.2, 0, 0.2):
170 if dx == dy == 0:
171 continue
172 offsetCtr = lsst.geom.Point2D(adjCenter[0] + dx, adjCenter[1] + dy)
173 source = cat.makeRecord()
174 source.set("centroid_x", offsetCtr.getX())
175 source.set("centroid_y", offsetCtr.getY())
176 plugin.measure(source, exp)
177 self.assertLess(source.get("test_instFlux"), measFlux)
179 # source so near edge of image that PSF does not overlap exposure
180 # should result in failure
181 for edgePos in (
182 (1, 50),
183 (50, 1),
184 (50, bbox.getHeight() - 1),
185 (bbox.getWidth() - 1, 50),
186 ):
187 source = cat.makeRecord()
188 source.set("centroid_x", edgePos[0])
189 source.set("centroid_y", edgePos[1])
190 with self.assertRaises(lsst.pex.exceptions.RangeError):
191 plugin.measure(source, exp)
193 # no PSF should result in failure: flags set
194 noPsfExposure = afwImage.ExposureF(filteredImage)
195 source = cat.makeRecord()
196 source.set("centroid_x", edgePos[0])
197 source.set("centroid_y", edgePos[1])
198 with self.assertRaises(lsst.pex.exceptions.InvalidParameterError):
199 plugin.measure(source, noPsfExposure)
201 def testPixelFlags(self):
202 width, height = 100, 100
203 mi = afwImage.MaskedImageF(width, height)
204 exp = afwImage.makeExposure(mi)
205 mi.image.set(0)
206 mask = mi.mask
207 sat = mask.getPlaneBitMask('SAT')
208 interp = mask.getPlaneBitMask('INTRP')
209 edge = mask.getPlaneBitMask('EDGE')
210 bad = mask.getPlaneBitMask('BAD')
211 nodata = mask.getPlaneBitMask('NO_DATA')
212 mask.addMaskPlane('CLIPPED')
213 clipped = mask.getPlaneBitMask('CLIPPED')
214 mask.set(0)
215 mask[20, 20, afwImage.LOCAL] = sat
216 mask[60, 60, afwImage.LOCAL] = interp
217 mask[40, 20, afwImage.LOCAL] = bad
218 mask[20, 80, afwImage.LOCAL] = nodata
219 mask[30, 30, afwImage.LOCAL] = clipped
220 mask.Factory(mask, lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(3, height))).set(edge)
221 x0, y0 = 1234, 5678
222 exp.setXY0(lsst.geom.Point2I(x0, y0))
223 control = measBase.PixelFlagsControl()
224 # Change the configuration of control to test for clipped mask
225 control.masksFpAnywhere = ['CLIPPED']
226 plugin, cat = makePluginAndCat(measBase.PixelFlagsAlgorithm, "test", control, centroid="centroid")
227 allFlags = [
228 "",
229 "edge",
230 "interpolated",
231 "interpolatedCenter",
232 "saturated",
233 "saturatedCenter",
234 "cr",
235 "crCenter",
236 "bad",
237 "clipped",
238 ]
239 for x, y, setFlags in [(1, 50, ['edge']),
240 (40, 20, ['bad']),
241 (20, 20, ['saturatedCenter',
242 'saturated']),
243 (20, 22, ['saturated']),
244 (60, 60, ['interpolatedCenter',
245 'interpolated']),
246 (60, 62, ['interpolated']),
247 (20, 80, ['nodata']),
248 (30, 30, ['clipped']),
249 ]:
250 spans = afwGeom.SpanSet.fromShape(5).shiftedBy(x + x0,
251 y + y0)
252 foot = afwDetection.Footprint(spans)
253 source = cat.makeRecord()
254 source.setFootprint(foot)
255 source.set("centroid_x", x+x0)
256 source.set("centroid_y", y+y0)
257 plugin.measure(source, exp)
258 for flag in allFlags[1:]:
259 value = source.get("test_flag_" + flag)
260 if flag in setFlags:
261 self.assertTrue(value, "Flag %s should be set for %f,%f" % (flag, x, y))
262 else:
263 self.assertFalse(value, "Flag %s should not be set for %f,%f" % (flag, x, y))
265 # the new code which grabs the center of a record throws when a NaN is
266 # set in the centroid slot and the algorithm attempts to get the
267 # default center position
268 source = cat.makeRecord()
269 source.set("centroid_x", float("NAN"))
270 source.set("centroid_y", 40)
271 source.set("centroid_flag", True)
272 tmpSpanSet = afwGeom.SpanSet.fromShape(5).shiftedBy(x + x0,
273 y + y0)
274 source.setFootprint(afwDetection.Footprint(tmpSpanSet))
275 with self.assertRaises(lsst.pex.exceptions.RuntimeError):
276 plugin.measure(source, exp)
278 # Test that if there is no center and centroider that the object
279 # should look at the footprint
280 plugin, cat = makePluginAndCat(measBase.PixelFlagsAlgorithm, "test", control)
281 # The first test should raise exception because there is no footprint
282 source = cat.makeRecord()
283 with self.assertRaises(lsst.pex.exceptions.RuntimeError):
284 plugin.measure(source, exp)
285 # The second test will raise an error because no peaks are present
286 tmpSpanSet2 = afwGeom.SpanSet.fromShape(5).shiftedBy(x + x0,
287 y + y0)
288 source.setFootprint(afwDetection.Footprint(tmpSpanSet2))
289 with self.assertRaises(lsst.pex.exceptions.RuntimeError):
290 plugin.measure(source, exp)
291 # The final test should pass because it detects a peak, we are reusing
292 # the location of the clipped bit in the mask plane, so we will check
293 # first that it is False, then True
294 source.getFootprint().addPeak(x+x0, y+y0, 100)
295 self.assertFalse(source.get("test_flag_clipped"), "The clipped flag should be set False")
296 plugin.measure(source, exp)
297 self.assertTrue(source.get("test_flag_clipped"), "The clipped flag should be set True")
300def addStar(image, center, instFlux, fwhm):
301 """Add a perfect single Gaussian star to an image.
303 Parameters
304 ----------
305 image : `lsst.afw.image.ImageF`
306 Image to which the star will be added.
307 center : `list` or `tuple` of `float`, length 2
308 Position of the center of the star on the image.
309 instFlux : `float`
310 instFlux of the Gaussian star, in counts.
311 fwhm : `float`
312 FWHM of the Gaussian star, in pixels.
314 Notes
315 -----
316 Uses Python to iterate over all pixels (because there is no C++
317 function that computes a Gaussian offset by a non-integral amount).
318 """
319 sigma = fwhm/FwhmPerSigma
320 func = afwMath.GaussianFunction2D(sigma, sigma, 0)
321 starImage = afwImage.ImageF(image.getBBox())
322 # The instFlux in the region of the image will not be exactly the desired
323 # instFlux because the Gaussian does not extend to infinity, so keep track
324 # of the actual instFlux and correct for it
325 actFlux = 0
326 # No function exists that has a fractional x and y offset, so set the
327 # image the slow way
328 for i in range(image.getWidth()):
329 x = center[0] - i
330 for j in range(image.getHeight()):
331 y = center[1] - j
332 pixVal = instFlux * func(x, y)
333 actFlux += pixVal
334 starImage[i, j, afwImage.LOCAL] += pixVal
335 starImage *= instFlux / actFlux
337 image += starImage
340def makeFakeImage(bbox, centerList, instFluxList, fwhm, var):
341 """Make a fake image containing a set of stars with variance = image + var.
343 Paramters
344 ---------
345 bbox : `lsst.afw.image.Box2I`
346 Bounding box for image.
347 centerList : iterable of pairs of `float`
348 list of positions of center of star on image.
349 instFluxList : `list` of `float`
350 instFlux of each star, in counts.
351 fwhm : `float`
352 FWHM of Gaussian star, in pixels.
353 var : `float`
354 Value of variance plane, in counts.
356 Returns
357 -------
358 maskedImage : `lsst.afw.image.MaskedImageF`
359 Resulting fake image.
361 Notes
362 -----
363 It is trivial to add Poisson noise, which would be more accurate, but
364 hard to make a unit test that can reliably determine whether such an
365 image passes a test.
366 """
367 if len(centerList) != len(instFluxList):
368 raise RuntimeError("len(centerList) != len(instFluxList)")
369 maskedImage = afwImage.MaskedImageF(bbox)
370 image = maskedImage.image
371 for center, instFlux in zip(centerList, instFluxList):
372 addStar(image, center=center, instFlux=instFlux, fwhm=fwhm)
373 variance = maskedImage.variance
374 variance[:] = image
375 variance += var
376 return maskedImage
379class TestMemory(lsst.utils.tests.MemoryTestCase):
380 pass
383def setup_module(module):
384 lsst.utils.tests.init()
387if __name__ == "__main__": 387 ↛ 388line 387 didn't jump to line 388 because the condition on line 387 was never true
388 lsst.utils.tests.init()
389 unittest.main()