Coverage for tests / test_cloughTocher2DInterpolate.py: 16%
158 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:51 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:51 +0000
1# This file is part of meas_algorithms.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
23import unittest
25from typing import Iterable
26from itertools import product
27import numpy as np
29import lsst.utils.tests
30import lsst.geom
31import lsst.afw.image as afwImage
32from lsst.meas.algorithms.cloughTocher2DInterpolator import (
33 CloughTocher2DInterpolateTask,
34)
35from lsst.meas.algorithms import CloughTocher2DInterpolatorUtils as ctUtils
38class CloughTocher2DInterpolateTestCase(lsst.utils.tests.TestCase):
39 """Test the CloughTocher2DInterpolateTask."""
41 def setUp(self):
42 super().setUp()
44 self.maskedimage = afwImage.MaskedImageF(100, 121)
45 for x in range(100):
46 for y in range(121):
47 self.maskedimage[x, y] = (3 * y + x * 5, 0, 1.0)
49 # Clone the maskedimage so we can compare it after running the task.
50 self.reference = self.maskedimage.clone()
52 # Set some central pixels as SAT
53 sliceX, sliceY = slice(30, 35), slice(40, 45)
54 self.maskedimage.mask[sliceX, sliceY] = afwImage.Mask.getPlaneBitMask("SAT")
55 self.maskedimage.image[sliceX, sliceY] = np.nan
56 # Put nans here to make sure interp is done ok
58 # Set an entire column as BAD
59 self.maskedimage.mask[54:55, :] = afwImage.Mask.getPlaneBitMask("BAD")
60 self.maskedimage.image[54:55, :] = np.nan
62 # Set an entire row as BAD
63 self.maskedimage.mask[:, 110:111] = afwImage.Mask.getPlaneBitMask("BAD")
64 self.maskedimage.image[:, 110:111] = np.nan
66 # Set an asymmetric region as BAD
67 self.maskedimage.mask[41:42, 63:66] = afwImage.Mask.getPlaneBitMask("SAT")
68 self.maskedimage.image[41:42, 63:66] = np.nan
69 self.maskedimage.mask[42:43, 63:65] = afwImage.Mask.getPlaneBitMask("SAT")
70 self.maskedimage.image[42:43, 63:65] = np.nan
71 self.maskedimage.mask[44, 63] = afwImage.Mask.getPlaneBitMask("SAT")
72 self.maskedimage.image[44, 63] = np.nan
74 # Set a diagonal set of pixels as CR
75 for i in range(74, 78):
76 self.maskedimage.mask[i, i] = afwImage.Mask.getPlaneBitMask("CR")
77 self.maskedimage.image[i, i] = np.nan
79 # Set one of the edges as EDGE
80 self.maskedimage.mask[0:1, :] = afwImage.Mask.getPlaneBitMask("EDGE")
81 self.maskedimage.image[0:1, :] = np.nan
83 # Set a smaller streak at the edge
84 self.maskedimage.mask[25:28, 0:1] = afwImage.Mask.getPlaneBitMask("EDGE")
85 self.maskedimage.image[25:28, 0:1] = np.nan
87 # Update the reference image's mask alone, so we can compare them after
88 # running the task.
89 self.reference.mask.array[:, :] = self.maskedimage.mask.array
91 # Create a noise image
92 self.noise = self.maskedimage.clone()
93 rng = np.random.Generator(np.random.MT19937(5))
94 self.noise.image.array[:, :] = rng.normal(size=self.noise.image.array.shape)
96 @lsst.utils.tests.methodParametersProduct(n_runs=(1, 2), flipXY=(False, True))
97 def test_interpolation(self, n_runs: int, flipXY: bool):
98 """Test that the interpolation is done correctly.
100 Parameters
101 ----------
102 n_runs : `int`
103 Number of times to run the task. Running the task more than once
104 should have no effect.
105 flipXY : `bool`
106 Whether to set the flipXY config parameter to True.
107 """
108 config = CloughTocher2DInterpolateTask.ConfigClass()
109 config.badMaskPlanes = (
110 "BAD",
111 "SAT",
112 "CR",
113 "EDGE",
114 )
115 config.fillValue = 0.5
116 config.flipXY = flipXY
117 task = CloughTocher2DInterpolateTask(config)
118 for n in range(n_runs):
119 task.run(self.maskedimage)
121 # Assert that the mask and the variance planes remain unchanged.
122 self.assertImagesEqual(self.maskedimage.variance, self.reference.variance)
123 self.assertMasksEqual(self.maskedimage.mask, self.reference.mask)
125 # Check that the long streak of bad pixels have been replaced with the
126 # fillValue, but not the short streak.
127 np.testing.assert_array_equal(self.maskedimage.image[0:1, :].array, config.fillValue)
128 with self.assertRaises(AssertionError):
129 np.testing.assert_array_equal(self.maskedimage.image[25:28, 0:1].array, config.fillValue)
131 # Check that interpolated pixels are close to the reference (original),
132 # and that none of them is still NaN.
133 self.assertTrue(np.isfinite(self.maskedimage.image.array).all())
134 self.assertImagesAlmostEqual(
135 self.maskedimage.image[1:, :],
136 self.reference.image[1:, :],
137 rtol=1e-05,
138 atol=1e-08,
139 )
141 @lsst.utils.tests.methodParametersProduct(
142 pass_badpix=(True, False),
143 pass_goodpix=(True, False),
144 flipXY=(False, True),
145 )
146 def test_interpolation_with_noise(
147 self, pass_badpix: bool = True, pass_goodpix: bool = True, flipXY: bool = False
148 ):
149 """Test that we can reuse the badpix and goodpix.
151 Parameters
152 ----------
153 pass_badpix : `bool`
154 Whether to pass the badpix to the task?
155 pass_goodpix : `bool`
156 Whether to pass the goodpix to the task?
157 flipXY : `bool`
158 Whether to set the flipXY config parameter to True.
159 """
161 config = CloughTocher2DInterpolateTask.ConfigClass()
162 config.flipXY = flipXY
163 config.badMaskPlanes = (
164 "BAD",
165 "SAT",
166 "CR",
167 "EDGE",
168 )
169 task = CloughTocher2DInterpolateTask(config)
171 badpix, goodpix = task.run(self.noise)
172 task.run(
173 self.maskedimage,
174 badpix=(badpix if pass_badpix else None),
175 goodpix=(goodpix if pass_goodpix else None),
176 )
178 # Check that the long streak of bad pixels by the edge have been
179 # replaced with fillValue, but not the short streak.
180 np.testing.assert_array_equal(self.maskedimage.image[0:1, :].array, config.fillValue)
181 with self.assertRaises(AssertionError):
182 np.testing.assert_array_equal(self.maskedimage.image[25:28, 0:1].array, config.fillValue)
184 # Check that interpolated pixels are close to the reference (original),
185 # and that none of them is still NaN.
186 self.assertTrue(np.isfinite(self.maskedimage.image.array).all())
187 self.assertImagesAlmostEqual(
188 self.maskedimage.image[1:, :],
189 self.reference.image[1:, :],
190 rtol=1e-05,
191 atol=1e-08,
192 )
194 def test_interpolation_with_flipXY(self):
195 """Test that the interpolation with both values for flipXY."""
196 config = CloughTocher2DInterpolateTask.ConfigClass()
197 config.badMaskPlanes = (
198 "BAD",
199 "SAT",
200 "CR",
201 "EDGE",
202 )
203 config.flipXY = True
204 task = CloughTocher2DInterpolateTask(config)
205 badpix_true, goodpix_true = task.run(self.maskedimage)
207 config.flipXY = False
208 task = CloughTocher2DInterpolateTask(config)
209 badpix_false, goodpix_false = task.run(self.maskedimage)
211 # Check that the locations of the bad and the good pixels, and the good
212 # pixel values themselves are identical.
213 np.testing.assert_array_equal(goodpix_false, goodpix_true)
214 np.testing.assert_array_equal(badpix_false[:, :2], badpix_true[:, :2])
216 # Check that the interpolated values at at least approximately equal.
217 np.testing.assert_array_equal(badpix_false[:, 2], badpix_true[:, 2])
220class CloughTocher2DInterpolatorUtilsTestCase(CloughTocher2DInterpolateTestCase):
221 """Test the CloughTocher2DInterpolatorUtils."""
223 @classmethod
224 def find_good_pixels_around_bad_pixels(
225 cls,
226 image: afwImage.MaskedImage,
227 maskPlanes: Iterable[str],
228 *,
229 max_window_extent: lsst.geom.Extent2I,
230 badpix: set | None = None,
231 goodpix: dict | None = None,
232 ):
233 """Find the location of bad pixels, and neighboring good pixels.
235 Parameters
236 ----------
237 image : `~lsst.afw.image.MaskedImage`
238 Image from which to find the bad and the good pixels.
239 maskPlanes : `list` [`str`]
240 List of mask planes to consider as bad pixels.
241 max_window_extent : `lsst.geom.Extent2I`
242 Maximum extent of the window around a bad pixel to consider when
243 looking for good pixels.
244 badpix : `list` [`tuple` [`int`, `int`]], optional
245 A known list of bad pixels. If provided, the function does not look for
246 any additional bad pixels, but it verifies that the provided
247 coordinates correspond to bad pixels. If an input``badpix`` is not
248 found to be bad as specified by ``maskPlanes``, an exception is raised.
249 goodpix : `dict` [`tuple` [`int`, `int`], `float`], optional
250 A known mapping of the coordinates of good pixels to their values, to
251 which any newly found good pixels locations will be added, and the
252 values (even for existing items) will be updated.
254 Returns
255 -------
256 badpix : `list` [`tuple` [`int`, `int`]]
257 The coordinates of the bad pixels. If ``badpix`` was provided as an
258 input argument, the returned quantity is the same as the input.
259 goodpix : `dict` [`tuple` [`int`, `int`], `float`]
260 Updated mapping of the coordinates of good pixels to their values.
262 Raises
263 ------
264 RuntimeError
265 If a pixel passed in as ``goodpix`` is found to be bad as specified by
266 ``maskPlanes``.
267 ValueError
268 If an input ``badpix`` is not found to be bad as specified by
269 ``maskPlanes``.
270 """
272 bbox = image.getBBox()
273 if badpix is None:
274 iterator = product(range(bbox.minX, bbox.maxX + 1), range(bbox.minY, bbox.maxY + 1))
275 badpix = set()
276 else:
277 iterator = badpix
279 if goodpix is None:
280 goodpix = {}
282 for x, y in iterator:
283 if image.mask[x, y] & afwImage.Mask.getPlaneBitMask(maskPlanes):
284 if (x, y) in goodpix:
285 raise RuntimeError(
286 f"Pixel ({x}, {y}) is bad as specified by maskPlanes {maskPlanes} but "
287 "passed in as goodpix"
288 )
289 badpix.add((x, y))
290 window = lsst.geom.Box2I.makeCenteredBox(
291 center=lsst.geom.Point2D(x, y), # center has to be a Point2D instance.
292 size=max_window_extent,
293 )
294 # Restrict to the bounding box of the image.
295 window.clip(bbox)
297 for xx, yy in product(
298 range(window.minX, window.maxX + 1),
299 range(window.minY, window.maxY + 1),
300 ):
301 if not (image.mask[xx, yy] & afwImage.Mask.getPlaneBitMask(maskPlanes)):
302 goodpix[(xx, yy)] = image.image[xx, yy]
303 elif (x, y) in badpix:
304 # If (x, y) is in badpix, but did not get flagged as bad,
305 # raise an exception.
306 raise ValueError(f"Pixel ({x}, {y}) is not bad as specified by maskPlanes {maskPlanes}")
308 return badpix, goodpix
310 def test_parity(self, buffer=4):
311 """Test that the C++ implementation gives the same results as the
312 pure-Python implementation.
314 Parameters
315 ----------
316 buffer : `int`, optional
317 Same as the buffer parameter in `findGoodPixelsAroundBadPixels`.
318 """
319 bpix, gpix = ctUtils.findGoodPixelsAroundBadPixels(
320 self.maskedimage, ["BAD", "SAT", "CR", "EDGE"], buffer=buffer
321 )
322 badpix, goodpix = self.find_good_pixels_around_bad_pixels(
323 self.maskedimage,
324 ["BAD", "SAT", "CR", "EDGE"],
325 max_window_extent=lsst.geom.Extent2I(2 * buffer + 1, 2 * buffer + 1),
326 )
328 self.assertEqual(len(goodpix), gpix.shape[0])
329 for row in gpix:
330 x, y, val = int(row[0]), int(row[1]), row[2]
331 self.assertEqual(goodpix[(x, y)], val)
333 self.assertEqual(set(zip(bpix[:, 0], bpix[:, 1])), badpix)
335 def test_findGoodPixelsAroundBadPixels(self):
336 """Test the findGoodPixelsAroundBadPixels utility functino."""
337 badpix, goodpix = ctUtils.findGoodPixelsAroundBadPixels(
338 self.maskedimage,
339 ["BAD", "SAT", "CR", "EDGE"],
340 buffer=4,
341 )
343 # Check that badpix and goodpix have no overlaps
344 badSet = set(zip(badpix[:, 0], badpix[:, 1]))
345 goodSet = set(zip(goodpix[:, 0], goodpix[:, 1]))
346 self.assertEqual(len(badSet & goodSet), 0)
348 # buffer = 0 should give no goodpix, but same badpix
349 badpix0, goodpix0 = ctUtils.findGoodPixelsAroundBadPixels(
350 self.maskedimage,
351 ["BAD", "SAT", "CR", "EDGE"],
352 buffer=0,
353 )
355 self.assertEqual(len(goodpix0), 0)
356 np.testing.assert_array_equal(badpix0, badpix)
358 # For large enough buffer, badpix and goodpix should be mutually
359 # exclusive and complete. This also checks that edges are handled.
360 badpix, goodpix = ctUtils.findGoodPixelsAroundBadPixels(
361 self.maskedimage,
362 ["BAD", "SAT", "CR", "EDGE"],
363 buffer=251,
364 )
366 self.assertEqual(
367 len(badpix) + len(goodpix),
368 self.maskedimage.getWidth() * self.maskedimage.getHeight(),
369 )
371 def test_update_functions(self):
372 """Test updateArrayFromImage and updateImageFromArray behave as
373 expected.
374 """
375 badpix, _ = ctUtils.findGoodPixelsAroundBadPixels(
376 self.maskedimage,
377 ["BAD", "SAT", "CR", "EDGE"],
378 buffer=3,
379 )
381 # Ensure that maskedimage and reference are not the same initially.
382 with self.assertRaises(AssertionError):
383 self.assertImagesEqual(self.maskedimage.image, self.reference.image)
385 # Update badpix values from the reference image
386 ctUtils.updateArrayFromImage(badpix, self.reference.image)
388 # Update maskedimage from badpix values
389 ctUtils.updateImageFromArray(self.maskedimage.image, badpix)
391 # maskedimage and reference image should now to be identifical
392 self.assertImagesEqual(self.maskedimage.image, self.reference.image)
394 @lsst.utils.tests.methodParametersProduct(x0=(0, 23, -53), y0=(0, 47, -31))
395 def test_origin(self, x0=23, y0=47):
396 """Test that we get consistent results with arbitrary image origins.
398 Parameters
399 ----------
400 x0 : `int`
401 The origin of the image along the horizontal axis.
402 y0 : `int`
403 The origin of the image along the vertical axis.
404 """
405 # Calling setUp explicitly becomes necessary, as we change in the image
406 # in-place and need to reset to the original state when running with
407 # a different set of parameters.
408 self.setUp()
409 badpix0, goodpix0 = ctUtils.findGoodPixelsAroundBadPixels(
410 self.maskedimage,
411 ["BAD", "SAT", "CR", "EDGE"],
412 buffer=4,
413 )
415 # Check that badpix and goodpix have no overlaps
416 badSet = set(zip(badpix0[:, 0], badpix0[:, 1]))
417 goodSet = set(zip(goodpix0[:, 0], goodpix0[:, 1]))
418 self.assertEqual(len(badSet & goodSet), 0)
420 # Set a non-trivial xy0 for the maskedimage
421 self.maskedimage.setXY0(lsst.geom.Point2I(x0, y0))
422 badpix, goodpix = ctUtils.findGoodPixelsAroundBadPixels(
423 self.maskedimage,
424 ["BAD", "SAT", "CR", "EDGE"],
425 buffer=4,
426 )
428 # Adjust the x and y columns with origin, so we can compare them.
429 badpix0[:, 0] += x0
430 goodpix0[:, 0] += x0
431 badpix0[:, 1] += y0
432 goodpix0[:, 1] += y0
434 # The third column (pixel values) must match exactly if the
435 # corresponding pixel values are read, regardless of the coordinate.
436 np.testing.assert_array_equal(goodpix, goodpix0)
437 np.testing.assert_array_equal(badpix, badpix0)
439 # Update one of the goodpix arrays from image and check that it is
440 # invariant. It would be invariant if it handles the pixel coordinates
441 # consistently.
442 ctUtils.updateArrayFromImage(goodpix0, self.maskedimage.image)
443 np.testing.assert_array_equal(goodpix, goodpix0)
445 # There should be some nan values right now.
446 self.assertFalse(np.isfinite(self.maskedimage.image.array).all())
448 # There should not be any nan values if the image is updated correctly.
449 badpix[:, 2] = -99
450 ctUtils.updateImageFromArray(self.maskedimage.image, badpix)
451 self.assertTrue(np.isfinite(self.maskedimage.image.array).all())
454def setup_module(module):
455 lsst.utils.tests.init()
458class MemoryTestCase(lsst.utils.tests.MemoryTestCase):
459 pass
462if __name__ == "__main__": 462 ↛ 463line 462 didn't jump to line 463 because the condition on line 462 was never true
463 lsst.utils.tests.init()
464 unittest.main()