Coverage for tests / test_detection.py: 13%
184 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:07 +0000
1#
2# LSST Data Management System
3#
4# Copyright 2008-2016 AURA/LSST.
5#
6# This product includes software developed by the
7# LSST Project (http://www.lsst.org/).
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the LSST License Statement and
20# the GNU General Public License along with this program. If not,
21# see <https://www.lsstcorp.org/LegalNotices/>.
22#
23import unittest
24import numpy as np
26import lsst.geom
27import lsst.afw.table as afwTable
28import lsst.afw.image as afwImage
29import lsst.afw.math as afwMath
30from lsst.meas.algorithms import SourceDetectionTask, SubtractBackgroundTask
31from lsst.meas.algorithms.testUtils import plantSources
32import lsst.utils.tests
34# To plot in ds9, `setup display_ds9` first, and open a ds9 window.
35# import lsstDebug
36# def DebugInfo(name):
37# debug = lsstDebug.getInfo(name)
38# if name == "lsst.meas.algorithms.detection":
39# debug.display = 2
40# return debug
41# lsstDebug.Info = DebugInfo
44class SourceDetectionTaskTestCase(lsst.utils.tests.TestCase):
46 def _create_exposure(self, bigBbox=False):
47 """Return a simulated exposure (and relevant parameters) with Gaussian
48 stars.
49 """
50 if bigBbox:
51 bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(1024, 1024))
52 else:
53 bbox = lsst.geom.Box2I(lsst.geom.Point2I(256, 100), lsst.geom.Extent2I(128, 127))
54 minCounts = 5000
55 maxCounts = 50000
56 starSigma = 1.5
57 numX = 5
58 numY = 5
59 coordList = self.makeCoordList(
60 bbox=bbox,
61 numX=numX,
62 numY=numY,
63 minCounts=minCounts,
64 maxCounts=maxCounts,
65 sigma=starSigma,
66 )
67 kwid = 11
68 sky = 2000
69 addPoissonNoise = True
70 exposure = plantSources(bbox=bbox, kwid=kwid, sky=sky, coordList=coordList,
71 addPoissonNoise=addPoissonNoise)
72 schema = afwTable.SourceTable.makeMinimalSchema()
74 return exposure, schema, numX, numY, starSigma
76 def _check_detectFootprints(self, exposure, numX, numY, starSigma, task, config, doSmooth=False):
77 """Run detectFootprints and check that the output is reasonable,
78 for either value of doSmooth.
79 """
80 taskSigma = 2.2
81 res = task.detectFootprints(exposure, doSmooth=doSmooth, sigma=taskSigma)
82 self.assertEqual(res.numPos, numX * numY)
83 self.assertEqual(res.numNeg, 0)
84 self.assertEqual(task.metadata.getScalar("sigma"), taskSigma)
85 self.assertEqual(task.metadata.getScalar("doSmooth"), doSmooth)
86 self.assertEqual(task.metadata.getScalar("nGrow"), int(taskSigma * config.nSigmaToGrow + 0.5))
88 res = task.detectFootprints(exposure, doSmooth=doSmooth, sigma=None)
89 taskSigma = task.metadata.getScalar("sigma")
90 self.assertLess(abs(taskSigma - starSigma), 0.1)
91 self.assertEqual(res.numPos, numX * numY)
92 self.assertEqual(res.numNeg, 0)
93 return res
95 def test_stdev(self):
96 """Test that sources are detected on a simulated image with
97 thresholdType='stdev'.
98 """
99 exposure, schema, numX, numY, starSigma = self._create_exposure()
101 config = SourceDetectionTask.ConfigClass()
102 # don't modify the image after detection.
103 config.reEstimateBackground = False
104 config.thresholdType = "stdev"
105 task = SourceDetectionTask(config=config, schema=schema)
107 self._check_detectFootprints(exposure, numX, numY, starSigma, task, config, doSmooth=True)
108 self._check_detectFootprints(exposure, numX, numY, starSigma, task, config, doSmooth=False)
110 def test_significance_stdev(self):
111 """Check the non-smoothed, non-background updated peak significance
112 values with thresholdType="stddev".
113 """
114 exposure, schema, numX, numY, starSigma = self._create_exposure()
116 config = SourceDetectionTask.ConfigClass()
117 # don't modify the image after detection.
118 config.reEstimateBackground = False
119 config.doTempLocalBackground = False
120 config.thresholdType = "stdev"
121 task = SourceDetectionTask(config=config, schema=schema)
123 result = self._check_detectFootprints(exposure, numX, numY, starSigma, task, config, doSmooth=False)
125 bad = exposure.mask.getPlaneBitMask(config.statsMask)
126 sctrl = afwMath.StatisticsControl()
127 sctrl.setAndMask(bad)
128 stats = afwMath.makeStatistics(exposure.maskedImage, afwMath.STDEVCLIP, sctrl)
129 stddev = stats.getValue(afwMath.STDEVCLIP)
130 for footprint in result.positive.getFootprints():
131 for peak in footprint.peaks:
132 point = lsst.geom.Point2I(peak.getIx(), peak.getIy())
133 value = exposure.image[point]
134 with self.subTest(str(point)):
135 self.assertFloatsAlmostEqual(peak["significance"],
136 value/stddev,
137 rtol=1e-7,
138 msg=str(point))
140 def test_pixel_stdev(self):
141 """Test that sources are detected on a simulated image with
142 thresholdType='pixel_stdev', and that they have the right significance.
143 """
144 exposure, schema, numX, numY, starSigma = self._create_exposure()
146 config = SourceDetectionTask.ConfigClass()
147 config.thresholdType = "pixel_stdev"
148 config.reEstimateBackground = False
149 # TempLocalBackground changes the peak value of the faintest peak,
150 # so disable it for this test so that we can calculate an expected
151 # answer without having to try to deal with backgrounds.
152 config.doTempLocalBackground = False
153 task = SourceDetectionTask(config=config, schema=schema)
154 # Don't smooth, so that we can directly calculate the s/n from the exposure.
155 result = task.detectFootprints(exposure, doSmooth=False)
156 self.assertEqual(result.numPos, numX * numY)
157 self.assertEqual(result.numNeg, 0)
158 # Significance values for `pixel_stdev` should match image/sqrt(variance).
159 for footprint in result.positive.getFootprints():
160 for peak in footprint.peaks:
161 point = lsst.geom.Point2I(peak.getIx(), peak.getIy())
162 value = exposure.image[point]
163 stddev = np.sqrt(exposure.variance[point])
164 with self.subTest(str(point)):
165 self.assertFloatsAlmostEqual(peak["significance"],
166 value/stddev,
167 rtol=1e-7,
168 msg=str(point))
170 def makeCoordList(self, bbox, numX, numY, minCounts, maxCounts, sigma):
171 """Make a coordList for plantSources."""
172 """
173 Coords are uniformly spaced in a rectangular grid, with linearly increasing counts
174 """
175 dX = bbox.getWidth() / float(numX)
176 dY = bbox.getHeight() / float(numY)
177 minX = bbox.getMinX() + (dX / 2.0)
178 minY = bbox.getMinY() + (dY / 2.0)
179 dCounts = (maxCounts - minCounts) / (numX * numY - 1)
181 coordList = []
182 counts = minCounts
183 for i in range(numX):
184 x = minX + (dX * i)
185 for j in range(numY):
186 y = minY + (dY * j)
187 coordList.append([x, y, counts, sigma])
188 counts += dCounts
189 return coordList
191 def testTempBackgrounds(self):
192 """Test that the temporary backgrounds we remove are properly restored"""
193 bbox = lsst.geom.Box2I(lsst.geom.Point2I(12345, 67890), lsst.geom.Extent2I(128, 127))
194 original = afwImage.ExposureF(bbox)
195 rng = np.random.RandomState(123)
196 original.image.array[:] = rng.normal(size=original.image.array.shape)
197 original.mask.set(0)
198 original.variance.set(1.0)
200 def checkExposure(original, doTempLocalBackground, doTempWideBackground):
201 """Check that the original exposure is unmodified."""
202 config = SourceDetectionTask.ConfigClass()
203 config.reEstimateBackground = False
204 config.thresholdType = "pixel_stdev"
205 config.doTempLocalBackground = doTempLocalBackground
206 config.doTempWideBackground = doTempWideBackground
207 schema = afwTable.SourceTable.makeMinimalSchema()
208 task = SourceDetectionTask(config=config, schema=schema)
210 exposure = original.clone()
211 task.detectFootprints(exposure, sigma=3.21)
213 self.assertFloatsEqual(exposure.image.array, original.image.array)
214 # Mask is permitted to vary: DETECTED bit gets set
215 self.assertFloatsEqual(exposure.variance.array, original.variance.array)
217 checkExposure(original, False, False)
218 checkExposure(original, True, False)
219 checkExposure(original, False, True)
220 checkExposure(original, True, True)
222 def test_removeBadPixels(self):
223 """Test that if we set a NO_DATA region on top of a source,
224 One fewer sources is detected than when we don't do that."""
225 badMaskPlanes = ["NO_DATA", ]
226 exposure_regular, schema, numX, numY, starSigma = self._create_exposure()
227 exposure_removed = exposure_regular.clone()
229 # Define a region on one test exposure we will set to NO_DATA, blocking out one source
230 region = lsst.geom.Box2I(exposure_removed.getXY0(), lsst.geom.Extent2I(25, 35))
231 exposure_removed[region].mask.array |= exposure_removed.mask.getPlaneBitMask(badMaskPlanes)
233 config = SourceDetectionTask.ConfigClass()
234 config.reEstimateBackground = False
235 config.thresholdType = "stdev"
236 config.excludeMaskPlanes = badMaskPlanes
237 task = SourceDetectionTask(config=config, schema=schema)
239 # The regular test exposure finds 25 sources
240 self._check_detectFootprints(exposure_regular, numX, numY, starSigma, task, config, doSmooth=True)
242 # Modify numx and numy, which check_detectFootprints multiplies, to yield 24 instead of 25
243 self.assertEqual(numX, numY)
244 self._check_detectFootprints(exposure_removed, numX-1, numY+1, starSigma, task, config, doSmooth=True)
246 def _create_exposure_bkg(self):
247 """Return a simulated exposure for reestimating background."""
249 bbox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(1024, 1024))
250 kwid = 11
251 sky = 2000
253 coordList = [[10, 10, 0, 2.0]]
255 exposure = plantSources(
256 bbox=bbox,
257 kwid=kwid,
258 sky=sky,
259 coordList=coordList,
260 addPoissonNoise=True,
261 )
263 return exposure
265 def test_reEstimateBackground(self):
266 exp, schema, _, _, _ = self._create_exposure(bigBbox=True)
268 # Add in some sky.
269 sky = 2000.0
270 exp.image.array[:, :] += sky
272 # Subtract that sky using the background task.
273 bkgTask = SubtractBackgroundTask()
274 result = bkgTask.run(exp)
275 background = result.background
277 config = SourceDetectionTask.ConfigClass()
278 config.reEstimateBackground = True
279 task = SourceDetectionTask(config=config, schema=schema)
281 result = task.detectFootprints(exposure=exp, doSmooth=True, sigma=2.0, background=background)
283 # Check that an additional background has been added to the list.
284 self.assertEqual(len(background), 2)
286 backgroundImage = background.getImage()
287 self.assertFloatsAlmostEqual(np.mean(backgroundImage.array), sky, atol=0.1)
289 def test_reEstimateBackgroundWithFlatRatio(self):
290 exp, schema, _, _, _ = self._create_exposure(bigBbox=True)
292 # Add in some sky.
293 sky = 2000.0
294 exp.image.array[:, :] += sky
296 # Subtract that sky using the background task.
297 bkgConfig = SubtractBackgroundTask.ConfigClass()
298 bkgConfig.doApplyFlatBackgroundRatio = True
299 bkgTask = SubtractBackgroundTask(config=bkgConfig)
301 ratioImage = exp.image.clone()
302 ratioImage.array[:, :] = 2.0
304 result = bkgTask.run(exp, backgroundToPhotometricRatio=ratioImage)
305 background = result.background
307 config = SourceDetectionTask.ConfigClass()
308 config.reEstimateBackground = True
309 config.doApplyFlatBackgroundRatio = True
310 task = SourceDetectionTask(config=config, schema=schema)
312 result = task.detectFootprints(
313 exposure=exp,
314 doSmooth=True,
315 sigma=2.0,
316 background=background,
317 backgroundToPhotometricRatio=ratioImage,
318 )
320 # Check that an additional background has been added to the list.
321 self.assertEqual(len(background), 2)
323 backgroundImage = background.getImage()
324 self.assertFloatsAlmostEqual(np.mean(backgroundImage.array), sky * 2.0, atol=0.2)
327class TestMemory(lsst.utils.tests.MemoryTestCase):
328 pass
331def setup_module(module):
332 lsst.utils.tests.init()
335if __name__ == "__main__": 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 lsst.utils.tests.init()
337 unittest.main()