Coverage for tests / test_SdssShape.py: 19%

183 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:22 +0000

1# This file is part of meas_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23 

24import numpy as np 

25 

26import lsst.geom 

27import lsst.afw.geom 

28import lsst.afw.table 

29import lsst.meas.base 

30import lsst.meas.base.tests 

31import lsst.utils.tests 

32 

33 

34class SdssShapeTestCase(lsst.meas.base.tests.AlgorithmTestCase, lsst.utils.tests.TestCase): 

35 

36 def setUp(self): 

37 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30), 

38 lsst.geom.Extent2I(240, 160)) 

39 self.dataset = lsst.meas.base.tests.TestDataset(self.bbox) 

40 # first source is a point 

41 self.dataset.addSource(100000.0, lsst.geom.Point2D(50.1, 49.8)) 

42 # second source is extended 

43 self.dataset.addSource(100000.0, lsst.geom.Point2D(149.9, 50.3), 

44 lsst.afw.geom.Quadrupole(8, 9, 3)) 

45 self.config = self.makeSingleFrameMeasurementConfig("base_SdssShape") 

46 

47 def makeAlgorithm(self, ctrl=None): 

48 """Construct an algorithm and return both it and its schema. 

49 """ 

50 if ctrl is None: 

51 ctrl = lsst.meas.base.SdssShapeControl() 

52 schema = lsst.meas.base.tests.TestDataset.makeMinimalSchema() 

53 algorithm = lsst.meas.base.SdssShapeAlgorithm(ctrl, "base_SdssShape", schema) 

54 return algorithm, schema 

55 

56 def assertFinite(self, value): 

57 self.assertTrue(np.isfinite(value), msg="%s is not finite" % (value,)) 

58 

59 def _runMeasurementTask(self): 

60 task = self.makeSingleFrameMeasurementTask("base_SdssShape", config=self.config) 

61 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

62 task.run(catalog, exposure) 

63 return exposure, catalog 

64 

65 def _checkShape(self, result, record): 

66 self.assertFloatsAlmostEqual(result.x, record.get("truth_x"), rtol=1E-2) 

67 self.assertFloatsAlmostEqual(result.y, record.get("truth_y"), rtol=1E-2) 

68 self.assertFloatsAlmostEqual(result.xx, record.get("truth_xx"), rtol=1E-2) 

69 self.assertFloatsAlmostEqual(result.yy, record.get("truth_yy"), rtol=1E-2) 

70 self.assertFloatsAlmostEqual(result.xy, record.get("truth_xy"), rtol=1E-1, atol=2E-1) 

71 self.assertFinite(result.xxErr) 

72 self.assertFinite(result.yyErr) 

73 self.assertFinite(result.xyErr) 

74 self.assertFinite(result.instFlux_xx_Cov) 

75 self.assertFinite(result.instFlux_yy_Cov) 

76 self.assertFinite(result.instFlux_xy_Cov) 

77 self.assertFinite(result.xx_yy_Cov) 

78 self.assertFinite(result.xx_xy_Cov) 

79 self.assertFinite(result.yy_xy_Cov) 

80 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.FAILURE.number)) 

81 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.UNWEIGHTED_BAD.number)) 

82 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.UNWEIGHTED.number)) 

83 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.SHIFT.number)) 

84 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.MAXITER.number)) 

85 

86 def _checkPsfShape(self, result, psfResult, psfTruth): 

87 self.assertFloatsAlmostEqual(psfResult.getIxx(), psfTruth.getIxx(), rtol=1E-4) 

88 self.assertFloatsAlmostEqual(psfResult.getIyy(), psfTruth.getIyy(), rtol=1E-4) 

89 self.assertFloatsAlmostEqual(psfResult.getIxy(), psfTruth.getIxy(), rtol=1E-4) 

90 self.assertFalse(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.PSF_SHAPE_BAD.number)) 

91 

92 def testMeasureGoodPsf(self): 

