Coverage for tests / test_cloughTocher2DInterpolate.py: 16%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:07 +0000

1# This file is part of meas_algorithms. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22 

23import unittest 

24 

25from typing import Iterable 

26from itertools import product 

27import numpy as np 

28 

29import lsst.utils.tests 

30import lsst.geom 

31import lsst.afw.image as afwImage 

32from lsst.meas.algorithms.cloughTocher2DInterpolator import ( 

33 CloughTocher2DInterpolateTask, 

34) 

35from lsst.meas.algorithms import CloughTocher2DInterpolatorUtils as ctUtils 

36 

37 

38class CloughTocher2DInterpolateTestCase(lsst.utils.tests.TestCase): 

39 """Test the CloughTocher2DInterpolateTask.""" 

40 

41 def setUp(self): 

42 super().setUp() 

43 

44 self.maskedimage = afwImage.MaskedImageF(100, 121) 

45 for x in range(100): 

46 for y in range(121): 

47 self.maskedimage[x, y] = (3 * y + x * 5, 0, 1.0) 

48 

49 # Clone the maskedimage so we can compare it after running the task. 

50 self.reference = self.maskedimage.clone() 

51 

52 # Set some central pixels as SAT 

53 sliceX, sliceY = slice(30, 35), slice(40, 45) 

54 self.maskedimage.mask[sliceX, sliceY] = afwImage.Mask.getPlaneBitMask("SAT") 

55 self.maskedimage.image[sliceX, sliceY] = np.nan 

56 # Put nans here to make sure interp is done ok 

57 

58 # Set an entire column as BAD 

59 self.maskedimage.mask[54:55, :] = afwImage.Mask.getPlaneBitMask("BAD") 

60 self.maskedimage.image[54:55, :] = np.nan 

61 

62 # Set an entire row as BAD 

63 self.maskedimage.mask[:, 110:111] = afwImage.Mask.getPlaneBitMask("BAD") 

64 self.maskedimage.image[:, 110:111] = np.nan 

65 

66 # Set an asymmetric region as BAD 

67 self.maskedimage.mask[41:42, 63:66] = afwImage.Mask.getPlaneBitMask("SAT") 

68 self.maskedimage.image[41:42, 63:66] = np.nan 

69 self.maskedimage.mask[42:43, 63:65] = afwImage.Mask.getPlaneBitMask("SAT") 

70 self.maskedimage.image[42:43, 63:65] = np.nan 

71 self.maskedimage.mask[44, 63] = afwImage.Mask.getPlaneBitMask("SAT") 

72 self.maskedimage.image[44, 63] = np.nan 

73 

74 # Set a diagonal set of pixels as CR 

75 for i in range(74, 78): 

76 self.maskedimage.mask[i, i] = afwImage.Mask.getPlaneBitMask("CR") 

77 self.maskedimage.image[i, i] = np.nan 

78 

79 # Set one of the edges as EDGE 

80 self.maskedimage.mask[0:1, :] = afwImage.Mask.getPlaneBitMask("EDGE") 

81 self.maskedimage.image[0:1, :] = np.nan 

82 

83 # Set a smaller streak at the edge 

84 self.maskedimage.mask[25:28, 0:1] = afwImage.Mask.getPlaneBitMask("EDGE") 

85 self.maskedimage.image[25:28, 0:1] = np.nan 

86 

87 # Update the reference image's mask alone, so we can compare them after 

88 # running the task. 

89 self.reference.mask.array[:, :] = self.maskedimage.mask.array 

90 

91 # Create a noise image 

92 self.noise = self.maskedimage.clone() 

93 rng = np.random.Generator(np.random.MT19937(5)) 

94 self.noise.image.array[:, :] = rng.normal(size=self.noise.image.array.shape) 

95 

96 @lsst.utils.tests.methodParametersProduct(n_runs=(1, 2), flipXY=(False, True)) 

97 def test_interpolation(self, n_runs: int, flipXY: bool): 

