Coverage for tests / test_PsfexPsf.py: 15%
226 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:39 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:39 +0000
1# This file is part of meas_extensions_psfex.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import math
23import numpy as np
24import unittest
26import lsst.utils.tests
27import lsst.afw.image as afwImage
28import lsst.afw.detection as afwDetection
29import lsst.afw.geom as afwGeom
30import lsst.geom as geom
31import lsst.afw.math as afwMath
32import lsst.afw.table as afwTable
33import lsst.daf.base as dafBase
34import lsst.meas.algorithms as measAlg
35import lsst.afw.image.testUtils # Inject some test helpers.
36from lsst.meas.base import SingleFrameMeasurementTask
37from lsst.meas.extensions.psfex import PsfexPsf
38# register the PSF determiner
39import lsst.meas.extensions.psfex.psfexPsfDeterminer
40assert lsst.meas.extensions.psfex.psfexPsfDeterminer # make pyflakes happy
42try:
43 display
44except NameError:
45 display = False
46else:
47 import lsst.afw.display as afwDisplay
48 afwDisplay.setDefaultMaskTransparency(75)
51def psfVal(ix, iy, x, y, sigma1, sigma2, b):
52 """Return the value at (ix, iy) of a double Gaussian
53 (N(0, sigma1^2) + b*N(0, sigma2^2))/(1 + b)
54 centered at (x, y)
55 """
56 dx, dy = x - ix, y - iy
57 theta = np.radians(30)
58 ab = 1.0/0.75 # axis ratio
59 c, s = np.cos(theta), np.sin(theta)
60 u, v = c*dx - s*dy, s*dx + c*dy
62 return (math.exp(-0.5*(u**2 + (v*ab)**2)/sigma1**2)
63 + b*math.exp(-0.5*(u**2 + (v*ab)**2)/sigma2**2))/(1 + b)
66class SpatialModelPsfTestCase(lsst.utils.tests.TestCase):
67 """A test case for SpatialModelPsf"""
69 def measure(self, footprintSet, exposure):
70 """Measure a set of Footprints, returning a SourceCatalog"""
71 catalog = afwTable.SourceCatalog(self.schema)
72 if display:
73 afwDisplay.Display(frame=0).mtv(exposure, title="Original")
75 footprintSet.makeSources(catalog)
77 self.measureSources.run(catalog, exposure)
78 return catalog
80 def setUp(self):
81 config = SingleFrameMeasurementTask.ConfigClass()
82 config.slots.apFlux = 'base_CircularApertureFlux_12_0'
83 self.schema = afwTable.SourceTable.makeMinimalSchema()
85 self.measureSources = SingleFrameMeasurementTask(self.schema, config=config)
87 width, height = 110, 301
89 self.mi = afwImage.MaskedImageF(geom.ExtentI(width, height))
90 self.mi.set(0)
91 sd = 3 # standard deviation of image
92 self.mi.getVariance().set(sd*sd)
93 self.mi.getMask().addMaskPlane("DETECTED")
95 self.ksize = 31 # size of desired kernel
97 sigma1 = 1.75
98 sigma2 = 2*sigma1
100 self.exposure = afwImage.makeExposure(self.mi)
101 self.exposure.setPsf(measAlg.DoubleGaussianPsf(self.ksize, self.ksize,
102 1.5*sigma1, 1, 0.1))
103 cdMatrix = np.array([1.0, 0.0, 0.0, 1.0])
104 cdMatrix.shape = (2, 2)
105 wcs = afwGeom.makeSkyWcs(crpix=geom.PointD(0, 0),
106 crval=geom.SpherePoint(0.0, 0.0, geom.degrees),
107 cdMatrix=cdMatrix)
108 self.exposure.setWcs(wcs)
110 #
111 # Make a kernel with the exactly correct basis functions.
112 # Useful for debugging
113 #
114 basisKernelList = []
115 for sigma in (sigma1, sigma2):
116 basisKernel = afwMath.AnalyticKernel(self.ksize, self.ksize,
117 afwMath.GaussianFunction2D(sigma, sigma))
118 basisImage = afwImage.ImageD(basisKernel.getDimensions())
119 basisKernel.computeImage(basisImage, True)
120 basisImage /= np.sum(basisImage.getArray())
122 if sigma == sigma1:
123 basisImage0 = basisImage
124 else:
125 basisImage -= basisImage0
127 basisKernelList.append(afwMath.FixedKernel(basisImage))
129 order = 1 # 1 => up to linear
130 spFunc = afwMath.PolynomialFunction2D(order)
132 exactKernel = afwMath.LinearCombinationKernel(basisKernelList, spFunc)
133 exactKernel.setSpatialParameters([[1.0, 0, 0],
134 [0.0, 0.5*1e-2, 0.2e-2]])
136 rand = afwMath.Random() # make these tests repeatable by setting seed
138 addNoise = True
140 if addNoise:
141 im = self.mi.getImage()
142 afwMath.randomGaussianImage(im, rand) # N(0, 1)
143 im *= sd # N(0, sd^2)
144 del im
146 xarr, yarr = [], []
148 # NOTE: Warning to those trying to add sources near the edges here:
149 # self.subtractStars() assumes that every source is able to have the
150 # psf subtracted. That's not possible for sources on the edge, so the
151 # chi2 calculation that is asserted on will be off.
152 for x, y in [(20, 20), (60, 20),
153 (30, 35),
154 (50, 50),
155 (20, 90), (70, 160), (25, 265), (75, 275), (85, 30),
156 (50, 120), (70, 80),
157 (60, 210), (20, 210),
158 ]:
159 xarr.append(x)
160 yarr.append(y)
162 for x, y in zip(xarr, yarr):
163 dx = rand.uniform() - 0.5 # random (centered) offsets
164 dy = rand.uniform() - 0.5
166 k = exactKernel.getSpatialFunction(1)(x, y) # functional variation of Kernel ...
167 b = (k*sigma1**2/((1 - k)*sigma2**2)) # ... converted double Gaussian's "b"
169 # flux = 80000 - 20*x - 10*(y/float(height))**2
170 flux = 80000*(1 + 0.1*(rand.uniform() - 0.5))
171 I0 = flux*(1 + b)/(2*np.pi*(sigma1**2 + b*sigma2**2))
172 for iy in range(y - self.ksize//2, y + self.ksize//2 + 1):
173 if iy < 0 or iy >= self.mi.getHeight():
174 continue
176 for ix in range(x - self.ksize//2, x + self.ksize//2 + 1):
177 if ix < 0 or ix >= self.mi.getWidth():
178 continue
180 II = I0*psfVal(ix, iy, x + dx, y + dy, sigma1, sigma2, b)
181 Isample = rand.poisson(II) if addNoise else II
182 self.mi.image[ix, iy, afwImage.LOCAL] += Isample
183 self.mi.variance[ix, iy, afwImage.LOCAL] += II
185 bbox = geom.BoxI(geom.PointI(0, 0), geom.ExtentI(width, height))
186 self.cellSet = afwMath.SpatialCellSet(bbox, 100)
188 self.footprintSet = afwDetection.FootprintSet(self.mi, afwDetection.Threshold(100), "DETECTED")
189 self.catalog = self.measure(self.footprintSet, self.exposure)
191 for source in self.catalog:
192 cand = measAlg.makePsfCandidate(source, self.exposure)
193 self.cellSet.insertCandidate(cand)
195 def tearDown(self):
196 del self.cellSet
197 del self.exposure
198 del self.mi
199 del self.footprintSet
200 del self.catalog
201 del self.schema
202 del self.measureSources
204 def setupDeterminer(self, exposure, fluxField=None):
205 """Setup the starSelector and psfDeterminer"""
206 starSelectorClass = measAlg.sourceSelectorRegistry["objectSize"]
207 starSelectorConfig = starSelectorClass.ConfigClass()
208 starSelectorConfig.sourceFluxField = "base_GaussianFlux_instFlux"
209 starSelectorConfig.badFlags = ["base_PixelFlags_flag_edge",
210 "base_PixelFlags_flag_interpolatedCenter",
211 "base_PixelFlags_flag_saturatedCenter",
212 "base_PixelFlags_flag_crCenter",
213 ]
214 starSelectorConfig.widthStdAllowed = 0.5 # Set to match when the tolerance of the test was set
216 self.starSelector = starSelectorClass(config=starSelectorConfig)
218 self.makePsfCandidates = measAlg.MakePsfCandidatesTask()
220 psfDeterminerClass = measAlg.psfDeterminerRegistry["psfex"]
221 psfDeterminerConfig = psfDeterminerClass.ConfigClass()
222 width, height = exposure.getMaskedImage().getDimensions()
223 psfDeterminerConfig.sizeCellX = width
224 psfDeterminerConfig.sizeCellY = height//3
225 psfDeterminerConfig.spatialOrder = 1
226 if fluxField is not None:
227 psfDeterminerConfig.photometricFluxField = fluxField
229 self.psfDeterminer = psfDeterminerClass(psfDeterminerConfig)
231 def subtractStars(self, exposure, catalog, chi_lim=-1):
232 """Subtract the exposure's PSF from all the sources in catalog"""
233 mi, psf = exposure.getMaskedImage(), exposure.getPsf()
235 subtracted = mi.Factory(mi, True)
236 for s in catalog:
237 xc, yc = s.getX(), s.getY()
238 bbox = subtracted.getBBox(afwImage.PARENT)
239 if bbox.contains(geom.PointI(int(xc), int(yc))):
240 measAlg.subtractPsf(psf, subtracted, xc, yc)
241 chi = subtracted.Factory(subtracted, True)
242 var = subtracted.getVariance()
243 np.sqrt(var.getArray(), var.getArray()) # inplace sqrt
244 chi /= var
246 if display:
247 afwDisplay.Display(frame=1).mtv(subtracted, title="Subtracted")
248 afwDisplay.Display(frame=2).mtv(chi, title="Chi")
249 xc, yc = exposure.getWidth()//2, exposure.getHeight()//2
250 afwDisplay.Display(frame=3).mtv(psf.computeImage(geom.Point2D(xc, yc)),
251 title="Psf %.1f,%.1f" % (xc, yc))
253 chi_min, chi_max = np.min(chi.getImage().getArray()), np.max(chi.getImage().getArray())
255 if chi_lim > 0:
256 self.assertGreater(chi_min, -chi_lim)
257 self.assertLess(chi_max, chi_lim)
259 def testPsfexDeterminer(self):
260 """Test the (Psfex) psfDeterminer on subImages"""
262 self.setupDeterminer(self.exposure)
263 metadata = dafBase.PropertyList()
265 stars = self.starSelector.run(self.catalog, exposure=self.exposure)
266 psfCandidateList = self.makePsfCandidates.run(stars.sourceCat, exposure=self.exposure).psfCandidates
267 psf, _ = self.psfDeterminer.determinePsf(self.exposure, psfCandidateList, metadata)
268 self.exposure.setPsf(psf)
270 # Test how well we can subtract the PSF model
271 self.subtractStars(self.exposure, self.catalog, chi_lim=5.6)
273 # Test PsfexPsf.computeBBox
274 pos = psf.getAveragePosition()
275 self.assertEqual(psf.computeBBox(pos), psf.computeKernelImage(pos).getBBox())
276 self.assertEqual(psf.computeBBox(pos), psf.getKernel(pos).getBBox())
278 def testPsfexDeterminerTooFewGoodStars(self):
279 """Test the (Psfex) psfDeterminer with too few good stars."""
280 self.setupDeterminer(self.exposure)
281 metadata = dafBase.PropertyList()
283 stars = self.starSelector.run(self.catalog, exposure=self.exposure)
284 psfCandidateList = self.makePsfCandidates.run(stars.sourceCat, exposure=self.exposure).psfCandidates
286 psfCandidateListShort = psfCandidateList[0: 3]
288 with self.assertRaisesRegex(
289 lsst.meas.extensions.psfex.psfexPsfDeterminer.PsfexTooFewGoodStarsError,
290 "Failed to determine psfex psf: too few good stars"
291 ):
292 self.psfDeterminer.determinePsf(self.exposure, psfCandidateListShort, metadata)
294 def testPsfexDeterminerNoStars(self):
295 """Test the (Psfex) psfDeterminer with no stars at all."""
296 self.setupDeterminer(self.exposure)
297 metadata = dafBase.PropertyList()
298 psfCandidateListEmpty = []
300 with self.assertRaisesRegex(
301 lsst.meas.extensions.psfex.psfexPsfDeterminer.PsfexNoStarsError, "No psf candidates supplied."
302 ):
303 self.psfDeterminer.determinePsf(self.exposure, psfCandidateListEmpty, metadata)
305 def testPsfexDeterminerNoGoodStars(self):
306 """Test the (Psfex) psfDeterminer with no good stars."""
307 self.setupDeterminer(self.exposure)
308 metadata = dafBase.PropertyList()
310 stars = self.starSelector.run(self.catalog, exposure=self.exposure)
311 psfCandidateList = self.makePsfCandidates.run(stars.sourceCat, exposure=self.exposure).psfCandidates
313 # Get the first three stars to make them bad in various ways.
314 psfCandidateListNoGoodStars = psfCandidateList[0: 3]
316 # For the first star, make the centroid bad.
317 s1 = psfCandidateListNoGoodStars[0].getSource()
318 s1["base_SdssCentroid_x"] = np.nan
320 # For the second star, make the default flux flagged.
321 s2 = psfCandidateListNoGoodStars[1].getSource()
322 s2.set("base_CircularApertureFlux_9_0_flag", True)
324 # For the third star, make the default flux negative.
325 s3 = psfCandidateListNoGoodStars[2].getSource()
326 s3.set("base_CircularApertureFlux_9_0_instFlux", -0.25)
328 with self.assertRaisesRegex(
329 lsst.meas.extensions.psfex.psfexPsfDeterminer.PsfexNoGoodStarsError,
330 "No good psf candidates to pass to psfex out of 3 available.",
331 ):
332 self.psfDeterminer.determinePsf(self.exposure, psfCandidateListNoGoodStars, metadata)
334 def testPsfDeterminerChangeFluxField(self):
335 """Test the psfDeterminer with a different flux normalization field."""
336 # We test here with an aperture that we would be unlikely to ever use
337 # as a default.
338 self.setupDeterminer(self.exposure, fluxField="base_CircularApertureFlux_6_0_instFlux")
339 metadata = dafBase.PropertyList()
341 stars = self.starSelector.run(self.catalog, exposure=self.exposure)
342 psfCandidateList = self.makePsfCandidates.run(stars.sourceCat, exposure=self.exposure).psfCandidates
343 psf, _ = self.psfDeterminer.determinePsf(self.exposure, psfCandidateList, metadata)
344 self.exposure.setPsf(psf)
346 # Test how well we can subtract the PSF model
347 self.subtractStars(self.exposure, self.catalog, chi_lim=5.6)
349 def testPsfDeterminerDownsample(self):
350 """Test the psfDeterminer with downsampling."""
351 self.setupDeterminer(self.exposure)
352 metadata = dafBase.PropertyList()
354 # Decrease the maximum number of stars.
355 # Without more changes to the test harness, we do not have access to
356 # which psf stars were used. With only 3 stars we fail below so we use
357 # this to confirm that the selection code is triggering.
358 self.psfDeterminer.config.maxCandidates = 10
360 stars = self.starSelector.run(self.catalog, exposure=self.exposure)
361 psfCandidateList = self.makePsfCandidates.run(stars.sourceCat, exposure=self.exposure).psfCandidates
362 self.psfDeterminer.determinePsf(self.exposure, psfCandidateList, metadata)
364 self.assertEqual(metadata['numAvailStars'], self.psfDeterminer.config.maxCandidates)
365 self.assertLessEqual(metadata['numGoodStars'], self.psfDeterminer.config.maxCandidates)
367 def testSerializationData(self):
368 """Test that we can round-trip through the new PsfExSerializationData
369 struct.
370 """
371 self.setupDeterminer(self.exposure)
372 metadata = dafBase.PropertyList()
374 stars = self.starSelector.run(self.catalog, exposure=self.exposure)
375 psfCandidateList = self.makePsfCandidates.run(stars.sourceCat, exposure=self.exposure).psfCandidates
376 psf1, _ = self.psfDeterminer.determinePsf(self.exposure, psfCandidateList, metadata)
378 data = psf1.getSerializationData()
379 psf2 = PsfexPsf.fromSerializationData(data)
380 self.assertEqual(psf1.getAveragePosition(), psf2.getAveragePosition())
381 self.assertImagesEqual(psf1.computeImage(psf1.getAveragePosition()),
382 psf2.computeImage(psf2.getAveragePosition()))
385class TestMemory(lsst.utils.tests.MemoryTestCase):
386 pass
389def setup_module(module):
390 lsst.utils.tests.init()
393if __name__ == "__main__": 393 ↛ 394line 393 didn't jump to line 394 because the condition on line 393 was never true
394 lsst.utils.tests.init()
395 unittest.main()