Coverage for tests / test_SdssShape.py: 19%
183 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:55 +0000
1# This file is part of meas_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
24import numpy as np
26import lsst.geom
27import lsst.afw.geom
28import lsst.afw.table
29import lsst.meas.base
30import lsst.meas.base.tests
31import lsst.utils.tests
34class SdssShapeTestCase(lsst.meas.base.tests.AlgorithmTestCase, lsst.utils.tests.TestCase):
36 def setUp(self):
37 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30),
38 lsst.geom.Extent2I(240, 160))
39 self.dataset = lsst.meas.base.tests.TestDataset(self.bbox)
40 # first source is a point
41 self.dataset.addSource(100000.0, lsst.geom.Point2D(50.1, 49.8))
42 # second source is extended
43 self.dataset.addSource(100000.0, lsst.geom.Point2D(149.9, 50.3),
44 lsst.afw.geom.Quadrupole(8, 9, 3))
45 self.config = self.makeSingleFrameMeasurementConfig("base_SdssShape")
47 def makeAlgorithm(self, ctrl=None):
48 """Construct an algorithm and return both it and its schema.
49 """
50 if ctrl is None:
51 ctrl = lsst.meas.base.SdssShapeControl()
52 schema = lsst.meas.base.tests.TestDataset.makeMinimalSchema()
53 algorithm = lsst.meas.base.SdssShapeAlgorithm(ctrl, "base_SdssShape", schema)
54 return algorithm, schema
56 def assertFinite(self, value):
57 self.assertTrue(np.isfinite(value), msg="%s is not finite" % (value,))
59 def _runMeasurementTask(self):
60 task = self.makeSingleFrameMeasurementTask("base_SdssShape", config=self.config)
61 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
62 task.run(catalog, exposure)
63 return exposure, catalog
65 def _checkShape(self, result, record):
66 self.assertFloatsAlmostEqual(result.x, record.get("truth_x"), rtol=1E-2)
67 self.assertFloatsAlmostEqual(result.y, record.get("truth_y"), rtol=1E-2)
68 self.assertFloatsAlmostEqual(result.xx, record.get("truth_xx"), rtol=1E-2)
69 self.assertFloatsAlmostEqual(result.yy, record.get("truth_yy"), rtol=1E-2)
70 self.assertFloatsAlmostEqual(result.xy, record.get("truth_xy"), rtol=1E-1, atol=2E-1)
71 self.assertFinite(result.xxErr)
72 self.assertFinite(result.yyErr)
73 self.assertFinite(result.xyErr)
74 self.assertFinite(result.instFlux_xx_Cov)
75 self.assertFinite(result.instFlux_yy_Cov)
76 self.assertFinite(result.instFlux_xy_Cov)
77 self.assertFinite(result.xx_yy_Cov)
78 self.assertFinite(result.xx_xy_Cov)
79 self.assertFinite(result.yy_xy_Cov)
80 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.FAILURE.number))
81 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.UNWEIGHTED_BAD.number))
82 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.UNWEIGHTED.number))
83 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.SHIFT.number))
84 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.MAXITER.number))
86 def _checkPsfShape(self, result, psfResult, psfTruth):
87 self.assertFloatsAlmostEqual(psfResult.getIxx(), psfTruth.getIxx(), rtol=1E-4)
88 self.assertFloatsAlmostEqual(psfResult.getIyy(), psfTruth.getIyy(), rtol=1E-4)
89 self.assertFloatsAlmostEqual(psfResult.getIxy(), psfTruth.getIxy(), rtol=1E-4)
90 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.PSF_SHAPE_BAD.number))
92 def testMeasureGoodPsf(self):
93 """Test that we measure shapes and record the PSF shape correctly
95 Note: Given that the PSF model here is constant over the entire image, this test
96 would not catch an error in the potition at which base_SdssShape_psf is computed.
97 Such a test requires a spatially varying PSF model such that different locations
98 can be distinguished by their different PSF model shapes. Such a test exists in
99 meas_algorithms (tests/testSdssShapePsf.py), making use of the PcaPsf algorithm
100 to build the spatially varying PSF.
101 """
102 exposure, catalog = self._runMeasurementTask()
103 key = lsst.meas.base.SdssShapeResultKey(catalog.schema["base_SdssShape"])
104 psfTruth = exposure.getPsf().computeShape(catalog[0].getCentroid())
105 for record in catalog:
106 result = record.get(key)
107 self._checkShape(result, record)
108 psfResult = key.getPsfShape(record)
109 self._checkPsfShape(result, psfResult, psfTruth)
111 def testMeasureWithoutPsf(self):
112 """Test that we measure shapes correctly and do not record the PSF shape when not needed."""
113 self.config.plugins["base_SdssShape"].doMeasurePsf = False
114 _, catalog = self._runMeasurementTask()
115 key = lsst.meas.base.SdssShapeResultKey(catalog.schema["base_SdssShape"])
116 for record in catalog:
117 result = record.get(key)
118 self._checkShape(result, record)
119 self.assertNotIn("base_SdssShape_psf_xx", catalog.schema)
120 self.assertNotIn("base_SdssShape_psf_yy", catalog.schema)
121 self.assertNotIn("base_SdssShape_psf_xy", catalog.schema)
122 self.assertNotIn("base_SdssShape_flag_psf", catalog.schema)
124 def testMeasureBadPsf(self):
125 """Test that we measure shapes correctly and set a flag with the PSF is unavailable."""
126 self.config.plugins["base_SdssShape"].doMeasurePsf = True
127 task = self.makeSingleFrameMeasurementTask("base_SdssShape", config=self.config)
128 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=1)
129 exposure.setPsf(None) # Set PSF to None to test no PSF case
130 task.run(catalog, exposure)
131 key = lsst.meas.base.SdssShapeResultKey(catalog.schema["base_SdssShape"])
132 for record in catalog:
133 result = record.get(key)
134 self._checkShape(result, record)
135 self.assertTrue(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.PSF_SHAPE_BAD.number))
137 def testMonteCarlo(self):
138 """Test an ideal simulation, with deterministic noise.
140 Demonstrate that:
142 - We get the right answer, and
143 - The reported uncertainty agrees with a Monte Carlo test of the noise.
144 """
145 algorithm, schema = self.makeAlgorithm()
146 # Results are RNG dependent; we choose a seed that is known to pass.
147 exposure, cat = self.dataset.realize(0.0, schema, randomSeed=3)
148 record = cat[1]
149 instFlux = record["truth_instFlux"]
150 algorithm.measure(record, exposure)
151 for suffix in ["xx", "yy", "xy"]:
152 self.assertFloatsAlmostEqual(record.get("truth_"+suffix),
153 record.get("base_SdssShape_"+suffix), rtol=1E-4)
155 for noise in (0.0001, 0.001,):
156 nSamples = 1000
157 catalog = lsst.afw.table.SourceCatalog(cat.schema)
158 for i in range(nSamples):
159 # By using ``i`` to seed the RNG, we get results which
160 # fall within the tolerances defined below. If we allow this
161 # test to be truly random, passing becomes RNG-dependent.
162 exposure, cat = self.dataset.realize(noise*instFlux, schema, randomSeed=i)
163 record = cat[1]
164 algorithm.measure(record, exposure)
165 catalog.append(record)
167 catalog = catalog.copy(deep=True)
168 for suffix in ["xx", "yy", "xy"]:
169 shapeMean = np.mean(catalog["base_SdssShape_"+suffix])
170 shapeErrMean = np.nanmean(catalog["base_SdssShape_"+suffix+"Err"])
171 shapeInterval68 = 0.5*(np.nanpercentile(catalog["base_SdssShape_"+suffix], 84)
172 - np.nanpercentile(catalog["base_SdssShape_"+suffix], 16))
173 self.assertFloatsAlmostEqual(np.nanstd(catalog["base_SdssShape_"+suffix]),
174 shapeInterval68, rtol=0.03)
175 self.assertFloatsAlmostEqual(shapeErrMean, shapeInterval68, rtol=0.03)
176 self.assertLess(abs(shapeMean - record.get("truth_"+suffix)), 2.0*shapeErrMean/nSamples**0.5)
178 def testNegative(self):
179 """Test of shape measurement on negative sources.
180 """
181 config = self.makeSingleFrameMeasurementConfig("base_SdssShape", ["base_SdssCentroid"])
182 config.slots.centroid = "base_SdssCentroid"
183 task = self.makeSingleFrameMeasurementTask("base_SdssShape", dependencies=["base_SdssCentroid"])
184 # Ensure we're testing with the correct slots.
185 task.schema.getAliasMap().set("slot_Centroid", "base_SdssCentroid")
186 task.schema.getAliasMap().set("slot_Shape", "base_SdssShape")
188 dataset = lsst.meas.base.tests.TestDataset(self.bbox)
189 dataset.addSource(-10000.0, lsst.geom.Point2D(50., 50.), negative=True)
190 dataset.addSource(-100000.0, lsst.geom.Point2D(149.9, 50.3),
191 lsst.afw.geom.Quadrupole(8, 9, 3),
192 negative=True)
194 exposure, catalog = dataset.realize(10.0, task.schema, randomSeed=4)
196 task.run(catalog, exposure)
198 # both the point and elliptical source should have valid measurements
199 for record in catalog:
200 self.assertFalse(record.get("base_SdssShape_flag"))
201 self.assertFalse(record.get("base_SdssShape_flag_maxIter"))
202 self.assertFalse(record.get("base_SdssShape_flag_psf"))
203 self.assertFalse(record.get("base_SdssShape_flag_unweighted"))
204 self.assertFalse(record.get("base_SdssShape_flag_unweightedBad"))
205 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xx"), record.get("truth_xx"), rtol=0.03)
206 self.assertFloatsAlmostEqual(record.get("base_SdssShape_yy"), record.get("truth_yy"), rtol=0.03)
207 # correct value == 0, so need atol instead of rtol
208 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xy"), record.get("truth_xy"),
209 atol=0.04, rtol=None)
210 # There should be valid errors; we're not checking the exact uncertainty calculation here.
211 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xxErr"), 0.05, rtol=0.5)
212 self.assertFloatsAlmostEqual(record.get("base_SdssShape_yyErr"), 0.05, rtol=0.5)
213 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xyErr"), 0.05, rtol=0.5)
216class SdssShapeTransformTestCase(lsst.meas.base.tests.FluxTransformTestCase,
217 lsst.meas.base.tests.CentroidTransformTestCase,
218 lsst.meas.base.tests.SingleFramePluginTransformSetupHelper,
219 lsst.utils.tests.TestCase):
221 name = "sdssShape"
222 controlClass = lsst.meas.base.SdssShapeControl
223 algorithmClass = lsst.meas.base.SdssShapeAlgorithm
224 transformClass = lsst.meas.base.SdssShapeTransform
225 flagNames = ("flag", "flag_unweighted", "flag_unweightedBad", "flag_shift", "flag_maxIter")
226 singleFramePlugins = ('base_SdssShape',)
227 forcedPlugins = ('base_SdssShape',)
228 testPsf = True
230 def _setFieldsInRecords(self, records, name):
231 lsst.meas.base.tests.FluxTransformTestCase._setFieldsInRecords(self, records, name)
232 lsst.meas.base.tests.CentroidTransformTestCase._setFieldsInRecords(self, records, name)
233 for record in records:
234 for field in ('xx', 'yy', 'xy', 'xxErr', 'yyErr', 'xyErr', 'psf_xx', 'psf_yy', 'psf_xy'):
235 if record.schema.join(name, field) in record.schema:
236 record[record.schema.join(name, field)] = np.random.random()
238 def _compareFieldsInRecords(self, inSrc, outSrc, name):
239 lsst.meas.base.tests.FluxTransformTestCase._compareFieldsInRecords(self, inSrc, outSrc, name)
240 lsst.meas.base.tests.CentroidTransformTestCase._compareFieldsInRecords(self, inSrc, outSrc, name)
242 inShape = lsst.meas.base.ShapeResultKey(inSrc.schema[name]).get(inSrc)
243 outShape = lsst.meas.base.ShapeResultKey(outSrc.schema[name]).get(outSrc)
245 centroid = lsst.meas.base.CentroidResultKey(inSrc.schema[name]).get(inSrc).getCentroid()
246 xform = self.calexp.getWcs().linearizePixelToSky(centroid, lsst.geom.radians)
248 trInShape = inShape.getShape().transform(xform.getLinear())
249 self.assertEqual(trInShape.getIxx(), outShape.getShape().getIxx())
250 self.assertEqual(trInShape.getIyy(), outShape.getShape().getIyy())
251 self.assertEqual(trInShape.getIxy(), outShape.getShape().getIxy())
253 m = lsst.meas.base.makeShapeTransformMatrix(xform.getLinear())
254 np.testing.assert_array_almost_equal(
255 np.dot(np.dot(m, inShape.getShapeErr()), m.transpose()), outShape.getShapeErr()
256 )
257 if self.testPsf:
258 inPsfShape = lsst.meas.base.ShapeResultKey(
259 inSrc.schema[inSrc.schema.join(name, "psf")]
260 ).get(inSrc)
261 outPsfShape = lsst.meas.base.ShapeResultKey(
262 outSrc.schema[outSrc.schema.join(name, "psf")]
263 ).get(outSrc)
264 trInPsfShape = inPsfShape.getShape().transform(xform.getLinear())
266 self.assertEqual(trInPsfShape.getIxx(), outPsfShape.getShape().getIxx())
267 self.assertEqual(trInPsfShape.getIyy(), outPsfShape.getShape().getIyy())
268 self.assertEqual(trInPsfShape.getIxy(), outPsfShape.getShape().getIxy())
269 else:
270 self.assertNotIn(inSrc.schema.join(name, "psf", "xx"), inSrc.schema)
271 self.assertNotIn(inSrc.schema.join(name, "flag", "psf"), inSrc.schema)
274class PsfSdssShapeTransformTestCase(SdssShapeTransformTestCase, lsst.utils.tests.TestCase):
275 testPsf = False
277 def makeSdssShapeControl(self):
278 ctrl = lsst.meas.base.SdssShapeControl()
279 ctrl.doMeasurePsf = False
280 return ctrl
281 controlClass = makeSdssShapeControl
284class TestMemory(lsst.utils.tests.MemoryTestCase):
285 pass
288def setup_module(module):
289 lsst.utils.tests.init()
292if __name__ == "__main__": 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true
293 lsst.utils.tests.init()
294 unittest.main()