98 """Test that the interpolation is done correctly. 

99 

100 Parameters 

101 ---------- 

102 n_runs : `int` 

103 Number of times to run the task. Running the task more than once 

104 should have no effect. 

105 flipXY : `bool` 

106 Whether to set the flipXY config parameter to True. 

107 """ 

108 config = CloughTocher2DInterpolateTask.ConfigClass() 

109 config.badMaskPlanes = ( 

110 "BAD", 

111 "SAT", 

112 "CR", 

113 "EDGE", 

114 ) 

115 config.fillValue = 0.5 

116 config.flipXY = flipXY 

117 task = CloughTocher2DInterpolateTask(config) 

118 for n in range(n_runs): 

119 task.run(self.maskedimage) 

120 

121 # Assert that the mask and the variance planes remain unchanged. 

122 self.assertImagesEqual(self.maskedimage.variance, self.reference.variance) 

123 self.assertMasksEqual(self.maskedimage.mask, self.reference.mask) 

124 

125 # Check that the long streak of bad pixels have been replaced with the 

126 # fillValue, but not the short streak. 

127 np.testing.assert_array_equal(self.maskedimage.image[0:1, :].array, config.fillValue) 

128 with self.assertRaises(AssertionError): 

129 np.testing.assert_array_equal(self.maskedimage.image[25:28, 0:1].array, config.fillValue) 

130 

131 # Check that interpolated pixels are close to the reference (original), 

132 # and that none of them is still NaN. 

133 self.assertTrue(np.isfinite(self.maskedimage.image.array).all()) 

134 self.assertImagesAlmostEqual( 

135 self.maskedimage.image[1:, :], 

136 self.reference.image[1:, :], 

137 rtol=1e-05, 

138 atol=1e-08, 

139 ) 

140 

141 @lsst.utils.tests.methodParametersProduct( 

142 pass_badpix=(True, False), 

143 pass_goodpix=(True, False), 

144 flipXY=(False, True), 

145 ) 

146 def test_interpolation_with_noise( 

147 self, pass_badpix: bool = True, pass_goodpix: bool = True, flipXY: bool = False 

148 ): 

149 """Test that we can reuse the badpix and goodpix. 

150 

151 Parameters 

152 ---------- 

153 pass_badpix : `bool` 

154 Whether to pass the badpix to the task? 

155 pass_goodpix : `bool` 

156 Whether to pass the goodpix to the task? 

157 flipXY : `bool` 

158 Whether to set the flipXY config parameter to True. 

159 """ 

160 

161 config = CloughTocher2DInterpolateTask.ConfigClass() 

162 config.flipXY = flipXY 

163 config.badMaskPlanes = ( 

164 "BAD", 

165 "SAT", 

166 "CR", 

167 "EDGE", 

168 ) 

169 task = CloughTocher2DInterpolateTask(config) 

170 

171 badpix, goodpix = task.run(self.noise) 

172 task.run( 

173 self.maskedimage, 

174 badpix=(badpix if pass_badpix else None), 

175 goodpix=(goodpix if pass_goodpix else None), 

176 ) 

177 

178 # Check that the long streak of bad pixels by the edge have been 

179 # replaced with fillValue, but not the short streak. 

180 np.testing.assert_array_equal(self.maskedimage.image[0:1, :].array, config.fillValue) 

181 with self.assertRaises(AssertionError): 

182 np.testing.assert_array_equal(self.maskedimage.image[25:28, 0:1].array, config.fillValue) 

183 

184 # Check that interpolated pixels are close to the reference (original), 

185 # and that none of them is still NaN. 

186 self.assertTrue(np.isfinite(self.maskedimage.image.array).all()) 

187 self.assertImagesAlmostEqual( 

188 self.maskedimage.image[1:, :], 

189 self.reference.image[1:, :], 

190 rtol=1e-05, 

191 atol=1e-08, 

192 ) 

193 

194 def test_interpolation_with_flipXY(self): 

195 """Test that the interpolation with both values for flipXY.""" 

196 config = CloughTocher2DInterpolateTask.ConfigClass() 