93 """Test that we measure shapes and record the PSF shape correctly 

94 

95 Note: Given that the PSF model here is constant over the entire image, this test 

96 would not catch an error in the potition at which base_SdssShape_psf is computed. 

97 Such a test requires a spatially varying PSF model such that different locations 

98 can be distinguished by their different PSF model shapes. Such a test exists in 

99 meas_algorithms (tests/testSdssShapePsf.py), making use of the PcaPsf algorithm 

100 to build the spatially varying PSF. 

101 """ 

102 exposure, catalog = self._runMeasurementTask() 

103 key = lsst.meas.base.SdssShapeResultKey(catalog.schema["base_SdssShape"]) 

104 psfTruth = exposure.getPsf().computeShape(catalog[0].getCentroid()) 

105 for record in catalog: 

106 result = record.get(key) 

107 self._checkShape(result, record) 

108 psfResult = key.getPsfShape(record) 

109 self._checkPsfShape(result, psfResult, psfTruth) 

110 

111 def testMeasureWithoutPsf(self): 

112 """Test that we measure shapes correctly and do not record the PSF shape when not needed.""" 

113 self.config.plugins["base_SdssShape"].doMeasurePsf = False 

114 _, catalog = self._runMeasurementTask() 

115 key = lsst.meas.base.SdssShapeResultKey(catalog.schema["base_SdssShape"]) 

116 for record in catalog: 

117 result = record.get(key) 

118 self._checkShape(result, record) 

119 self.assertNotIn("base_SdssShape_psf_xx", catalog.schema) 

120 self.assertNotIn("base_SdssShape_psf_yy", catalog.schema) 

121 self.assertNotIn("base_SdssShape_psf_xy", catalog.schema) 

122 self.assertNotIn("base_SdssShape_flag_psf", catalog.schema) 

123 

124 def testMeasureBadPsf(self): 

125 """Test that we measure shapes correctly and set a flag with the PSF is unavailable.""" 

126 self.config.plugins["base_SdssShape"].doMeasurePsf = True 

127 task = self.makeSingleFrameMeasurementTask("base_SdssShape", config=self.config) 

128 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=1) 

129 exposure.setPsf(None) # Set PSF to None to test no PSF case 

130 task.run(catalog, exposure) 

131 key = lsst.meas.base.SdssShapeResultKey(catalog.schema["base_SdssShape"]) 

132 for record in catalog: 

133 result = record.get(key) 

134 self._checkShape(result, record) 

135 self.assertTrue(result.getFlag(lsst.meas.base.SdssShapeAlgorithm.PSF_SHAPE_BAD.number)) 

136 

137 def testMonteCarlo(self): 

138 """Test an ideal simulation, with deterministic noise. 

139 

140 Demonstrate that: 

141 

142 - We get the right answer, and 

143 - The reported uncertainty agrees with a Monte Carlo test of the noise. 

144 """ 

145 algorithm, schema = self.makeAlgorithm() 

146 # Results are RNG dependent; we choose a seed that is known to pass. 

147 exposure, cat = self.dataset.realize(0.0, schema, randomSeed=3) 

148 record = cat[1] 

149 instFlux = record["truth_instFlux"] 

150 algorithm.measure(record, exposure) 

151 for suffix in ["xx", "yy", "xy"]: 

152 self.assertFloatsAlmostEqual(record.get("truth_"+suffix), 

153 record.get("base_SdssShape_"+suffix), rtol=1E-4) 

154 

155 for noise in (0.0001, 0.001,): 

156 nSamples = 1000 

157 catalog = lsst.afw.table.SourceCatalog(cat.schema) 

158 for i in range(nSamples): 

159 # By using ``i`` to seed the RNG, we get results which 

160 # fall within the tolerances defined below. If we allow this 

161 # test to be truly random, passing becomes RNG-dependent. 

162 exposure, cat = self.dataset.realize(noise*instFlux, schema, randomSeed=i) 

163 record = cat[1] 

164 algorithm.measure(record, exposure) 

165 catalog.append(record) 

166 

