Coverage for tests / test_trailedSources.py: 21%

225 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:26 +0000

1# 

2# This file is part of meas_extensions_trailedSources. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24import numpy as np 

25import unittest 

26import lsst.utils.tests 

27import lsst.meas.extensions.trailedSources 

28from scipy.optimize import check_grad 

29import lsst.afw.table as afwTable 

30from lsst.meas.base.tests import AlgorithmTestCase 

31from lsst.meas.extensions.trailedSources import SingleFrameNaiveTrailPlugin 

32from lsst.meas.extensions.trailedSources import VeresModel 

33from lsst.meas.extensions.trailedSources.utils import getMeasurementCutout 

34from lsst.utils.tests import classParameters 

35 

36 

37# Trailed-source length, angle, and centroid. 

38rng = np.random.default_rng(432) 

39nTrails = 50 

40Ls = rng.uniform(2, 20, nTrails) 

41thetas = rng.uniform(0, 2*np.pi, nTrails) 

42xcs = rng.uniform(0, 100, nTrails) 

43ycs = rng.uniform(0, 100, nTrails) 

44 

45 

46class TrailedSource: 

47 """Holds a set of true trail parameters. 

48 """ 

49 

50 def __init__(self, instFlux, length, angle, xc, yc): 

51 self.instFlux = instFlux 

52 self.length = length 

53 self.angle = angle 

54 self.center = lsst.geom.Point2D(xc, yc) 

55 self.x0 = xc - length/2 * np.cos(angle) 

56 self.y0 = yc - length/2 * np.sin(angle) 

57 self.x1 = xc + length/2 * np.cos(angle) 

58 self.y1 = yc + length/2 * np.sin(angle) 

59 

60 

61# "Extend" meas.base.tests.TestDataset 

62class TrailedTestDataset(lsst.meas.base.tests.TestDataset): 

63 """A dataset for testing trailed source measurements. 

64 Given a `TrailedSource`, construct a record of the true values and an 

65 Exposure. 

66 """ 

67 

68 def __init__(self, bbox, threshold=10.0, exposure=None, **kwds): 

69 super().__init__(bbox, threshold, exposure, **kwds) 

70 

71 def addTrailedSource(self, trail): 

72 """Add a trailed source to the simulation. 

73 'Re-implemented' version of 

74 `lsst.meas.base.tests.TestDataset.addSource`. Numerically integrates a 

75 Gaussian PSF over a line to obtain am image of a trailed source. 

76 """ 

77 

78 record = self.catalog.addNew() 

79 record.set(self.keys["centroid"], trail.center) 

80 rng = np.random.default_rng(32) 

81 covariance = rng.normal(0, 0.1, 4).reshape(2, 2) 

82 covariance[0, 1] = covariance[1, 0] 

83 record.set(self.keys["centroid_sigma"], covariance.astype(np.float32)) 

84 record.set(self.keys["shape"], self.psfShape) 

85 record.set(self.keys["isStar"], False) 

86 

87 # Sum the psf at each 

88 numIter = int(10*trail.length) 

89 xp = np.linspace(trail.x0, trail.x1, num=numIter) 

90 yp = np.linspace(trail.y0, trail.y1, num=numIter) 

91 for (x, y) in zip(xp, yp): 

92 pt = lsst.geom.Point2D(x, y) 

93 im = self.drawGaussian(self.exposure.getBBox(), trail.instFlux, 

94 lsst.afw.geom.Ellipse(self.psfShape, pt)) 

95 self.exposure.getMaskedImage().getImage().getArray()[:, :] += im.getArray() 

96 

97 totFlux = self.exposure.image.array.sum() 

98 self.exposure.image.array /= totFlux 

99 self.exposure.image.array *= trail.instFlux 

100 

101 record.set(self.keys["instFlux"], trail.instFlux) 

102 self._installFootprint(record, self.exposure.getImage()) 

103 

104 return record, self.exposure.getImage() 

105 

106 

107class TrailedSourcesFailuresTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase): 

108 """Tests of various failure modes of the trailed source plugin. 

109 """ 

110 def setUp(self): 

111 self.center = lsst.geom.Point2D(50.1, 49.8) 

112 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30), 