197 config.badMaskPlanes = ( 

198 "BAD", 

199 "SAT", 

200 "CR", 

201 "EDGE", 

202 ) 

203 config.flipXY = True 

204 task = CloughTocher2DInterpolateTask(config) 

205 badpix_true, goodpix_true = task.run(self.maskedimage) 

206 

207 config.flipXY = False 

208 task = CloughTocher2DInterpolateTask(config) 

209 badpix_false, goodpix_false = task.run(self.maskedimage) 

210 

211 # Check that the locations of the bad and the good pixels, and the good 

212 # pixel values themselves are identical. 

213 np.testing.assert_array_equal(goodpix_false, goodpix_true) 

214 np.testing.assert_array_equal(badpix_false[:, :2], badpix_true[:, :2]) 

215 

216 # Check that the interpolated values at at least approximately equal. 

217 np.testing.assert_array_equal(badpix_false[:, 2], badpix_true[:, 2]) 

218 

219 

220class CloughTocher2DInterpolatorUtilsTestCase(CloughTocher2DInterpolateTestCase): 

221 """Test the CloughTocher2DInterpolatorUtils.""" 

222 

223 @classmethod 

224 def find_good_pixels_around_bad_pixels( 

225 cls, 

226 image: afwImage.MaskedImage, 

227 maskPlanes: Iterable[str], 

228 *, 

229 max_window_extent: lsst.geom.Extent2I, 

230 badpix: set | None = None, 

231 goodpix: dict | None = None, 

232 ): 

233 """Find the location of bad pixels, and neighboring good pixels. 

234 

235 Parameters 

236 ---------- 

237 image : `~lsst.afw.image.MaskedImage` 

238 Image from which to find the bad and the good pixels. 

239 maskPlanes : `list` [`str`] 

240 List of mask planes to consider as bad pixels. 

241 max_window_extent : `lsst.geom.Extent2I` 

242 Maximum extent of the window around a bad pixel to consider when 

243 looking for good pixels. 

244 badpix : `list` [`tuple` [`int`, `int`]], optional 

245 A known list of bad pixels. If provided, the function does not look for 

246 any additional bad pixels, but it verifies that the provided 

247 coordinates correspond to bad pixels. If an input``badpix`` is not 

248 found to be bad as specified by ``maskPlanes``, an exception is raised. 

249 goodpix : `dict` [`tuple` [`int`, `int`], `float`], optional 

250 A known mapping of the coordinates of good pixels to their values, to 

251 which any newly found good pixels locations will be added, and the 

252 values (even for existing items) will be updated. 

253 

254 Returns 

255 ------- 

256 badpix : `list` [`tuple` [`int`, `int`]] 

257 The coordinates of the bad pixels. If ``badpix`` was provided as an 

258 input argument, the returned quantity is the same as the input. 

259 goodpix : `dict` [`tuple` [`int`, `int`], `float`] 

260 Updated mapping of the coordinates of good pixels to their values. 

261 

262 Raises 

263 ------ 

264 RuntimeError 

265 If a pixel passed in as ``goodpix`` is found to be bad as specified by 

266 ``maskPlanes``. 

267 ValueError 

268 If an input ``badpix`` is not found to be bad as specified by 

269 ``maskPlanes``. 

270 """ 

271 

272 bbox = image.getBBox() 

273 if badpix is None: 

274 iterator = product(range(bbox.minX, bbox.maxX + 1), range(bbox.minY, bbox.maxY + 1)) 

275 badpix = set() 

276 else: 

277 iterator = badpix 

278 

279 if goodpix is None: 

280 goodpix = {} 

281 

282 for x, y in iterator: 

283 if image.mask[x, y] & afwImage.Mask.getPlaneBitMask(maskPlanes): 

284 if (x, y) in goodpix: 

285 raise RuntimeError( 

286 f"Pixel ({x}, {y}) is bad as specified by maskPlanes {maskPlanes} but " 

287 "passed in as goodpix" 

288 ) 