167 catalog = catalog.copy(deep=True) 

168 for suffix in ["xx", "yy", "xy"]: 

169 shapeMean = np.mean(catalog["base_SdssShape_"+suffix]) 

170 shapeErrMean = np.nanmean(catalog["base_SdssShape_"+suffix+"Err"]) 

171 shapeInterval68 = 0.5*(np.nanpercentile(catalog["base_SdssShape_"+suffix], 84) 

172 - np.nanpercentile(catalog["base_SdssShape_"+suffix], 16)) 

173 self.assertFloatsAlmostEqual(np.nanstd(catalog["base_SdssShape_"+suffix]), 

174 shapeInterval68, rtol=0.03) 

175 self.assertFloatsAlmostEqual(shapeErrMean, shapeInterval68, rtol=0.03) 

176 self.assertLess(abs(shapeMean - record.get("truth_"+suffix)), 2.0*shapeErrMean/nSamples**0.5) 

177 

178 def testNegative(self): 

179 """Test of shape measurement on negative sources. 

180 """ 

181 config = self.makeSingleFrameMeasurementConfig("base_SdssShape", ["base_SdssCentroid"]) 

182 config.slots.centroid = "base_SdssCentroid" 

183 task = self.makeSingleFrameMeasurementTask("base_SdssShape", dependencies=["base_SdssCentroid"]) 

184 # Ensure we're testing with the correct slots. 

185 task.schema.getAliasMap().set("slot_Centroid", "base_SdssCentroid") 

186 task.schema.getAliasMap().set("slot_Shape", "base_SdssShape") 

187 

188 dataset = lsst.meas.base.tests.TestDataset(self.bbox) 

189 dataset.addSource(-10000.0, lsst.geom.Point2D(50., 50.), negative=True) 

190 dataset.addSource(-100000.0, lsst.geom.Point2D(149.9, 50.3), 

191 lsst.afw.geom.Quadrupole(8, 9, 3), 

192 negative=True) 

193 

194 exposure, catalog = dataset.realize(10.0, task.schema, randomSeed=4) 

195 

196 task.run(catalog, exposure) 

197 

198 # both the point and elliptical source should have valid measurements 

199 for record in catalog: 

200 self.assertFalse(record.get("base_SdssShape_flag")) 

201 self.assertFalse(record.get("base_SdssShape_flag_maxIter")) 

202 self.assertFalse(record.get("base_SdssShape_flag_psf")) 

203 self.assertFalse(record.get("base_SdssShape_flag_unweighted")) 

204 self.assertFalse(record.get("base_SdssShape_flag_unweightedBad")) 

205 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xx"), record.get("truth_xx"), rtol=0.03) 

206 self.assertFloatsAlmostEqual(record.get("base_SdssShape_yy"), record.get("truth_yy"), rtol=0.03) 

207 # correct value == 0, so need atol instead of rtol 

208 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xy"), record.get("truth_xy"), 

209 atol=0.04, rtol=None) 

210 # There should be valid errors; we're not checking the exact uncertainty calculation here. 

211 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xxErr"), 0.05, rtol=0.5) 

212 self.assertFloatsAlmostEqual(record.get("base_SdssShape_yyErr"), 0.05, rtol=0.5) 

213 self.assertFloatsAlmostEqual(record.get("base_SdssShape_xyErr"), 0.05, rtol=0.5) 

214 

215 

216class SdssShapeTransformTestCase(lsst.meas.base.tests.FluxTransformTestCase, 

217 lsst.meas.base.tests.CentroidTransformTestCase, 

218 lsst.meas.base.tests.SingleFramePluginTransformSetupHelper, 

219 lsst.utils.tests.TestCase): 

220 

221 name = "sdssShape" 

222 controlClass = lsst.meas.base.SdssShapeControl 

223 algorithmClass = lsst.meas.base.SdssShapeAlgorithm 

224 transformClass = lsst.meas.base.SdssShapeTransform 

225 flagNames = ("flag", "flag_unweighted", "flag_unweightedBad", "flag_shift", "flag_maxIter") 