113 lsst.geom.Extent2I(140, 160)) 

114 self.dataset = TrailedTestDataset(self.bbox) 

115 self.trail = TrailedSource(100000.0, 20, .5, 40, 40) 

116 self.dataset.addTrailedSource(self.trail) 

117 # So that the tests are consistent about output field names. 

118 self.name = "trailedSource" 

119 

120 def _setupPlugin(self, plugins=None): 

121 """Return a prepared trailed source plugin, catalog, and exposure for 

122 use with mocked failures. 

123 """ 

124 schema = TrailedTestDataset.makeMinimalSchema() 

125 dependencies = ["base_SdssCentroid"] 

126 dependencies += plugins if plugins is not None else [] 

127 config = self.makeSingleFrameMeasurementConfig(plugin="base_SdssShape", 

128 dependencies=dependencies) 

129 config.slots.shape = "base_SdssShape" 

130 config.slots.centroid = "base_SdssCentroid" 

131 trailedPlugin = SingleFrameNaiveTrailPlugin(SingleFrameNaiveTrailPlugin.ConfigClass(), 

132 self.name, 

133 schema, 

134 None) 

135 task = self.makeSingleFrameMeasurementTask( 

136 plugin="base_SdssShape", 

137 dependencies=dependencies, 

138 config=config, 

139 schema=schema 

140 ) 

141 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

142 task.run(catalog, exposure) 

143 return trailedPlugin, catalog, exposure 

144 

145 def testShapeFlag(self): 

146 """Test that the correct trailed source flags get set if shape_flag 

147 is set. 

148 """ 

149 trailedPlugin, catalog, exposure = self._setupPlugin() 

150 catalog["base_SdssShape_flag"] = True 

151 trailedPlugin.measure(catalog[0], exposure) 

152 self.assertTrue(catalog[0][f"{self.name}_flag"]) 

153 self.assertTrue(catalog[0][f"{self.name}_flag_shape"]) 

154 

155 

156# Following from meas_base/test_NaiveCentroid.py 

157# Taken from NaiveCentroidTestCase 

158@classParameters(length=Ls, theta=thetas, xc=xcs, yc=ycs) 

159class TrailedSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase): 

160 

161 def setUp(self): 

162 self.center = lsst.geom.Point2D(50.1, 49.8) 

163 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30), 

164 lsst.geom.Extent2I(140, 160)) 

165 self.dataset = TrailedTestDataset(self.bbox) 

166 

167 self.trail = TrailedSource(100000.0, self.length, self.theta, self.xc, self.yc) 

168 self.dataset.addTrailedSource(self.trail) 

169 

170 @staticmethod 

171 def transformMoments(Ixx, Iyy, Ixy): 

172 """Transform second-moments to semi-major and minor axis. 

173 """ 

174 xmy = Ixx - Iyy 

175 xpy = Ixx + Iyy 

176 xmy2 = xmy*xmy 

177 xy2 = Ixy*Ixy 

178 a2 = 0.5 * (xpy + np.sqrt(xmy2 + 4.0*xy2)) 

179 b2 = 0.5 * (xpy - np.sqrt(xmy2 + 4.0*xy2)) 

180 return a2, b2 

181 

182 @staticmethod 

183 def f_length(x): 

184 return SingleFrameNaiveTrailPlugin.findLength(*x)[0] 

185 

186 @staticmethod 

187 def g_length(x): 

188 return SingleFrameNaiveTrailPlugin.findLength(*x)[1] 

189 

190 @staticmethod 

191 def f_flux(x, model): 

192 return model.computeFluxWithGradient(x)[0] 

193 

194 @staticmethod 

195 def g_flux(x, model): 

196 return model.computeFluxWithGradient(x)[1] 

197 

198 @staticmethod 

199 def central_difference(func, x, *args, h=1e-8): 

200 result = np.zeros(len(x)) 

201 for i in range(len(x)): 

202 xp = x.copy() 

203 xp[i] += h 

204 fp = func(xp, *args) 

205 

206 xm = x.copy() 

207 xm[i] -= h 

208 fm = func(xm, *args) 

209 result[i] = (fp - fm) / (2*h) 

210 

211 return result 

212 

213 def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(), 

214 config=None, schema=None, algMetadata=None): 