289 badpix.add((x, y)) 

290 window = lsst.geom.Box2I.makeCenteredBox( 

291 center=lsst.geom.Point2D(x, y), # center has to be a Point2D instance. 

292 size=max_window_extent, 

293 ) 

294 # Restrict to the bounding box of the image. 

295 window.clip(bbox) 

296 

297 for xx, yy in product( 

298 range(window.minX, window.maxX + 1), 

299 range(window.minY, window.maxY + 1), 

300 ): 

301 if not (image.mask[xx, yy] & afwImage.Mask.getPlaneBitMask(maskPlanes)): 

302 goodpix[(xx, yy)] = image.image[xx, yy] 

303 elif (x, y) in badpix: 

304 # If (x, y) is in badpix, but did not get flagged as bad, 

305 # raise an exception. 

306 raise ValueError(f"Pixel ({x}, {y}) is not bad as specified by maskPlanes {maskPlanes}") 

307 

308 return badpix, goodpix 

309 

310 def test_parity(self, buffer=4): 

311 """Test that the C++ implementation gives the same results as the 

312 pure-Python implementation. 

313 

314 Parameters 

315 ---------- 

316 buffer : `int`, optional 

317 Same as the buffer parameter in `findGoodPixelsAroundBadPixels`. 

318 """ 

319 bpix, gpix = ctUtils.findGoodPixelsAroundBadPixels( 

320 self.maskedimage, ["BAD", "SAT", "CR", "EDGE"], buffer=buffer 

321 ) 

322 badpix, goodpix = self.find_good_pixels_around_bad_pixels( 

323 self.maskedimage, 

324 ["BAD", "SAT", "CR", "EDGE"], 

325 max_window_extent=lsst.geom.Extent2I(2 * buffer + 1, 2 * buffer + 1), 

326 ) 

327 

328 self.assertEqual(len(goodpix), gpix.shape[0]) 

329 for row in gpix: 

330 x, y, val = int(row[0]), int(row[1]), row[2] 

331 self.assertEqual(goodpix[(x, y)], val) 

332 

333 self.assertEqual(set(zip(bpix[:, 0], bpix[:, 1])), badpix) 

334 

335 def test_findGoodPixelsAroundBadPixels(self): 

336 """Test the findGoodPixelsAroundBadPixels utility functino.""" 

337 badpix, goodpix = ctUtils.findGoodPixelsAroundBadPixels( 

338 self.maskedimage, 

339 ["BAD", "SAT", "CR", "EDGE"], 

340 buffer=4, 

341 ) 

342 

343 # Check that badpix and goodpix have no overlaps 

344 badSet = set(zip(badpix[:, 0], badpix[:, 1])) 

345 goodSet = set(zip(goodpix[:, 0], goodpix[:, 1])) 

346 self.assertEqual(len(badSet & goodSet), 0) 

347 

348 # buffer = 0 should give no goodpix, but same badpix 

349 badpix0, goodpix0 = ctUtils.findGoodPixelsAroundBadPixels( 

350 self.maskedimage, 

351 ["BAD", "SAT", "CR", "EDGE"], 

352 buffer=0, 

353 ) 

354 

355 self.assertEqual(len(goodpix0), 0) 

356 np.testing.assert_array_equal(badpix0, badpix) 

357 

358 # For large enough buffer, badpix and goodpix should be mutually 

359 # exclusive and complete. This also checks that edges are handled. 

360 badpix, goodpix = ctUtils.findGoodPixelsAroundBadPixels( 

361 self.maskedimage, 

362 ["BAD", "SAT", "CR", "EDGE"], 

363 buffer=251, 

364 ) 

365 

366 self.assertEqual( 

367 len(badpix) + len(goodpix), 

368 self.maskedimage.getWidth() * self.maskedimage.getHeight(), 

369 ) 

370 

371 def test_update_functions(self): 

372 """Test updateArrayFromImage and updateImageFromArray behave as 

373 expected. 

374 """ 

