Coverage for tests / test_trailedEdgeSources.py: 16%

188 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:56 +0000

1# 

2# This file is part of meas_extensions_trailedSources. 

3# 

4# Developed for the LSST Data Management System. 

5# This product includes software developed by the LSST Project 

6# (http://www.lsst.org). 

7# See the COPYRIGHT file at the top-level directory of this distribution 

8# for details of code ownership. 

9# 

10# This program is free software: you can redistribute it and/or modify 

11# it under the terms of the GNU General Public License as published by 

12# the Free Software Foundation, either version 3 of the License, or 

13# (at your option) any later version. 

14# 

15# This program is distributed in the hope that it will be useful, 

16# but WITHOUT ANY WARRANTY; without even the implied warranty of 

17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

18# GNU General Public License for more details. 

19# 

20# You should have received a copy of the GNU General Public License 

21# along with this program. If not, see <http://www.gnu.org/licenses/>. 

22# 

23 

24import numpy as np 

25import unittest 

26import lsst.utils.tests 

27import lsst.meas.extensions.trailedSources 

28from lsst.meas.base.tests import AlgorithmTestCase 

29from lsst.utils.tests import classParameters 

30from lsst.geom import Point2I, Point2D, Box2I, Extent2I 

31from unittest.mock import patch 

32 

# Parameter sets for the simulated trails.  Element ``i`` of every array
# describes the same trail, and the four arrays are fed together to
# ``classParameters`` below.
#
# The tests identify the scenario from the centroid x coordinate:
#   x = 100 -> one end point is expected on an EDGE-masked pixel,
#   x = 20  -> neither end point is expected on an EDGE-masked pixel,
#   x = -20 -> centroid at the bounding-box origin; trail runs off the image,
#   x = 90  -> neither end point is expected on an EDGE-masked pixel.

# Trail lengths.
trail_lengths = np.array([5, 5, 10, 4])

# Trail angles; handed unchanged to ``np.cos``/``np.sin``.
trail_angles = np.array([100, 0, 5, 4])

# Centroid coordinates.
trail_x_coords = np.array([100, 20, -20, 90])
trail_y_coords = np.array([100, 20, -30, 100])

38 

39 

class TrailedEdgeSource:
    """Container for the true parameters of one simulated trail.

    Parameters
    ----------
    instFlux : `float`
        Flux of the trail; stored unchanged as ``instFlux``.
    length : `float`
        Trail length; stored unchanged as ``length``.
    angle : `float`
        Trail angle; passed directly to `numpy.cos` and `numpy.sin`.
    xc, yc : `float`
        Coordinates of the trail center.

    Notes
    -----
    In addition to the inputs, an instance exposes ``center`` (a `Point2D`
    built from ``xc, yc``) and the two end points ``(x0, y0)`` and
    ``(x1, y1)``, located half a trail length either side of the center
    along the direction given by ``angle``.
    """

    def __init__(self, instFlux, length, angle, xc, yc):
        self.instFlux = instFlux
        self.length = length
        self.angle = angle
        self.center = Point2D(xc, yc)

        # Displacement from the center to either end of the trail.
        half_dx = length / 2 * np.cos(angle)
        half_dy = length / 2 * np.sin(angle)

        self.x0, self.y0 = xc - half_dx, yc - half_dy
        self.x1, self.y1 = xc + half_dx, yc + half_dy

53 

54 

class TrailedTaskSetup:
    """Helper for building measurement tasks for trailed-source plugins.

    The method here relies on ``self.makeSingleFrameMeasurementConfig`` and
    ``self.makeSingleFrameMeasurementTask``, so whatever object is passed as
    ``self`` must provide both.  In this file it is invoked unbound, with the
    running test case supplied as ``self``.
    """

    def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(),
                                         config=None, schema=None,
                                         algMetadata=None):
        """Set up a measurement task for a trailed source plugin.

        Parameters
        ----------
        plugin : `str`, optional
            Name of the plugin; forwarded to the config and task factories.
        dependencies : iterable of `str`, optional
            Names of plugins that ``plugin`` depends on; forwarded likewise.
        config : optional
            Measurement configuration to use.  If `None` (the default), one
            is created with ``self.makeSingleFrameMeasurementConfig``.  Either
            way its shape slot is set to ``"base_SdssShape"``, so a config
            passed in by the caller is modified in place.
        schema : optional
            Forwarded unchanged to ``self.makeSingleFrameMeasurementTask``.
        algMetadata : optional
            Forwarded unchanged to ``self.makeSingleFrameMeasurementTask``.

        Returns
        -------
        task
            Whatever ``self.makeSingleFrameMeasurementTask`` returns.
        """
        # Build a default configuration only when the caller did not supply
        # one; previously a supplied config was silently replaced.
        if config is None:
            config = self.makeSingleFrameMeasurementConfig(plugin=plugin,
                                                           dependencies=dependencies)

        # Make sure the shape slot is base_SdssShape
        config.slots.shape = "base_SdssShape"
        return self.makeSingleFrameMeasurementTask(plugin=plugin,
                                                   dependencies=dependencies,
                                                   config=config,
                                                   schema=schema,
                                                   algMetadata=algMetadata)

