Coverage for tests / test_ssoAssociation.py: 26%
195 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:38 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import unittest
23import numpy as np
24from numpy.polynomial.chebyshev import chebval, Chebyshev
25from scipy.spatial import cKDTree
26from astropy.coordinates import SkyCoord
27import astropy.units as u
28import lsst.utils.tests
29from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask
31AU_KM = 1.496e8
34# ---------------------------------------------------------------
35# Test vectors from 2003 YF26 (verified against JPL Horizons
36# in sssource_verification.ipynb)
37# ---------------------------------------------------------------
38YF26 = dict(
39 helio_x=-2.03972984386492,
40 helio_y=-0.754159365265218,
41 helio_z=-0.09916618480505775,
42 helio_vx=6.451686836410149,
43 helio_vy=-17.36829941588102,
44 helio_vz=-9.013339853647382,
45 topo_x=-1.2867480491014316,
46 topo_y=-0.1395377271885233,
47 topo_z=0.16727378829681566,
48 topo_vx=-13.061265792221151,
49 topo_vy=3.5105145303033005,
50 topo_vz=-0.11752881599981002,
51 ephRa=186.18909260549128,
52 ephDec=7.364065668224215,
53 helioRange=2.1769446746252505,
54 topoRange=1.3050562591039694,
55 phaseAngle=17.248719450313473,
56 elongation=140.17164534286258,
57 ephRate=0.13175297987940152,
58 ephRateRa=-0.1242177608944433,
59 ephRateDec=-0.0439180553471223,
60 helio_vtot=20.603940956820594,
61 topo_vtot=13.525316609422026,
62 helioRangeRate=0.3824562080346701,
63 topoRangeRate=12.487622241678675,
64 eclLambda=182.73184707660818,
65 eclBeta=9.214287183249514,
66 galLon=283.959546691413,
67 galLat=69.24711603487795,
68 ephOffsetRa=-0.08092633090227584,
69 ephOffsetDec=-0.06292996923882299,
70 ephOffset=0.10251464315747216,
71 ephOffsetAlongTrack=0.09727483587724564,
72 ephOffsetCrossTrack=0.03235519072357292,
73 DIA_ra=186.18906993899677,
74 DIA_dec=7.364048187677204,
75)
78class TestRadecToXyz(lsst.utils.tests.TestCase):
79 """Test the RA/Dec to unit vector conversion."""
81 def setUp(self):
82 self.task = SolarSystemAssociationTask()
84 def testNorthPole(self):
85 """North celestial pole: (RA=0, Dec=90) -> (0, 0, 1)."""
86 xyz = self.task._radec_to_xyz(
87 np.array([0.0]), np.array([90.0])
88 )
89 np.testing.assert_allclose(xyz[0], [0, 0, 1], atol=1e-12)
91 def testEquator(self):
92 """Points on the equator."""
93 # RA=0, Dec=0 -> (1, 0, 0)
94 xyz = self.task._radec_to_xyz(
95 np.array([0.0]), np.array([0.0])
96 )
97 np.testing.assert_allclose(xyz[0], [1, 0, 0], atol=1e-12)
99 # RA=90, Dec=0 -> (0, 1, 0)
100 xyz = self.task._radec_to_xyz(
101 np.array([90.0]), np.array([0.0])
102 )
103 np.testing.assert_allclose(xyz[0], [0, 1, 0], atol=1e-12)
105 def testUnitNorm(self):
106 """All output vectors should have unit norm."""
107 ras = np.array([0, 45, 90, 180, 270, 123.456])
108 decs = np.array([0, 30, -45, 60, -80, 7.364])
109 xyz = self.task._radec_to_xyz(ras, decs)
110 norms = np.linalg.norm(xyz, axis=1)
111 np.testing.assert_allclose(norms, 1.0, atol=1e-12)
113 def testRoundtrip(self):
114 """Convert RA/Dec -> xyz -> RA/Dec and verify recovery."""
115 ras = np.array([0.0, 45.0, 186.189, 300.0])
116 decs = np.array([0.0, -30.0, 7.364, 85.0])
117 xyz = self.task._radec_to_xyz(ras, decs)
119 # Recover RA/Dec from xyz
120 rec_dec = np.degrees(np.arcsin(xyz[:, 2]))
121 rec_ra = np.degrees(np.arctan2(xyz[:, 1], xyz[:, 0])) % 360
123 np.testing.assert_allclose(rec_ra, ras, atol=1e-10)
124 np.testing.assert_allclose(rec_dec, decs, atol=1e-10)
127class TestCoordinateTransforms(lsst.utils.tests.TestCase):
128 """Test ecliptic and galactic coordinate transforms."""
130 def testEclipticCoords(self):
131 """Verify ecliptic coords for the 2003 YF26 test vector."""
132 sc = SkyCoord(
133 ra=YF26['DIA_ra'] * u.deg,
134 dec=YF26['DIA_dec'] * u.deg,
135 )
136 ecl = sc.barycentrictrueecliptic
137 self.assertAlmostEqual(
138 ecl.lon.deg, YF26['eclLambda'], places=6
139 )
140 self.assertAlmostEqual(
141 ecl.lat.deg, YF26['eclBeta'], places=6
142 )
144 def testGalacticCoords(self):
145 """Verify galactic coords for the 2003 YF26 test vector."""
146 sc = SkyCoord(
147 ra=YF26['DIA_ra'] * u.deg,
148 dec=YF26['DIA_dec'] * u.deg,
149 )
150 gal = sc.galactic
151 self.assertAlmostEqual(
152 gal.l.deg, YF26['galLon'], places=6
153 )
154 self.assertAlmostEqual(
155 gal.b.deg, YF26['galLat'], places=6
156 )
158 def testEclipticPole(self):
159 """North ecliptic pole should be near (RA=270, Dec=66.56)."""
160 sc = SkyCoord(ra=270 * u.deg, dec=66.56 * u.deg)
161 ecl = sc.barycentrictrueecliptic
162 self.assertAlmostEqual(ecl.lat.deg, 90.0, places=0)
165class TestEphemerisOffsets(lsst.utils.tests.TestCase):
166 """Test ephemeris offset computation."""
168 def testEphOffsetRaDec(self):
169 """Verify RA/Dec offset formula."""
170 dia_ra = YF26['DIA_ra']
171 dia_dec = YF26['DIA_dec']
172 eph_ra = YF26['ephRa']
173 eph_dec = YF26['ephDec']
175 offset_ra = (
176 (dia_ra - eph_ra)
177 * np.cos(np.deg2rad(dia_dec)) * 3600
178 )
179 offset_dec = (dia_dec - eph_dec) * 3600
181 self.assertAlmostEqual(
182 offset_ra, YF26['ephOffsetRa'], places=10
183 )
184 self.assertAlmostEqual(
185 offset_dec, YF26['ephOffsetDec'], places=10
186 )
188 def testEphOffsetTotal(self):
189 """Verify total offset = sqrt(dRA² + dDec²)."""
190 total = np.sqrt(
191 YF26['ephOffsetRa']**2 + YF26['ephOffsetDec']**2
192 )
193 self.assertAlmostEqual(
194 total, YF26['ephOffset'], places=10
195 )
197 def testAlongCrossTrack(self):
198 """Verify along/cross-track decomposition."""
199 rate_ra = YF26['ephRateRa']
200 rate_dec = YF26['ephRateDec']
201 rate = YF26['ephRate']
203 # Unit vectors
204 along = np.array([rate_ra / rate, rate_dec / rate])
205 cross = np.array([-rate_dec / rate, rate_ra / rate])
206 offset = np.array(
207 [YF26['ephOffsetRa'], YF26['ephOffsetDec']]
208 )
210 along_track = np.dot(offset, along)
211 cross_track = np.dot(offset, cross)
213 self.assertAlmostEqual(
214 along_track, YF26['ephOffsetAlongTrack'], places=10
215 )
216 self.assertAlmostEqual(
217 cross_track, YF26['ephOffsetCrossTrack'], places=10
218 )
220 def testAlongCrossTrackPureRA(self):
221 """If motion is purely in RA, along-track = RA offset."""
222 offset_ra = 0.1 # arcsec
223 offset_dec = 0.05
224 rate_ra = 0.15 # deg/day
225 rate_dec = 0.0
226 rate = abs(rate_ra)
228 along = np.array([rate_ra / rate, rate_dec / rate])
229 cross = np.array([-rate_dec / rate, rate_ra / rate])
230 offset = np.array([offset_ra, offset_dec])
232 self.assertAlmostEqual(
233 np.dot(offset, along), offset_ra, places=10
234 )
235 self.assertAlmostEqual(
236 np.dot(offset, cross), offset_dec, places=10
237 )
239 def testAlongCrossTrackOrthogonality(self):
240 """Along² + cross² should equal total²."""
241 at = YF26['ephOffsetAlongTrack']
242 ct = YF26['ephOffsetCrossTrack']
243 total = YF26['ephOffset']
244 self.assertAlmostEqual(
245 at**2 + ct**2, total**2, places=10
246 )
249class TestDerivedQuantities(lsst.utils.tests.TestCase):
250 """Test derived physical quantities."""
252 def testHelioRange(self):
253 """helioRange = |helio_xyz|."""
254 r = np.sqrt(
255 YF26['helio_x']**2 + YF26['helio_y']**2
256 + YF26['helio_z']**2
257 )
258 self.assertAlmostEqual(r, YF26['helioRange'], places=10)
260 def testTopoRange(self):
261 """topoRange = |topo_xyz|."""
262 r = np.sqrt(
263 YF26['topo_x']**2 + YF26['topo_y']**2
264 + YF26['topo_z']**2
265 )
266 self.assertAlmostEqual(r, YF26['topoRange'], places=10)
268 def testHelioVtot(self):
269 """helio_vtot = |helio_v|."""
270 v = np.sqrt(
271 YF26['helio_vx']**2 + YF26['helio_vy']**2
272 + YF26['helio_vz']**2
273 )
274 self.assertAlmostEqual(v, YF26['helio_vtot'], places=10)
276 def testTopoVtot(self):
277 """topo_vtot = |topo_v|."""
278 v = np.sqrt(
279 YF26['topo_vx']**2 + YF26['topo_vy']**2
280 + YF26['topo_vz']**2
281 )
282 self.assertAlmostEqual(v, YF26['topo_vtot'], places=10)
284 def testEphRate(self):
285 """ephRate = sqrt(ephRateRa² + ephRateDec²)."""
286 rate = np.sqrt(
287 YF26['ephRateRa']**2 + YF26['ephRateDec']**2
288 )
289 self.assertAlmostEqual(rate, YF26['ephRate'], places=10)
291 def testPhaseAngle(self):
292 """Phase angle from helio and topo vectors."""
293 helio = np.array([
294 YF26['helio_x'], YF26['helio_y'], YF26['helio_z']
295 ])
296 topo = np.array([
297 YF26['topo_x'], YF26['topo_y'], YF26['topo_z']
298 ])
299 cos_pa = np.dot(helio, topo) / (
300 np.linalg.norm(helio) * np.linalg.norm(topo)
301 )
302 pa = np.degrees(np.arccos(np.clip(cos_pa, -1, 1)))
303 self.assertAlmostEqual(pa, YF26['phaseAngle'], places=6)
305 def testElongation(self):
306 """Elongation = Sun-object angle as seen from observer."""
307 helio = np.array([
308 YF26['helio_x'], YF26['helio_y'], YF26['helio_z']
309 ])
310 topo = np.array([
311 YF26['topo_x'], YF26['topo_y'], YF26['topo_z']
312 ])
313 sun_obs = topo - helio # vector from Sun to observer
314 cos_elong = np.dot(sun_obs, topo) / (
315 np.linalg.norm(sun_obs) * np.linalg.norm(topo)
316 )
317 elong = np.degrees(np.arccos(np.clip(cos_elong, -1, 1)))
318 self.assertAlmostEqual(
319 elong, YF26['elongation'], places=4
320 )
322 def testHelioRangeRate(self):
323 """helioRangeRate = (v . r) / |r|."""
324 r = np.array([
325 YF26['helio_x'], YF26['helio_y'], YF26['helio_z']
326 ])
327 # Velocities are km/s, positions are AU
328 # helioRangeRate is in km/s
329 v = np.array([
330 YF26['helio_vx'], YF26['helio_vy'],
331 YF26['helio_vz'],
332 ])
333 r_km = r * AU_KM
334 rdot = np.dot(r_km, v) / np.linalg.norm(r_km)
335 self.assertAlmostEqual(
336 rdot, YF26['helioRangeRate'], places=4
337 )
340class TestMatching(lsst.utils.tests.TestCase):
341 """Test KDTree spatial matching logic."""
343 def setUp(self):
344 self.task = SolarSystemAssociationTask()
346 def testExactMatch(self):
347 """Source at same position as SSO should match."""
348 sso_ra = np.array([180.0])
349 sso_dec = np.array([0.0])
350 dia_ra = np.array([180.0])
351 dia_dec = np.array([0.0])
353 dia_xyz = self.task._radec_to_xyz(dia_ra, dia_dec)
354 tree = cKDTree(dia_xyz)
356 sso_xyz = self.task._radec_to_xyz(sso_ra, sso_dec)
357 dist, idx = tree.query(sso_xyz, k=1)
358 self.assertEqual(idx[0], 0)
359 self.assertAlmostEqual(dist[0], 0.0, places=10)
361 def testNoMatchBeyondRadius(self):
362 """Sources beyond matching radius should not match."""
363 max_arcsec = self.task.config.maxDistArcSeconds
364 sso_ra = np.array([180.0])
365 sso_dec = np.array([0.0])
366 # Source well beyond the matching radius
367 dia_ra = np.array([180.0 + 10 * max_arcsec / 3600])
368 dia_dec = np.array([0.0])
370 dia_xyz = self.task._radec_to_xyz(dia_ra, dia_dec)
371 tree = cKDTree(dia_xyz)
373 sso_xyz = self.task._radec_to_xyz(sso_ra, sso_dec)
374 max_rad = np.radians(max_arcsec / 3600)
375 dist, idx = tree.query(
376 sso_xyz, k=1,
377 distance_upper_bound=max_rad,
378 )
379 self.assertTrue(np.isinf(dist[0]))
381 def testNearestNeighbor(self):
382 """Closer source should be preferred over farther one."""
383 sso_ra = np.array([180.0])
384 sso_dec = np.array([0.0])
385 # Two sources: one 0.2", one 0.5" away
386 dia_ra = np.array([
387 180.0 + 0.2 / 3600, 180.0 + 0.5 / 3600
388 ])
389 dia_dec = np.array([0.0, 0.0])
391 dia_xyz = self.task._radec_to_xyz(dia_ra, dia_dec)
392 tree = cKDTree(dia_xyz)
394 sso_xyz = self.task._radec_to_xyz(sso_ra, sso_dec)
395 dist, idx = tree.query(sso_xyz, k=1)
396 self.assertEqual(idx[0], 0) # closer source
399class TestChebyshevEphemeris(lsst.utils.tests.TestCase):
400 """Test Chebyshev polynomial ephemeris evaluation."""
402 def testChebvalEvaluation(self):
403 """Evaluate a known Chebyshev polynomial."""
404 # f(x) = 3 + 2*T1(x) + 0.5*T2(x)
405 # At x=0: T0=1, T1=0, T2=-1 -> f(0) = 3 + 0 - 0.5 = 2.5
406 coeffs = [3.0, 2.0, 0.5]
407 self.assertAlmostEqual(chebval(0, coeffs), 2.5)
408 # At x=1: T0=1, T1=1, T2=1 -> f(1) = 3 + 2 + 0.5 = 5.5
409 self.assertAlmostEqual(chebval(1, coeffs), 5.5)
411 def testChebvalDerivative(self):
412 """Velocity from Chebyshev derivative of position."""
413 # Position: p(x) = 1 + x + x² (in Chebyshev basis)
414 # In Chebyshev: x = T1, x² = (T0 + T2)/2
415 # So p(x) = 1.5*T0 + T1 + 0.5*T2
416 coeffs = np.array([1.5, 1.0, 0.5])
417 poly = Chebyshev(coeffs)
418 dpoly = poly.deriv()
420 # dp/dx = 1 + 2x
421 # At x=0: dp/dx = 1
422 self.assertAlmostEqual(dpoly(0), 1.0, places=10)
423 # At x=0.5: dp/dx = 2
424 self.assertAlmostEqual(dpoly(0.5), 2.0, places=10)
427class TestSorchaPath(lsst.utils.tests.TestCase):
428 """Test Sorcha ephemeris path computations."""
430 def testTopoFromHelioAndObserver(self):
431 """Topo position = helio position - observer position."""
432 # Use YF26 vectors: topo = helio - observer
433 # observer = helio - topo
434 obs_x = YF26['helio_x'] - YF26['topo_x']
435 obs_y = YF26['helio_y'] - YF26['topo_y']
436 obs_z = YF26['helio_z'] - YF26['topo_z']
438 topo_x = YF26['helio_x'] - obs_x
439 topo_y = YF26['helio_y'] - obs_y
440 topo_z = YF26['helio_z'] - obs_z
442 self.assertAlmostEqual(topo_x, YF26['topo_x'], places=12)
443 self.assertAlmostEqual(topo_y, YF26['topo_y'], places=12)
444 self.assertAlmostEqual(topo_z, YF26['topo_z'], places=12)
446 def testKmToAuConversion(self):
447 """Verify km to AU conversion factor."""
448 # 1 AU in km
449 self.assertAlmostEqual(
450 1.0 * AU_KM, 1.496e8, places=-2
451 )
452 # Round-trip: AU -> km -> AU
453 dist_au = 2.17
454 dist_km = dist_au * AU_KM
455 self.assertAlmostEqual(
456 dist_km / AU_KM, dist_au, places=10
457 )
460class MemoryTester(lsst.utils.tests.MemoryTestCase):
461 pass
464def setup_module(module):
465 lsst.utils.tests.init()
468if __name__ == "__main__": 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true
469 lsst.utils.tests.init()
470 unittest.main()