375 badpix, _ = ctUtils.findGoodPixelsAroundBadPixels( 

376 self.maskedimage, 

377 ["BAD", "SAT", "CR", "EDGE"], 

378 buffer=3, 

379 ) 

380 

381 # Ensure that maskedimage and reference are not the same initially. 

382 with self.assertRaises(AssertionError): 

383 self.assertImagesEqual(self.maskedimage.image, self.reference.image) 

384 

385 # Update badpix values from the reference image 

386 ctUtils.updateArrayFromImage(badpix, self.reference.image) 

387 

388 # Update maskedimage from badpix values 

389 ctUtils.updateImageFromArray(self.maskedimage.image, badpix) 

390 

391 # maskedimage and reference image should now to be identifical 

392 self.assertImagesEqual(self.maskedimage.image, self.reference.image) 

393 

394 @lsst.utils.tests.methodParametersProduct(x0=(0, 23, -53), y0=(0, 47, -31)) 

395 def test_origin(self, x0=23, y0=47): 

396 """Test that we get consistent results with arbitrary image origins. 

397 

398 Parameters 

399 ---------- 

400 x0 : `int` 

401 The origin of the image along the horizontal axis. 

402 y0 : `int` 

403 The origin of the image along the vertical axis. 

404 """ 

405 # Calling setUp explicitly becomes necessary, as we change in the image 

406 # in-place and need to reset to the original state when running with 

407 # a different set of parameters. 

408 self.setUp() 

409 badpix0, goodpix0 = ctUtils.findGoodPixelsAroundBadPixels( 

410 self.maskedimage, 

411 ["BAD", "SAT", "CR", "EDGE"], 

412 buffer=4, 

413 ) 

414 

415 # Check that badpix and goodpix have no overlaps 

416 badSet = set(zip(badpix0[:, 0], badpix0[:, 1])) 

417 goodSet = set(zip(goodpix0[:, 0], goodpix0[:, 1])) 

418 self.assertEqual(len(badSet & goodSet), 0) 

419 

420 # Set a non-trivial xy0 for the maskedimage 

421 self.maskedimage.setXY0(lsst.geom.Point2I(x0, y0)) 

422 badpix, goodpix = ctUtils.findGoodPixelsAroundBadPixels( 

423 self.maskedimage, 

424 ["BAD", "SAT", "CR", "EDGE"], 

425 buffer=4, 

426 ) 

427 

428 # Adjust the x and y columns with origin, so we can compare them. 

429 badpix0[:, 0] += x0 

430 goodpix0[:, 0] += x0 

431 badpix0[:, 1] += y0 

432 goodpix0[:, 1] += y0 

433 

434 # The third column (pixel values) must match exactly if the 

435 # corresponding pixel values are read, regardless of the coordinate. 

436 np.testing.assert_array_equal(goodpix, goodpix0) 

437 np.testing.assert_array_equal(badpix, badpix0) 

438 

439 # Update one of the goodpix arrays from image and check that it is 

440 # invariant. It would be invariant if it handles the pixel coordinates 

441 # consistently. 

442 ctUtils.updateArrayFromImage(goodpix0, self.maskedimage.image) 

443 np.testing.assert_array_equal(goodpix, goodpix0) 

444 

445 # There should be some nan values right now. 

446 self.assertFalse(np.isfinite(self.maskedimage.image.array).all()) 

447 

448 # There should not be any nan values if the image is updated correctly. 

449 badpix[:, 2] = -99 

450 ctUtils.updateImageFromArray(self.maskedimage.image, badpix) 

451 self.assertTrue(np.isfinite(self.maskedimage.image.array).all()) 

452 

453 

454def setup_module(module): 

455 lsst.utils.tests.init() 

456 

457 

458class MemoryTestCase(lsst.utils.tests.MemoryTestCase): 

459 pass 

460 

461 

462if __name__ == "__main__": 462 ↛ 463line 462 didn't jump to line 463 because the condition on line 462 was never true

463 lsst.utils.tests.init() 

464 unittest.main()