72 

73 

74# "Extend" meas.base.tests.TestDataset 

class TrailedTestDataset(lsst.meas.base.tests.TestDataset):
    """Test dataset to which simulated trailed sources can be added.

    Given a `TrailedEdgeSource`, `addTrailedSource` stores the true values
    in a new catalog record and draws the trail onto the exposure.
    """

    def __init__(self, bbox, threshold=10.0, exposure=None, **kwds):
        # Nothing extra to initialise: every argument goes to the base class.
        super().__init__(bbox, threshold, exposure, **kwds)

    def addTrailedSource(self, trail, edge=True):
        """Add a trailed source to the simulation.

        Re-implemented version of
        `lsst.meas.base.tests.TestDataset.addSource`.  The trail is drawn by
        summing Gaussian PSF images at evenly spaced points along the line
        joining its two end points; optionally, EDGE mask bits are then set
        around the border of the exposure.

        Parameters
        ----------
        trail : `TrailedEdgeSource`
            True parameters of the trail.  ``center``, ``length``,
            ``instFlux`` and the end points ``x0, y0, x1, y1`` are read.
        edge : `bool`, optional
            If `True` (the default), set the EDGE mask plane on the border
            rows and columns of the exposure.

        Returns
        -------
        record
            The new catalog record holding the true values.
        image
            The exposure's image, as returned by ``self.exposure.getImage()``.

        Notes
        -----
        After drawing, the whole exposure image is rescaled so that its pixel
        sum equals ``trail.instFlux``.
        """
        truth = self.catalog.addNew()
        truth.set(self.keys["centroid"], trail.center)

        # A fixed seed keeps the centroid covariance identical on every call.
        rng = np.random.default_rng(32)
        sigma = rng.normal(0, 0.1, 4).reshape(2, 2)
        sigma[0, 1] = sigma[1, 0]  # make the matrix symmetric
        truth.set(self.keys["centroid_sigma"], sigma.astype(np.float32))
        truth.set(self.keys["shape"], self.psfShape)
        truth.set(self.keys["isStar"], False)

        # Sum the PSF at each sample point along the trail, using two sample
        # points per unit of trail length.
        nSamples = int(2 * trail.length)
        sampleXs = np.linspace(trail.x0, trail.x1, num=nSamples)
        sampleYs = np.linspace(trail.y0, trail.y1, num=nSamples)
        for sampleX, sampleY in zip(sampleXs, sampleYs):
            psfEllipse = lsst.afw.geom.Ellipse(self.psfShape, Point2D(sampleX, sampleY))
            stamp = self.drawGaussian(self.exposure.getBBox(), trail.instFlux, psfEllipse)
            self.exposure.getMaskedImage().getImage().getArray()[:, :] += stamp.getArray()

        planes = self.exposure.mask.getMaskPlaneDict()
        dim = self.exposure.getBBox().getDimensions()

        if edge:
            mask = self.exposure.mask
            edgePlane = planes['EDGE']
            nCols, nRows = dim[0], dim[1]

            # Full-width spans on the first 20 and the last 20 rows.
            for row in range(20):
                mask.setMaskPlaneValues(edgePlane, 0, nCols - 1, row)
                mask.setMaskPlaneValues(edgePlane, 0, nCols - 1, row + nRows - 20)

            # On every row, one span at the start and one at the end.
            # NOTE(review): the left span is given as 0..20 while the right
            # span is (nCols - 20)..(nCols - 1); if the end index is
            # inclusive the left band is one column wider than the right.
            # Confirm whether 0..19 was intended.
            for row in range(nRows):
                mask.setMaskPlaneValues(edgePlane, 0, 20, row)
                mask.setMaskPlaneValues(edgePlane, nCols - 20, nCols - 1, row)

        # Rescale so that the image sums to the requested flux.
        imageSum = self.exposure.image.array.sum()
        self.exposure.image.array /= imageSum
        self.exposure.image.array *= trail.instFlux

        truth.set(self.keys["instFlux"], trail.instFlux)
        self._installFootprint(truth, self.exposure.getImage())

        return truth, self.exposure.getImage()

133 

134 

135# Following from test_trailedSources 