226 singleFramePlugins = ('base_SdssShape',) 

227 forcedPlugins = ('base_SdssShape',) 

228 testPsf = True 

229 

230 def _setFieldsInRecords(self, records, name): 

231 lsst.meas.base.tests.FluxTransformTestCase._setFieldsInRecords(self, records, name) 

232 lsst.meas.base.tests.CentroidTransformTestCase._setFieldsInRecords(self, records, name) 

233 for record in records: 

234 for field in ('xx', 'yy', 'xy', 'xxErr', 'yyErr', 'xyErr', 'psf_xx', 'psf_yy', 'psf_xy'): 

235 if record.schema.join(name, field) in record.schema: 

236 record[record.schema.join(name, field)] = np.random.random() 

237 

238 def _compareFieldsInRecords(self, inSrc, outSrc, name): 

239 lsst.meas.base.tests.FluxTransformTestCase._compareFieldsInRecords(self, inSrc, outSrc, name) 

240 lsst.meas.base.tests.CentroidTransformTestCase._compareFieldsInRecords(self, inSrc, outSrc, name) 

241 

242 inShape = lsst.meas.base.ShapeResultKey(inSrc.schema[name]).get(inSrc) 

243 outShape = lsst.meas.base.ShapeResultKey(outSrc.schema[name]).get(outSrc) 

244 

245 centroid = lsst.meas.base.CentroidResultKey(inSrc.schema[name]).get(inSrc).getCentroid() 

246 xform = self.calexp.getWcs().linearizePixelToSky(centroid, lsst.geom.radians) 

247 

248 trInShape = inShape.getShape().transform(xform.getLinear()) 

249 self.assertEqual(trInShape.getIxx(), outShape.getShape().getIxx()) 

250 self.assertEqual(trInShape.getIyy(), outShape.getShape().getIyy()) 

251 self.assertEqual(trInShape.getIxy(), outShape.getShape().getIxy()) 

252 

253 m = lsst.meas.base.makeShapeTransformMatrix(xform.getLinear()) 

254 np.testing.assert_array_almost_equal( 

255 np.dot(np.dot(m, inShape.getShapeErr()), m.transpose()), outShape.getShapeErr() 

256 ) 

257 if self.testPsf: 

258 inPsfShape = lsst.meas.base.ShapeResultKey( 

259 inSrc.schema[inSrc.schema.join(name, "psf")] 

260 ).get(inSrc) 

261 outPsfShape = lsst.meas.base.ShapeResultKey( 

262 outSrc.schema[outSrc.schema.join(name, "psf")] 

263 ).get(outSrc) 

264 trInPsfShape = inPsfShape.getShape().transform(xform.getLinear()) 

265 

266 self.assertEqual(trInPsfShape.getIxx(), outPsfShape.getShape().getIxx()) 

267 self.assertEqual(trInPsfShape.getIyy(), outPsfShape.getShape().getIyy()) 

268 self.assertEqual(trInPsfShape.getIxy(), outPsfShape.getShape().getIxy()) 

269 else: 

270 self.assertNotIn(inSrc.schema.join(name, "psf", "xx"), inSrc.schema) 

271 self.assertNotIn(inSrc.schema.join(name, "flag", "psf"), inSrc.schema) 

272 

273 

274class PsfSdssShapeTransformTestCase(SdssShapeTransformTestCase, lsst.utils.tests.TestCase): 

275 testPsf = False 

276 

277 def makeSdssShapeControl(self): 

278 ctrl = lsst.meas.base.SdssShapeControl() 

279 ctrl.doMeasurePsf = False 

280 return ctrl 

281 controlClass = makeSdssShapeControl 

282 

283 

284class TestMemory(lsst.utils.tests.MemoryTestCase): 

285 pass 

286 

287 

288def setup_module(module): 

289 lsst.utils.tests.init() 

290 

291 

292if __name__ == "__main__": 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true

293 lsst.utils.tests.init() 

294 unittest.main()