215 """Set up a measurement task for a trailed source plugin. 

216 """ 

217 

218 config = self.makeSingleFrameMeasurementConfig(plugin=plugin, 

219 dependencies=dependencies) 

220 

221 # Make sure the shape slot is base_SdssShape 

222 config.slots.shape = "base_SdssShape" 

223 return self.makeSingleFrameMeasurementTask(plugin=plugin, 

224 dependencies=dependencies, 

225 config=config, schema=schema, 

226 algMetadata=algMetadata) 

227 

228 def testNaivePlugin(self): 

229 """Test the NaivePlugin measurements. 

230 Given a `TrailedTestDataset`, run the NaivePlugin measurement and 

231 compare the measured parameters to the true values. 

232 """ 

233 

234 # Set up and run Naive measurement. 

235 task = self.makeTrailedSourceMeasurementTask( 

236 plugin="ext_trailedSources_Naive", 

237 dependencies=("base_SdssCentroid", "base_SdssShape") 

238 ) 

239 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

240 task.run(catalog, exposure) 

241 record = catalog[0] 

242 

243 # Check the RA and Dec measurements 

244 wcs = exposure.getWcs() 

245 spt = wcs.pixelToSky(self.center) 

246 ra_true = spt.getRa().asDegrees() 

247 dec_true = spt.getDec().asDegrees() 

248 ra_meas = record.get("ext_trailedSources_Naive_ra") 

249 dec_meas = record.get("ext_trailedSources_Naive_dec") 

250 self.assertFloatsAlmostEqual(ra_true, ra_meas, atol=None, rtol=0.01) 

251 self.assertFloatsAlmostEqual(dec_true, dec_meas, atol=None, rtol=0.01) 

252 

253 # Check that root finder converged 

254 converged = record.get("ext_trailedSources_Naive_flag_noConverge") 

255 self.assertFalse(converged) 

256 

257 # Compare true with measured length, angle, and flux. 

258 # Accuracy is dependent on the second-moments measurements, so the 

259 # rtol values are simply rough upper bounds. 

260 length = record.get("ext_trailedSources_Naive_length") 

261 theta = record.get("ext_trailedSources_Naive_angle") 

262 flux = record.get("ext_trailedSources_Naive_flux") 

263 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1) 

264 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi, 

265 atol=np.arctan(1/length), rtol=None) 

266 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1) 

267 

268 # Test function gradients versus finite difference derivatives 

269 # Do length first 

270 Ixx, Iyy, Ixy = record.getShape().getParameterVector() 

271 a2, b2 = self.transformMoments(Ixx, Iyy, Ixy) 

272 self.assertLessEqual(check_grad(self.f_length, self.g_length, [a2, b2]), 1e-6) 

273 

274 # Now flux gradient 

275 xc = record.get("base_SdssShape_x") 

276 yc = record.get("base_SdssShape_y") 

277 params = np.array([xc, yc, 1.0, length, theta]) 

278 cutout = getMeasurementCutout(record, exposure) 

279 model = VeresModel(cutout) 

280 gradNum = self.central_difference(self.f_flux, params, model, h=9e-5) 

281 gradMax = np.max(np.abs(gradNum - self.g_flux(params, model))) 

282 self.assertLessEqual(gradMax, 1e-5) 

283 

284 # Check test setup 

285 self.assertNotEqual(length, self.trail.length) 

286 self.assertNotEqual(theta, self.trail.angle) 

287 

288 # Make sure measurement flag is False 

289 self.assertFalse(record.get("ext_trailedSources_Naive_flag")) 

290 

291 def testVeresPlugin(self): 

292 """Test the VeresPlugin measurements. 

293 Given a `TrailedTestDataset`, run the VeresPlugin measurement and 

294 compare the measured parameters to the true values. 

295 """ 

296 

297 # Set up and run Veres measurement. 

298 task = self.makeTrailedSourceMeasurementTask( 

299 plugin="ext_trailedSources_Veres", 

300 dependencies=( 

301 "base_SdssCentroid", 

302 "base_SdssShape", 

303 "ext_trailedSources_Naive") 

304 ) 

305 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0) 

306 task.run(catalog, exposure) 

307 record = catalog[0] 