@classParameters(length=trail_lengths, theta=trail_angles, xc=trail_x_coords, yc=trail_y_coords)
class TrailedEdgeSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Test that the ext_trailedSources_Naive edge, off-image and nan flags
    are set correctly.

    A `TrailedEdgeSource` built from the class parameters is drawn onto a
    `TrailedTestDataset` with EDGE mask bits set around the border.  Each
    test then runs a measurement task created by
    `TrailedTaskSetup.makeTrailedSourceMeasurementTask` and checks the flags
    on the first record of the resulting catalog.
    """

    def setUp(self):
        self.center = Point2D(50.1, 49.8)
        self.bbox = Box2I(lsst.geom.Point2I(-20, -30), Extent2I(140, 160))
        self.dataset = TrailedTestDataset(self.bbox)

        # Trail for this parameter set, drawn with EDGE mask bits enabled.
        self.trail = TrailedEdgeSource(100000.0, self.length, self.theta, self.xc, self.yc)
        self.dataset.addTrailedSource(self.trail)

    def tearDown(self):
        # Mirror TrailedEdgeSourcesOffImageTest: drop the fixtures explicitly.
        del self.center
        del self.bbox
        del self.trail
        del self.dataset

    def _makeNaiveTask(self):
        """Build the measurement task for the Naive trailed-source plugin."""
        return TrailedTaskSetup.makeTrailedSourceMeasurementTask(
            self,
            plugin="ext_trailedSources_Naive",
            dependencies=("base_SdssCentroid", "base_SdssShape"),
        )

    @staticmethod
    def _isEdgePixel(exposure, x, y):
        """Return whether the mask pixel at ``(x, y)`` has its EDGE bit set."""
        edgeBit = exposure.mask.getPlaneBitMask('EDGE')
        return (exposure.mask[Point2I(x, y)] & edgeBit) != 0

    def _assertNaiveFlags(self, record, *, edge, offImage, nan):
        """Check the Naive plugin flags on ``record``.

        The general failure flag ``ext_trailedSources_Naive_flag`` is always
        required to be unset; the other three are compared to the arguments.
        """
        expected = {
            "ext_trailedSources_Naive_flag_edge": edge,
            "ext_trailedSources_Naive_flag": False,
            "ext_trailedSources_Naive_flag_off_image": offImage,
            "ext_trailedSources_Naive_flag_nan": nan,
        }
        for name, value in expected.items():
            self.assertEqual(bool(record.get(name)), value, msg=name)

    def testEdgeFlag(self):
        """Test if edge flags are correctly set in NaivePlugin.py

        Given a `TrailedTestDataset`, run the NaivePlugin measurement and
        check the flags of the first record.  The expectation depends on the
        true centroid of the trail:

        - [100, 100]: the start of the trail is clear of EDGE pixels but its
          end lands on one, so the edge flag should be set.
        - [20, 20] and [90, 100]: neither end point lands on an EDGE pixel,
          so no flag should be set.
        - [-20, -30]: the trail extends off the chip, so both the edge and
          the off_image flags should be set.

        The general failure flag and the nan flag should never be set here.
        """
        # Set up and run Naive measurement.
        task = self._makeNaiveTask()
        exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)
        task.run(catalog, exposure)
        record = catalog[0]

        # Pixel indices of the measured end points.
        x1 = int(record['ext_trailedSources_Naive_x1'])
        y1 = int(record['ext_trailedSources_Naive_y1'])
        x0 = int(record['ext_trailedSources_Naive_x0'])
        y0 = int(record['ext_trailedSources_Naive_y0'])

        truthX = record['truth_x']

        # Test case with one end of the trail on EDGE pixels.
        if truthX == 100:
            # Confirm the mask itself is as expected before checking the
            # flags derived from it.
            self.assertFalse(self._isEdgePixel(exposure, x0, y0))
            self.assertTrue(self._isEdgePixel(exposure, x1, y1))

            # The edge flag must be set without the measurement being
            # reported as failed.
            self._assertNaiveFlags(record, edge=True, offImage=False, nan=False)

        # Test cases with neither end point on EDGE pixels.
        elif truthX in (20, 90):
            self.assertFalse(self._isEdgePixel(exposure, x0, y0))
            self.assertFalse(self._isEdgePixel(exposure, x1, y1))

            self._assertNaiveFlags(record, edge=False, offImage=False, nan=False)

        # Test case with trailed source extending off chip.  The mask is not
        # inspected here.
        else:
            self.assertEqual(truthX, -20)
            self._assertNaiveFlags(record, edge=True, offImage=True, nan=False)

    def testNanFlag(self):
        """Test if nan flags are correctly set in NaivePlugin.py

        Given a `TrailedTestDataset`, run the NaivePlugin measurement with
        ``check_trail`` wrapped so that the ``y1`` end point it receives is
        always nan, and check the flags of the first record.
        """
        # Set up Naive measurement.
        task = self._makeNaiveTask()
        exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)

        # Captured before patching, so the wrapper can delegate to it.
        realCheckTrail = task.plugins['ext_trailedSources_Naive'].check_trail

        def checkTrailWithNan(*args, **kwargs):
            """Call the real ``check_trail`` with ``y1`` replaced by nan."""
            measRecord, exposure, x0, y0, x1 = args[:5]
            length = args[6]
            measRecord['ext_trailedSources_Naive_y1'] = np.nan
            return realCheckTrail(measRecord, exposure, x0, y0, x1, np.nan, length)

        # The context manager guarantees the class-level patch is removed
        # even if the measurement raises, so it cannot leak into other tests.
        with patch(
                'lsst.meas.extensions.trailedSources.NaivePlugin.SingleFrameNaiveTrailPlugin.check_trail',
                side_effect=checkTrailWithNan):
            task.run(catalog, exposure)
        record = catalog[0]

        truthX = record['truth_x']

        # Trails that stay on the image: with one end point nan, only the
        # nan flag is expected, and the edge flag stays unset even for the
        # trail whose end would otherwise land on EDGE pixels.
        if truthX in (100, 20, 90):
            self._assertNaiveFlags(record, edge=False, offImage=False, nan=True)

        # Test case with trailed source extending off chip.  One coordinate
        # is off image and the other is nan, so edge, off_image, and nan
        # should all be set.
        else:
            self.assertEqual(truthX, -20)
            self._assertNaiveFlags(record, edge=True, offImage=True, nan=True)

336 

337 

@classParameters(length=[10], theta=[5], xc=[-20], yc=[-30])
class TrailedEdgeSourcesOffImageTest(AlgorithmTestCase, lsst.utils.tests.TestCase):
    """Test ext_trailedSources_Naive_flag_edge for a trail that leaves the
    image when no EDGE mask bits have been set.

    A single `TrailedEdgeSource`, centered on the origin of the bounding
    box, is drawn onto a `TrailedTestDataset` with ``edge=False``.  The
    measurement task created by
    `TrailedTaskSetup.makeTrailedSourceMeasurementTask` is then run on the
    resulting catalog.
    """

    def setUp(self):
        self.center = Point2D(50.1, 49.8)
        origin = lsst.geom.Point2I(-20, -30)
        self.bbox = Box2I(origin, Extent2I(140, 160))
        self.dataset = TrailedTestDataset(self.bbox)

        # Draw the trail without adding any EDGE mask bits.
        self.trail = TrailedEdgeSource(100000.0, self.length, self.theta,
                                       self.xc, self.yc)
        self.dataset.addTrailedSource(self.trail, edge=False)

    def tearDown(self):
        # Drop the fixtures created in setUp.
        for attr in ("center", "bbox", "trail", "dataset"):
            delattr(self, attr)

    def testOffImageEdgeFlag(self):
        """Test if edge flags are correctly set in NaivePlugin.py when the
        source extends off the image.

        Given a `TrailedTestDataset`, run the NaivePlugin measurement and
        check that the edge flag is set when a source extends off the chip.
        No EDGE mask pixels are set in this test.
        """
        # Set up and run Naive measurement.
        naiveTask = TrailedTaskSetup.makeTrailedSourceMeasurementTask(
            self,
            plugin="ext_trailedSources_Naive",
            dependencies=("base_SdssCentroid", "base_SdssShape"),
        )
        exposure, catalog = self.dataset.realize(5.0, naiveTask.schema, randomSeed=0)
        naiveTask.run(catalog, exposure)
        firstRecord = catalog[0]

        # Edge flag set, general failure flag not set.
        self.assertTrue(firstRecord.get("ext_trailedSources_Naive_flag_edge"))
        self.assertFalse(firstRecord.get("ext_trailedSources_Naive_flag"))

384 

385 

class TestMemory(lsst.utils.tests.MemoryTestCase):
    """Run the checks inherited from `lsst.utils.tests.MemoryTestCase`.

    Nothing is added or overridden here.
    """

388 

389 

def setup_module(module):
    """Initialise the LSST test utilities for this module.

    Parameters
    ----------
    module : module
        Supplied by the caller; not used here.

    Notes
    -----
    The name follows the module-level setup hook convention; presumably the
    test runner calls it once before the tests in this file.
    """
    lsst.utils.tests.init()

392 

393 

# Direct execution: initialise the LSST test utilities, then hand over to
# unittest's command-line runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()

396 unittest.main()