Coverage for tests / test_trailedSources.py: 21%
225 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:59 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 08:59 +0000
1#
2# This file is part of meas_extensions_trailedSources.
3#
4# Developed for the LSST Data Management System.
5# This product includes software developed by the LSST Project
6# (http://www.lsst.org).
7# See the COPYRIGHT file at the top-level directory of this distribution
8# for details of code ownership.
9#
10# This program is free software: you can redistribute it and/or modify
11# it under the terms of the GNU General Public License as published by
12# the Free Software Foundation, either version 3 of the License, or
13# (at your option) any later version.
14#
15# This program is distributed in the hope that it will be useful,
16# but WITHOUT ANY WARRANTY; without even the implied warranty of
17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18# GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License
21# along with this program. If not, see <http://www.gnu.org/licenses/>.
22#
24import numpy as np
25import unittest
26import lsst.utils.tests
27import lsst.meas.extensions.trailedSources
28from scipy.optimize import check_grad
29import lsst.afw.table as afwTable
30from lsst.meas.base.tests import AlgorithmTestCase
31from lsst.meas.extensions.trailedSources import SingleFrameNaiveTrailPlugin
32from lsst.meas.extensions.trailedSources import VeresModel
33from lsst.meas.extensions.trailedSources.utils import getMeasurementCutout
34from lsst.utils.tests import classParameters
37# Trailed-source length, angle, and centroid.
38rng = np.random.default_rng(432)
39nTrails = 50
40Ls = rng.uniform(2, 20, nTrails)
41thetas = rng.uniform(0, 2*np.pi, nTrails)
42xcs = rng.uniform(0, 100, nTrails)
43ycs = rng.uniform(0, 100, nTrails)
46class TrailedSource:
47 """Holds a set of true trail parameters.
48 """
50 def __init__(self, instFlux, length, angle, xc, yc):
51 self.instFlux = instFlux
52 self.length = length
53 self.angle = angle
54 self.center = lsst.geom.Point2D(xc, yc)
55 self.x0 = xc - length/2 * np.cos(angle)
56 self.y0 = yc - length/2 * np.sin(angle)
57 self.x1 = xc + length/2 * np.cos(angle)
58 self.y1 = yc + length/2 * np.sin(angle)
61# "Extend" meas.base.tests.TestDataset
62class TrailedTestDataset(lsst.meas.base.tests.TestDataset):
63 """A dataset for testing trailed source measurements.
64 Given a `TrailedSource`, construct a record of the true values and an
65 Exposure.
66 """
68 def __init__(self, bbox, threshold=10.0, exposure=None, **kwds):
69 super().__init__(bbox, threshold, exposure, **kwds)
71 def addTrailedSource(self, trail):
72 """Add a trailed source to the simulation.
73 'Re-implemented' version of
74 `lsst.meas.base.tests.TestDataset.addSource`. Numerically integrates a
75 Gaussian PSF over a line to obtain am image of a trailed source.
76 """
78 record = self.catalog.addNew()
79 record.set(self.keys["centroid"], trail.center)
80 rng = np.random.default_rng(32)
81 covariance = rng.normal(0, 0.1, 4).reshape(2, 2)
82 covariance[0, 1] = covariance[1, 0]
83 record.set(self.keys["centroid_sigma"], covariance.astype(np.float32))
84 record.set(self.keys["shape"], self.psfShape)
85 record.set(self.keys["isStar"], False)
87 # Sum the psf at each
88 numIter = int(10*trail.length)
89 xp = np.linspace(trail.x0, trail.x1, num=numIter)
90 yp = np.linspace(trail.y0, trail.y1, num=numIter)
91 for (x, y) in zip(xp, yp):
92 pt = lsst.geom.Point2D(x, y)
93 im = self.drawGaussian(self.exposure.getBBox(), trail.instFlux,
94 lsst.afw.geom.Ellipse(self.psfShape, pt))
95 self.exposure.getMaskedImage().getImage().getArray()[:, :] += im.getArray()
97 totFlux = self.exposure.image.array.sum()
98 self.exposure.image.array /= totFlux
99 self.exposure.image.array *= trail.instFlux
101 record.set(self.keys["instFlux"], trail.instFlux)
102 self._installFootprint(record, self.exposure.getImage())
104 return record, self.exposure.getImage()
107class TrailedSourcesFailuresTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
108 """Tests of various failure modes of the trailed source plugin.
109 """
110 def setUp(self):
111 self.center = lsst.geom.Point2D(50.1, 49.8)
112 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30),
113 lsst.geom.Extent2I(140, 160))
114 self.dataset = TrailedTestDataset(self.bbox)
115 self.trail = TrailedSource(100000.0, 20, .5, 40, 40)
116 self.dataset.addTrailedSource(self.trail)
117 # So that the tests are consistent about output field names.
118 self.name = "trailedSource"
120 def _setupPlugin(self, plugins=None):
121 """Return a prepared trailed source plugin, catalog, and exposure for
122 use with mocked failures.
123 """
124 schema = TrailedTestDataset.makeMinimalSchema()
125 dependencies = ["base_SdssCentroid"]
126 dependencies += plugins if plugins is not None else []
127 config = self.makeSingleFrameMeasurementConfig(plugin="base_SdssShape",
128 dependencies=dependencies)
129 config.slots.shape = "base_SdssShape"
130 config.slots.centroid = "base_SdssCentroid"
131 trailedPlugin = SingleFrameNaiveTrailPlugin(SingleFrameNaiveTrailPlugin.ConfigClass(),
132 self.name,
133 schema,
134 None)
135 task = self.makeSingleFrameMeasurementTask(
136 plugin="base_SdssShape",
137 dependencies=dependencies,
138 config=config,
139 schema=schema
140 )
141 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
142 task.run(catalog, exposure)
143 return trailedPlugin, catalog, exposure
145 def testShapeFlag(self):
146 """Test that the correct trailed source flags get set if shape_flag
147 is set.
148 """
149 trailedPlugin, catalog, exposure = self._setupPlugin()
150 catalog["base_SdssShape_flag"] = True
151 trailedPlugin.measure(catalog[0], exposure)
152 self.assertTrue(catalog[0][f"{self.name}_flag"])
153 self.assertTrue(catalog[0][f"{self.name}_flag_shape"])
156# Following from meas_base/test_NaiveCentroid.py
157# Taken from NaiveCentroidTestCase
158@classParameters(length=Ls, theta=thetas, xc=xcs, yc=ycs)
159class TrailedSourcesTestCase(AlgorithmTestCase, lsst.utils.tests.TestCase):
161 def setUp(self):
162 self.center = lsst.geom.Point2D(50.1, 49.8)
163 self.bbox = lsst.geom.Box2I(lsst.geom.Point2I(-20, -30),
164 lsst.geom.Extent2I(140, 160))
165 self.dataset = TrailedTestDataset(self.bbox)
167 self.trail = TrailedSource(100000.0, self.length, self.theta, self.xc, self.yc)
168 self.dataset.addTrailedSource(self.trail)
170 @staticmethod
171 def transformMoments(Ixx, Iyy, Ixy):
172 """Transform second-moments to semi-major and minor axis.
173 """
174 xmy = Ixx - Iyy
175 xpy = Ixx + Iyy
176 xmy2 = xmy*xmy
177 xy2 = Ixy*Ixy
178 a2 = 0.5 * (xpy + np.sqrt(xmy2 + 4.0*xy2))
179 b2 = 0.5 * (xpy - np.sqrt(xmy2 + 4.0*xy2))
180 return a2, b2
182 @staticmethod
183 def f_length(x):
184 return SingleFrameNaiveTrailPlugin.findLength(*x)[0]
186 @staticmethod
187 def g_length(x):
188 return SingleFrameNaiveTrailPlugin.findLength(*x)[1]
190 @staticmethod
191 def f_flux(x, model):
192 return model.computeFluxWithGradient(x)[0]
194 @staticmethod
195 def g_flux(x, model):
196 return model.computeFluxWithGradient(x)[1]
198 @staticmethod
199 def central_difference(func, x, *args, h=1e-8):
200 result = np.zeros(len(x))
201 for i in range(len(x)):
202 xp = x.copy()
203 xp[i] += h
204 fp = func(xp, *args)
206 xm = x.copy()
207 xm[i] -= h
208 fm = func(xm, *args)
209 result[i] = (fp - fm) / (2*h)
211 return result
213 def makeTrailedSourceMeasurementTask(self, plugin=None, dependencies=(),
214 config=None, schema=None, algMetadata=None):
215 """Set up a measurement task for a trailed source plugin.
216 """
218 config = self.makeSingleFrameMeasurementConfig(plugin=plugin,
219 dependencies=dependencies)
221 # Make sure the shape slot is base_SdssShape
222 config.slots.shape = "base_SdssShape"
223 return self.makeSingleFrameMeasurementTask(plugin=plugin,
224 dependencies=dependencies,
225 config=config, schema=schema,
226 algMetadata=algMetadata)
228 def testNaivePlugin(self):
229 """Test the NaivePlugin measurements.
230 Given a `TrailedTestDataset`, run the NaivePlugin measurement and
231 compare the measured parameters to the true values.
232 """
234 # Set up and run Naive measurement.
235 task = self.makeTrailedSourceMeasurementTask(
236 plugin="ext_trailedSources_Naive",
237 dependencies=("base_SdssCentroid", "base_SdssShape")
238 )
239 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
240 task.run(catalog, exposure)
241 record = catalog[0]
243 # Check the RA and Dec measurements
244 wcs = exposure.getWcs()
245 spt = wcs.pixelToSky(self.center)
246 ra_true = spt.getRa().asDegrees()
247 dec_true = spt.getDec().asDegrees()
248 ra_meas = record.get("ext_trailedSources_Naive_ra")
249 dec_meas = record.get("ext_trailedSources_Naive_dec")
250 self.assertFloatsAlmostEqual(ra_true, ra_meas, atol=None, rtol=0.01)
251 self.assertFloatsAlmostEqual(dec_true, dec_meas, atol=None, rtol=0.01)
253 # Check that root finder converged
254 converged = record.get("ext_trailedSources_Naive_flag_noConverge")
255 self.assertFalse(converged)
257 # Compare true with measured length, angle, and flux.
258 # Accuracy is dependent on the second-moments measurements, so the
259 # rtol values are simply rough upper bounds.
260 length = record.get("ext_trailedSources_Naive_length")
261 theta = record.get("ext_trailedSources_Naive_angle")
262 flux = record.get("ext_trailedSources_Naive_flux")
263 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1)
264 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi,
265 atol=np.arctan(1/length), rtol=None)
266 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1)
268 # Test function gradients versus finite difference derivatives
269 # Do length first
270 Ixx, Iyy, Ixy = record.getShape().getParameterVector()
271 a2, b2 = self.transformMoments(Ixx, Iyy, Ixy)
272 self.assertLessEqual(check_grad(self.f_length, self.g_length, [a2, b2]), 1e-6)
274 # Now flux gradient
275 xc = record.get("base_SdssShape_x")
276 yc = record.get("base_SdssShape_y")
277 params = np.array([xc, yc, 1.0, length, theta])
278 cutout = getMeasurementCutout(record, exposure)
279 model = VeresModel(cutout)
280 gradNum = self.central_difference(self.f_flux, params, model, h=9e-5)
281 gradMax = np.max(np.abs(gradNum - self.g_flux(params, model)))
282 self.assertLessEqual(gradMax, 1e-5)
284 # Check test setup
285 self.assertNotEqual(length, self.trail.length)
286 self.assertNotEqual(theta, self.trail.angle)
288 # Make sure measurement flag is False
289 self.assertFalse(record.get("ext_trailedSources_Naive_flag"))
291 def testVeresPlugin(self):
292 """Test the VeresPlugin measurements.
293 Given a `TrailedTestDataset`, run the VeresPlugin measurement and
294 compare the measured parameters to the true values.
295 """
297 # Set up and run Veres measurement.
298 task = self.makeTrailedSourceMeasurementTask(
299 plugin="ext_trailedSources_Veres",
300 dependencies=(
301 "base_SdssCentroid",
302 "base_SdssShape",
303 "ext_trailedSources_Naive")
304 )
305 exposure, catalog = self.dataset.realize(10.0, task.schema, randomSeed=0)
306 task.run(catalog, exposure)
307 record = catalog[0]
309 # Make sure optmizer converged
310 converged = record.get("ext_trailedSources_Veres_flag_nonConvergence")
311 self.assertFalse(converged)
313 # Compare measured trail length, angle, and flux to true values
314 # These measurements should perform at least as well as NaivePlugin
315 length = record.get("ext_trailedSources_Veres_length")
316 theta = record.get("ext_trailedSources_Veres_angle")
317 flux = record.get("ext_trailedSources_Veres_flux")
318 self.assertFloatsAlmostEqual(length, self.trail.length, atol=None, rtol=0.1)
319 self.assertFloatsAlmostEqual(theta % np.pi, self.trail.angle % np.pi,
320 atol=np.arctan(1/length), rtol=None)
321 self.assertFloatsAlmostEqual(flux, self.trail.instFlux, atol=None, rtol=0.1)
323 xc = record.get("ext_trailedSources_Veres_centroid_x")
324 yc = record.get("ext_trailedSources_Veres_centroid_y")
325 params = np.array([xc, yc, flux, length, theta])
326 cutout = getMeasurementCutout(record, exposure)
327 model = VeresModel(cutout)
328 gradNum = self.central_difference(model, params, h=1e-6)
329 gradMax = np.max(np.abs(gradNum - model.gradient(params)))
330 self.assertLessEqual(gradMax, 1e-5)
332 # Make sure test setup is working as expected
333 self.assertNotEqual(length, self.trail.length)
334 self.assertNotEqual(theta, self.trail.angle)
336 # Test that reduced chi-squared is reasonable
337 rChiSq = record.get("ext_trailedSources_Veres_rChiSq")
338 self.assertGreater(rChiSq, 0.8)
339 self.assertLess(rChiSq, 1.3)
341 # Make sure measurement flag is False
342 self.assertFalse(record.get("ext_trailedSources_Veres_flag"))
344 def testMonteCarlo(self):
345 """Test the uncertainties in trail measurements from NaivePlugin
346 """
347 # Adapted from lsst.meas.base
349 # Set up Naive measurement and dependencies.
350 task = self.makeTrailedSourceMeasurementTask(
351 plugin="ext_trailedSources_Naive",
352 dependencies=("base_SdssCentroid", "base_SdssShape")
353 )
355 nSamples = 2000
356 catalog = afwTable.SourceCatalog(task.schema)
357 sample = 0
358 seed = 0
359 while sample < nSamples:
360 seed += 1
361 exp, cat = self.dataset.realize(100.0, task.schema, randomSeed=seed)
362 rec = cat[0]
363 task.run(cat, exp)
365 # Accuracy of this measurement is entirely dependent on shape and
366 # centroiding. Skip when shape measurement fails.
367 if rec['base_SdssShape_flag']:
368 continue
369 catalog.append(rec)
370 sample += 1
372 catalog = catalog.copy(deep=True)
373 nameBase = "ext_trailedSources_Naive_"
375 # Currently, the errors don't include covariances, so just make sure
376 # we're close or at least over estimate
377 length = catalog[nameBase+"length"]
378 lengthErr = catalog[nameBase+"lengthErr"]
379 lengthStd = np.nanstd(length)
380 lengthErrMean = np.nanmean(lengthErr)
381 diff = (lengthErrMean - lengthStd) / lengthErrMean
382 self.assertGreater(diff, -0.1)
383 self.assertLess(diff, 0.5)
385 angle = catalog[nameBase+"angle"]
386 if (np.max(angle) - np.min(angle)) > np.pi/2:
387 angle = angle % np.pi # Wrap if bimodal
388 angleErr = catalog[nameBase+"angleErr"]
389 angleStd = np.nanstd(angle)
390 angleErrMean = np.nanmean(angleErr)
391 diff = (angleErrMean - angleStd) / angleErrMean
392 self.assertGreater(diff, -0.1)
393 self.assertLess(diff, 0.6)
396class TestMemory(lsst.utils.tests.MemoryTestCase):
397 pass
400def setup_module(module):
401 lsst.utils.tests.init()
404if __name__ == "__main__": 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true
405 lsst.utils.tests.init()
406 unittest.main()