308 

309 # Make sure optmizer converged 

310 converged = record.get("ext_trailedSources_Veres_flag_nonConvergence") 

311 self.assertFalse(converged) 

312 

313 # Compare measured trail length, angle, and flux to true values 

314 # These measurements should perform at least as well as NaivePlugin 

315 length = record.get("ext_trailedSources_Veres_length") 

316 theta = record.get("ext_trailedSources_Veres_angle") 

317 flux = record.get("ext_trailedSources_Veres_flux") 

318 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1) 

319 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi, 

320 atol=np.arctan(1/length), rtol=None) 

321 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1) 

322 

323 xc = record.get("ext_trailedSources_Veres_centroid_x") 

324 yc = record.get("ext_trailedSources_Veres_centroid_y") 

325 params = np.array([xc, yc, flux, length, theta]) 

326 cutout = getMeasurementCutout(record, exposure) 

327 model = VeresModel(cutout) 

328 gradNum = self.central_difference(model, params, h=1e-6) 

329 gradMax = np.max(np.abs(gradNum - model.gradient(params))) 

330 self.assertLessEqual(gradMax, 1e-5) 

331 

332 # Make sure test setup is working as expected 

333 self.assertNotEqual(length, self.trail.length) 

334 self.assertNotEqual(theta, self.trail.angle) 

335 

336 # Test that reduced chi-squared is reasonable 

337 rChiSq = record.get("ext_trailedSources_Veres_rChiSq") 

338 self.assertGreater(rChiSq, 0.8) 

339 self.assertLess(rChiSq, 1.3) 

340 

341 # Make sure measurement flag is False 

342 self.assertFalse(record.get("ext_trailedSources_Veres_flag")) 

343 

344 def testMonteCarlo(self): 

345 """Test the uncertainties in trail measurements from NaivePlugin 

346 """ 

347 # Adapted from lsst.meas.base 

348 

349 # Set up Naive measurement and dependencies. 

350 task = self.makeTrailedSourceMeasurementTask( 

351 plugin="ext_trailedSources_Naive", 

352 dependencies=("base_SdssCentroid", "base_SdssShape") 

353 ) 

354 

355 nSamples = 2000 

356 catalog = afwTable.SourceCatalog(task.schema) 

357 sample = 0 

358 seed = 0 

359 while sample < nSamples: 

360 seed += 1 

361 exp, cat = self.dataset.realize(100.0, task.schema, randomSeed=seed) 

362 rec = cat[0] 

363 task.run(cat, exp) 

364 

365 # Accuracy of this measurement is entirely dependent on shape and 

366 # centroiding. Skip when shape measurement fails. 

367 if rec['base_SdssShape_flag']: 

368 continue 

369 catalog.append(rec) 

370 sample += 1 

371 

372 catalog = catalog.copy(deep=True) 

373 nameBase = "ext_trailedSources_Naive_" 

374 

375 # Currently, the errors don't include covariances, so just make sure 

376 # we're close or at least over estimate 

377 length = catalog[nameBase+"length"] 

378 lengthErr = catalog[nameBase+"lengthErr"] 

379 lengthStd = np.nanstd(length) 

380 lengthErrMean = np.nanmean(lengthErr) 

381 diff = (lengthErrMean - lengthStd) / lengthErrMean 

382 self.assertGreater(diff, -0.1) 

383 self.assertLess(diff, 0.5) 

384 

385 angle = catalog[nameBase+"angle"] 

386 if (np.max(angle) - np.min(angle)) > np.pi/2: 

387 angle = angle % np.pi # Wrap if bimodal 

388 angleErr = catalog[nameBase+"angleErr"] 

389 angleStd = np.nanstd(angle) 

390 angleErrMean = np.nanmean(angleErr) 

391 diff = (angleErrMean - angleStd) / angleErrMean 

392 self.assertGreater(diff, -0.1) 

393 self.assertLess(diff, 0.6) 

394 

395 

396class TestMemory(lsst.utils.tests.MemoryTestCase): 

397 pass 

398 

399 

400def setup_module(module): 

401 lsst.utils.tests.init() 

402 

403 

404if __name__ == "__main__": 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true

405 lsst.utils.tests.init() 

406 unittest.main()