Coverage for tests / test_trailedEdgeSources.py: 16%
188 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:52 +0000
1#
2# This file is part of meas_extensions_trailedSources.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
24import numpy as np
25import unittest
26import lsst.utils.tests
27import lsst.meas.extensions.trailedSources
28from lsst.meas.base.tests import AlgorithmTestCase
29from lsst.utils.tests import classParameters
30from lsst.geom import Point2I, Point2D, Box2I, Extent2I
31from unittest.mock import patch
33# Trailed-source length, angle, and centroid coordinates.
34trail_lengths = np.array([5, 5, 10, 4])
35trail_angles = np.array([100, 0, 5, 4])
36trail_x_coords = np.array([100, 20, -20, 90])
37trail_y_coords = np.array([100, 20, -30, 100])
40class TrailedEdgeSource:
41 """Holds a set of true trail parameters.
42 """
44 def __init__(self, instFlux, length, angle, xc, yc):
45 self.instFlux = instFlux
46 self.length = length
47 self.angle = angle
48 self.center = Point2D(xc, yc)
49 self.x0 = xc - length / 2 * np.cos(angle)
50 self.y0 = yc - length / 2 * np.sin(angle)
51 self.x1 = xc + length / 2 * np.cos(angle)
52 self.y1 = yc + length / 2 * np.sin(angle)
55class TrailedTaskSetup:
57 def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(),
58 config=None, schema=None,
59 algMetadata=None):
60 """Set up a measurement task for a trailed source plugin.
61 """
62 config = self.makeSingleFrameMeasurementConfig(plugin=plugin,
63 dependencies=dependencies)
65 # Make sure the shape slot is base_SdssShape
66 config.slots.shape = "base_SdssShape"
67 return self.makeSingleFrameMeasurementTask(plugin=plugin,
68 dependencies=dependencies,
69 config=config,
70 schema=schema,
71 algMetadata=algMetadata)
74# "Extend" meas.base.tests.TestDataset
75class TrailedTestDataset(lsst.meas.base.tests.TestDataset):
76 """A dataset for testing trailed source measurements.
77 Given a `TrailedSource`, construct a record of the true values and an
78 Exposure.
79 """
81 def __init__(self, bbox, threshold=10.0, exposure=None, **kwds):
83 super().__init__(bbox, threshold, exposure, **kwds)
85 def addTrailedSource(self, trail, edge=True):
86 """Add a trailed source to the simulation.
88 Re-implemented version of
89 `lsst.meas.base.tests.TestDataset.addSource`. Numerically integrates a
90 Gaussian PSF over a line to obtain an image of a trailed source and
91 adds edge flags to the image.
92 """
93 record = self.catalog.addNew()
94 record.set(self.keys["centroid"], trail.center)
95 rng = np.random.default_rng(32)
96 covariance = rng.normal(0, 0.1, 4).reshape(2, 2)
97 covariance[0, 1] = covariance[1, 0]
98 record.set(self.keys["centroid_sigma"], covariance.astype(np.float32))
99 record.set(self.keys["shape"], self.psfShape)
100 record.set(self.keys["isStar"], False)
102 # Sum the psf at each
103 numIter = int(2 * trail.length)
104 xp = np.linspace(trail.x0, trail.x1, num=numIter)
105 yp = np.linspace(trail.y0, trail.y1, num=numIter)
106 for (x, y) in zip(xp, yp):
107 pt = Point2D(x, y)
108 im = self.drawGaussian(self.exposure.getBBox(), trail.instFlux,
109 lsst.afw.geom.Ellipse(self.psfShape, pt))
110 self.exposure.getMaskedImage().getImage().getArray()[:, :] += im.getArray()
112 planes = self.exposure.mask.getMaskPlaneDict()
113 dim = self.exposure.getBBox().getDimensions()
115 # Add edge flags to the first and last 20 columns and rows.
116 if edge:
117 for y in range(20):
118 self.exposure.mask.setMaskPlaneValues(planes['EDGE'], 0, dim[0] - 1, y)
119 self.exposure.mask.setMaskPlaneValues(planes['EDGE'], 0, dim[0] - 1, y + dim[1] - 20)
121 for y in range(dim[1]):
122 self.exposure.mask.setMaskPlaneValues(planes['EDGE'], 0, 20, y)
123 self.exposure.mask.setMaskPlaneValues(planes['EDGE'], dim[0] - 20, dim[0] - 1, y)
125 totFlux = self.exposure.image.array.sum()
126 self.exposure.image.array /= totFlux
127 self.exposure.image.array *= trail.instFlux
129 record.set(self.keys["instFlux"], trail.instFlux)
130 self._installFootprint(record, self.exposure.getImage())
132 return record, self.exposure.getImage()
135# Following from test_trailedSources
136@classParameters(length=trail_lengths, theta=trail_angles, xc=trail_x_coords, yc=trail_y_coords)
137class TrailedEdgeSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
138 """ Test if ext_trailedSources_Naive_flag_edge is set correctly.
140 Given a `TrailedSource`, test if the edge flag is set correctly in the
141 source catalog after the
142 `lsst.meas.extensions.trailedSources.Naive.Plugin.makeTrailedSourceMeasurementTask`
143 has been run on the source catalog.
144 """
146 def setUp(self):
147 self.center = Point2D(50.1, 49.8)
148 self.bbox = Box2I(lsst.geom.Point2I(-20, -30), Extent2I(140, 160))
149 self.dataset = TrailedTestDataset(self.bbox)
151 # Trail which extends into edge pixels
152 self.trail = TrailedEdgeSource(100000.0, self.length, self.theta, self.xc, self.yc)
153 self.dataset.addTrailedSource(self.trail)
155 def testEdgeFlag(self):
156 """Test if edge flags are correctly set in NaivePlugin.py
158 Given a `TrailedTestDataset`, run the NaivePlugin measurement and
159 check that the trailed sources have the edge flag set. [100,100] does
160 not contain any edge pixels and should not have a flag set, [20,20]
161 crosses into the edge region on only one side and should have the edge
162 flag set, and [-20,-30] extends off the chip and should have the edge
163 flag set.
164 """
165 # Set up and run Naive measurement.
166 task = TrailedTaskSetup.makeTrailedSourceMeasurementTask(self,
167 plugin="ext_trailedSources_Naive",
168 dependencies=("base_SdssCentroid",
169 "base_SdssShape")
170 )
171 exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)
172 task.run(catalog, exposure)
173 record = catalog[0]
175 # Check that x0, y0 or x1, y1 is flagged as an edge pixel
176 x1 = int(record['ext_trailedSources_Naive_x1'])
177 y1 = int(record['ext_trailedSources_Naive_y1'])
178 x0 = int(record['ext_trailedSources_Naive_x0'])
179 y0 = int(record['ext_trailedSources_Naive_y0'])
181 # Test Case with no edge pixels
182 if record['truth_x'] == 100:
183 # These are used to ensure the mask pixels the trailed sources are
184 # compared with have the correct flags set
185 begin_edge_pixel_set = (exposure.mask[Point2I(x0, y0)] & exposure.mask.getPlaneBitMask(
186 'EDGE') != 0)
187 end_edge_pixel_set = (exposure.mask[Point2I(x1, y1)] & exposure.mask.getPlaneBitMask(
188 'EDGE') != 0)
190 self.assertFalse(begin_edge_pixel_set)
191 self.assertTrue(end_edge_pixel_set)
193 # Make sure measurement edge flag is set, but Naive_flag is not.
194 # A failed trailed source measurement with the edge flag
195 # set means the edge flag was set despite the measurement
196 # failing.
197 self.assertTrue(record.get("ext_trailedSources_Naive_flag_edge"))
198 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
199 self.assertFalse(record.get("ext_trailedSources_Naive_flag_off_image"))
200 self.assertFalse(record.get("ext_trailedSources_Naive_flag_nan"))
202 x1 = int(record['ext_trailedSources_Naive_x1'])
203 y1 = int(record['ext_trailedSources_Naive_y1'])
205 self.assertFalse(exposure.mask[Point2I(x0, y0)] & exposure.mask.getPlaneBitMask('EDGE') != 0)
206 self.assertTrue(exposure.mask[Point2I(x1, y1)] & exposure.mask.getPlaneBitMask('EDGE') != 0)
208 # Test case with one end of trail containing edge pixels
209 elif record['truth_x'] == 20:
210 begin_edge_pixel_set = (exposure.mask[Point2I(x0, y0)] & exposure.mask.getPlaneBitMask(
211 'EDGE') != 0)
212 end_edge_pixel_set = (exposure.mask[Point2I(x1, y1)] & exposure.mask.getPlaneBitMask(
213 'EDGE') != 0)
215 self.assertFalse(begin_edge_pixel_set)
216 self.assertFalse(end_edge_pixel_set)
218 # Make sure measurement Naive_flag_edge and Naive_flag not set
219 self.assertFalse(record.get("ext_trailedSources_Naive_flag_edge"))
220 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
221 self.assertFalse(record.get("ext_trailedSources_Naive_flag_off_image"))
222 self.assertFalse(record.get("ext_trailedSources_Naive_flag_nan"))
224 x1 = int(record['ext_trailedSources_Naive_x1'])
225 y1 = int(record['ext_trailedSources_Naive_y1'])
227 self.assertFalse(exposure.mask[Point2I(x0, y0)] & exposure.mask.getPlaneBitMask('EDGE') != 0)
228 self.assertFalse(exposure.mask[Point2I(x1, y1)] & exposure.mask.getPlaneBitMask('EDGE') != 0)
230 # Test case trail fully contained
231 elif record["truth_x"] == 90:
232 begin_edge_pixel_set = (exposure.mask[Point2I(x0, y0)] & exposure.mask.getPlaneBitMask(
233 'EDGE') != 0)
234 end_edge_pixel_set = (exposure.mask[Point2I(x1, y1)] & exposure.mask.getPlaneBitMask(
235 'EDGE') != 0)
237 self.assertFalse(begin_edge_pixel_set)
238 self.assertFalse(end_edge_pixel_set)
240 # Make sure measurement Naive_flag_edge and Naive_flag not set
241 self.assertFalse(record.get("ext_trailedSources_Naive_flag_edge"))
242 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
243 self.assertFalse(record.get("ext_trailedSources_Naive_flag_off_image"))
244 self.assertFalse(record.get("ext_trailedSources_Naive_flag_nan"))
246 x1 = int(record['ext_trailedSources_Naive_x1'])
247 y1 = int(record['ext_trailedSources_Naive_y1'])
249 self.assertFalse(exposure.mask[Point2I(x0, y0)] & exposure.mask.getPlaneBitMask('EDGE') != 0)
250 self.assertFalse(exposure.mask[Point2I(x1, y1)] & exposure.mask.getPlaneBitMask('EDGE') != 0)
252 # Test case with trailed source extending off chip.
253 else:
254 self.assertEqual(record['truth_x'], -20)
255 self.assertTrue(record.get("ext_trailedSources_Naive_flag_edge"))
256 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
257 self.assertTrue(record.get("ext_trailedSources_Naive_flag_off_image"))
258 self.assertFalse(record.get("ext_trailedSources_Naive_flag_nan"))
260 def testNanFlag(self):
261 """Test if nan flags are correctly set in NaivePlugin.py
263 Given a `TrailedTestDataset`, run the NaivePlugin measurement which
264 has trailed sources where one of the end point values results in a
265 nan.
266 """
267 # Set up and run Naive measurement.
268 task = TrailedTaskSetup.makeTrailedSourceMeasurementTask(self,
269 plugin="ext_trailedSources_Naive",
270 dependencies=("base_SdssCentroid",
271 "base_SdssShape")
272 )
274 exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)
276 original_check_trail_function = task.plugins['ext_trailedSources_Naive'].check_trail
277 # Used to simulate a trailed source where one of the coordinates is a
278 # nan.
280 def check_trail_mock(*args, **kwargs):
281 measRecord = args[0]
282 exposure = args[1]
283 x0 = args[2]
284 y0 = args[3]
285 x1 = args[4]
286 y1 = np.nan # overriding to test NAN flagging
287 length = args[6]
288 measRecord['ext_trailedSources_Naive_y1'] = np.nan
289 return original_check_trail_function(measRecord, exposure, x0, y0, x1, y1, length)
291 # This patcher mocks check_trail so that one of the trailed sources
292 # includes it is checking contains a nan at one of its endpoints.
293 patcher = patch(
294 'lsst.meas.extensions.trailedSources.NaivePlugin.SingleFrameNaiveTrailPlugin.check_trail',
295 side_effect=check_trail_mock)
296 patcher.start()
297 task.run(catalog, exposure)
298 record = catalog[0]
300 # Test Case with no edge pixels, but one is set to nan.
301 if record['truth_x'] == 100:
302 self.assertFalse(record.get("ext_trailedSources_Naive_flag_edge"))
303 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
304 self.assertFalse(record.get("ext_trailedSources_Naive_flag_off_image"))
305 self.assertTrue(record.get("ext_trailedSources_Naive_flag_nan"))
307 # Test case with one end of trail containing edge pixels, but nan is
308 # set so edge does not end up set.
309 elif record['truth_x'] == 20:
311 self.assertFalse(record.get("ext_trailedSources_Naive_flag_edge"))
312 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
313 self.assertFalse(record.get("ext_trailedSources_Naive_flag_off_image"))
314 self.assertTrue(record.get("ext_trailedSources_Naive_flag_nan"))
316 # Test case trail fully contained, but contains one nan. Only nan flag
317 # is set.
318 elif record["truth_x"] == 90:
320 self.assertFalse(record.get("ext_trailedSources_Naive_flag_edge"))
321 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
322 self.assertFalse(record.get("ext_trailedSources_Naive_flag_off_image"))
323 self.assertTrue(record.get("ext_trailedSources_Naive_flag_nan"))
325 # Test case with trailed source extending off chip. One coordinate
326 # is off image the other is nan, so edge, off_image, and nan should
327 # be set.
328 else:
329 self.assertEqual(record['truth_x'], -20)
330 self.assertTrue(record.get("ext_trailedSources_Naive_flag_edge"))
331 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
332 self.assertTrue(record.get("ext_trailedSources_Naive_flag_off_image"))
333 self.assertTrue(record.get("ext_trailedSources_Naive_flag_nan"))
335 patcher.stop()
338@classParameters(length=[10], theta=[5], xc=[-20], yc=[-30])
339class TrailedEdgeSourcesOffImageTest(AlgorithmTestCase, lsst.utils.tests.TestCase):
340 """ Test if ext_trailedSources_Naive_flag_edge is set correctly.
342 Given a `TrailedSource`, test if the edge flag is set correctly in the
343 source catalog after the
344 'lsst.meas.extensions.trailedSources.Naive.Plugin.makeTrailedSourceMeasurementTask'
345 has been run on the source catalog.
346 """
348 def setUp(self):
349 self.center = Point2D(50.1, 49.8)
350 self.bbox = Box2I(lsst.geom.Point2I(-20, -30), Extent2I(140, 160))
351 self.dataset = TrailedTestDataset(self.bbox)
353 # Trail which extends into edge pixels
354 self.trail = TrailedEdgeSource(100000.0, self.length, self.theta,
355 self.xc, self.yc)
356 self.dataset.addTrailedSource(self.trail, edge=False)
358 def tearDown(self):
359 del self.center
360 del self.bbox
361 del self.trail
362 del self.dataset
364 def testOffImageEdgeFlag(self):
365 """Test if edge flags are correctly set in NaivePlugin.py when source
366 extends off the the image.
368 Given a `TrailedTestDataset`, run the NaivePlugin measurement and
369 check that the edge flag set when a source extends off the chip.
370 Edge pixels are not set in this test.
371 """
372 # Set up and run Naive measurement.
373 task = TrailedTaskSetup.makeTrailedSourceMeasurementTask(self,
374 plugin="ext_trailedSources_Naive",
375 dependencies=("base_SdssCentroid",
376 "base_SdssShape")
377 )
378 exposure, catalog = self.dataset.realize(5.0, task.schema, randomSeed=0)
379 task.run(catalog, exposure)
380 record = catalog[0]
382 self.assertTrue(record.get("ext_trailedSources_Naive_flag_edge"))
383 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
386class TestMemory(lsst.utils.tests.MemoryTestCase):
387 pass
390def setup_module(module):
391 lsst.utils.tests.init()
394if __name__ == "__main__": 394 ↛ 395line 394 didn't jump to line 395 because the condition on line 394 was never true
395 lsst.utils.tests.init()
396 unittest.main()