Coverage for tests / test_ssoAssociation.py: 26%

195 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:52 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23import numpy as np 

24from numpy.polynomial.chebyshev import chebval, Chebyshev 

25from scipy.spatial import cKDTree 

26from astropy.coordinates import SkyCoord 

27import astropy.units as u 

28import lsst.utils.tests 

29from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask 

30 

31AU_KM = 1.496e8 

32 

33 

34# --------------------------------------------------------------- 

35# Test vectors from 2003 YF26 (verified against JPL Horizons 

36# in sssource_verification.ipynb) 

37# --------------------------------------------------------------- 

38YF26 = dict( 

39 helio_x=-2.03972984386492, 

40 helio_y=-0.754159365265218, 

41 helio_z=-0.09916618480505775, 

42 helio_vx=6.451686836410149, 

43 helio_vy=-17.36829941588102, 

44 helio_vz=-9.013339853647382, 

45 topo_x=-1.2867480491014316, 

46 topo_y=-0.1395377271885233, 

47 topo_z=0.16727378829681566, 

48 topo_vx=-13.061265792221151, 

49 topo_vy=3.5105145303033005, 

50 topo_vz=-0.11752881599981002, 

51 ephRa=186.18909260549128, 

52 ephDec=7.364065668224215, 

53 helioRange=2.1769446746252505, 

54 topoRange=1.3050562591039694, 

55 phaseAngle=17.248719450313473, 

56 elongation=140.17164534286258, 

57 ephRate=0.13175297987940152, 

58 ephRateRa=-0.1242177608944433, 

59 ephRateDec=-0.0439180553471223, 

60 helio_vtot=20.603940956820594, 

61 topo_vtot=13.525316609422026, 

62 helioRangeRate=0.3824562080346701, 

63 topoRangeRate=12.487622241678675, 

64 eclLambda=182.73184707660818, 

65 eclBeta=9.214287183249514, 

66 galLon=283.959546691413, 

67 galLat=69.24711603487795, 

68 ephOffsetRa=-0.08092633090227584, 

69 ephOffsetDec=-0.06292996923882299, 

70 ephOffset=0.10251464315747216, 

71 ephOffsetAlongTrack=0.09727483587724564, 

72 ephOffsetCrossTrack=0.03235519072357292, 

73 DIA_ra=186.18906993899677, 

74 DIA_dec=7.364048187677204, 

75) 

76 

77 

78class TestRadecToXyz(lsst.utils.tests.TestCase): 

79 """Test the RA/Dec to unit vector conversion.""" 

80 

81 def setUp(self): 

82 self.task = SolarSystemAssociationTask() 

83 

84 def testNorthPole(self): 

85 """North celestial pole: (RA=0, Dec=90) -> (0, 0, 1).""" 

86 xyz = self.task._radec_to_xyz( 

87 np.array([0.0]), np.array([90.0]) 

88 ) 

89 np.testing.assert_allclose(xyz[0], [0, 0, 1], atol=1e-12) 

90 

91 def testEquator(self): 

92 """Points on the equator.""" 

93 # RA=0, Dec=0 -> (1, 0, 0) 

94 xyz = self.task._radec_to_xyz( 

95 np.array([0.0]), np.array([0.0]) 

96 ) 

97 np.testing.assert_allclose(xyz[0], [1, 0, 0], atol=1e-12) 

98 

99 # RA=90, Dec=0 -> (0, 1, 0) 

100 xyz = self.task._radec_to_xyz( 

101 np.array([90.0]), np.array([0.0]) 

102 ) 

103 np.testing.assert_allclose(xyz[0], [0, 1, 0], atol=1e-12) 

104 

105 def testUnitNorm(self): 

106 """All output vectors should have unit norm.""" 

107 ras = np.array([0, 45, 90, 180, 270, 123.456]) 

108 decs = np.array([0, 30, -45, 60, -80, 7.364]) 

109 xyz = self.task._radec_to_xyz(ras, decs) 

110 norms = np.linalg.norm(xyz, axis=1) 

111 np.testing.assert_allclose(norms, 1.0, atol=1e-12) 

112 

113 def testRoundtrip(self): 

114 """Convert RA/Dec -> xyz -> RA/Dec and verify recovery.""" 

115 ras = np.array([0.0, 45.0, 186.189, 300.0]) 

116 decs = np.array([0.0, -30.0, 7.364, 85.0]) 

117 xyz = self.task._radec_to_xyz(ras, decs) 

118 

119 # Recover RA/Dec from xyz 

120 rec_dec = np.degrees(np.arcsin(xyz[:, 2])) 

121 rec_ra = np.degrees(np.arctan2(xyz[:, 1], xyz[:, 0])) % 360 

122 

123 np.testing.assert_allclose(rec_ra, ras, atol=1e-10) 

124 np.testing.assert_allclose(rec_dec, decs, atol=1e-10) 

125 

126 

127class TestCoordinateTransforms(lsst.utils.tests.TestCase): 

128 """Test ecliptic and galactic coordinate transforms.""" 

129 

130 def testEclipticCoords(self): 

131 """Verify ecliptic coords for the 2003 YF26 test vector.""" 

132 sc = SkyCoord( 

133 ra=YF26['DIA_ra'] * u.deg, 

134 dec=YF26['DIA_dec'] * u.deg, 

135 ) 

136 ecl = sc.barycentrictrueecliptic 

137 self.assertAlmostEqual( 

138 ecl.lon.deg, YF26['eclLambda'], places=6 

139 ) 

140 self.assertAlmostEqual( 

141 ecl.lat.deg, YF26['eclBeta'], places=6 

142 ) 

143 

144 def testGalacticCoords(self): 

145 """Verify galactic coords for the 2003 YF26 test vector.""" 

146 sc = SkyCoord( 

147 ra=YF26['DIA_ra'] * u.deg, 

148 dec=YF26['DIA_dec'] * u.deg, 

149 ) 

150 gal = sc.galactic 

151 self.assertAlmostEqual( 

152 gal.l.deg, YF26['galLon'], places=6 

153 ) 

154 self.assertAlmostEqual( 

155 gal.b.deg, YF26['galLat'], places=6 

156 ) 

157 

158 def testEclipticPole(self): 

159 """North ecliptic pole should be near (RA=270, Dec=66.56).""" 

160 sc = SkyCoord(ra=270 * u.deg, dec=66.56 * u.deg) 

161 ecl = sc.barycentrictrueecliptic 

162 self.assertAlmostEqual(ecl.lat.deg, 90.0, places=0) 

163 

164 

165class TestEphemerisOffsets(lsst.utils.tests.TestCase): 

166 """Test ephemeris offset computation.""" 

167 

168 def testEphOffsetRaDec(self): 

169 """Verify RA/Dec offset formula.""" 

170 dia_ra = YF26['DIA_ra'] 

171 dia_dec = YF26['DIA_dec'] 

172 eph_ra = YF26['ephRa'] 

173 eph_dec = YF26['ephDec'] 

174 

175 offset_ra = ( 

176 (dia_ra - eph_ra) 

177 * np.cos(np.deg2rad(dia_dec)) * 3600 

178 ) 

179 offset_dec = (dia_dec - eph_dec) * 3600 

180 

181 self.assertAlmostEqual( 

182 offset_ra, YF26['ephOffsetRa'], places=10 

183 ) 

184 self.assertAlmostEqual( 

185 offset_dec, YF26['ephOffsetDec'], places=10 

186 ) 

187 

188 def testEphOffsetTotal(self): 

189 """Verify total offset = sqrt(dRA² + dDec²).""" 

190 total = np.sqrt( 

191 YF26['ephOffsetRa']**2 + YF26['ephOffsetDec']**2 

192 ) 

193 self.assertAlmostEqual( 

194 total, YF26['ephOffset'], places=10 

195 ) 

196 

197 def testAlongCrossTrack(self): 

198 """Verify along/cross-track decomposition.""" 

199 rate_ra = YF26['ephRateRa'] 

200 rate_dec = YF26['ephRateDec'] 

201 rate = YF26['ephRate'] 

202 

203 # Unit vectors 

204 along = np.array([rate_ra / rate, rate_dec / rate]) 

205 cross = np.array([-rate_dec / rate, rate_ra / rate]) 

206 offset = np.array( 

207 [YF26['ephOffsetRa'], YF26['ephOffsetDec']] 

208 ) 

209 

210 along_track = np.dot(offset, along) 

211 cross_track = np.dot(offset, cross) 

212 

213 self.assertAlmostEqual( 

214 along_track, YF26['ephOffsetAlongTrack'], places=10 

215 ) 

216 self.assertAlmostEqual( 

217 cross_track, YF26['ephOffsetCrossTrack'], places=10 

218 ) 

219 

220 def testAlongCrossTrackPureRA(self): 

221 """If motion is purely in RA, along-track = RA offset.""" 

222 offset_ra = 0.1 # arcsec 

223 offset_dec = 0.05 

224 rate_ra = 0.15 # deg/day 

225 rate_dec = 0.0 

226 rate = abs(rate_ra) 

227 

228 along = np.array([rate_ra / rate, rate_dec / rate]) 

229 cross = np.array([-rate_dec / rate, rate_ra / rate]) 

230 offset = np.array([offset_ra, offset_dec]) 

231 

232 self.assertAlmostEqual( 

233 np.dot(offset, along), offset_ra, places=10 

234 ) 

235 self.assertAlmostEqual( 

236 np.dot(offset, cross), offset_dec, places=10 

237 ) 

238 

239 def testAlongCrossTrackOrthogonality(self): 

240 """Along² + cross² should equal total².""" 

241 at = YF26['ephOffsetAlongTrack'] 

242 ct = YF26['ephOffsetCrossTrack'] 

243 total = YF26['ephOffset'] 

244 self.assertAlmostEqual( 

245 at**2 + ct**2, total**2, places=10 

246 ) 

247 

248 

249class TestDerivedQuantities(lsst.utils.tests.TestCase): 

250 """Test derived physical quantities.""" 

251 

252 def testHelioRange(self): 

253 """helioRange = |helio_xyz|.""" 

254 r = np.sqrt( 

255 YF26['helio_x']**2 + YF26['helio_y']**2 

256 + YF26['helio_z']**2 

257 ) 

258 self.assertAlmostEqual(r, YF26['helioRange'], places=10) 

259 

260 def testTopoRange(self): 

261 """topoRange = |topo_xyz|.""" 

262 r = np.sqrt( 

263 YF26['topo_x']**2 + YF26['topo_y']**2 

264 + YF26['topo_z']**2 

265 ) 

266 self.assertAlmostEqual(r, YF26['topoRange'], places=10) 

267 

268 def testHelioVtot(self): 

269 """helio_vtot = |helio_v|.""" 

270 v = np.sqrt( 

271 YF26['helio_vx']**2 + YF26['helio_vy']**2 

272 + YF26['helio_vz']**2 

273 ) 

274 self.assertAlmostEqual(v, YF26['helio_vtot'], places=10) 

275 

276 def testTopoVtot(self): 

277 """topo_vtot = |topo_v|.""" 

278 v = np.sqrt( 

279 YF26['topo_vx']**2 + YF26['topo_vy']**2 

280 + YF26['topo_vz']**2 

281 ) 

282 self.assertAlmostEqual(v, YF26['topo_vtot'], places=10) 

283 

284 def testEphRate(self): 

285 """ephRate = sqrt(ephRateRa² + ephRateDec²).""" 

286 rate = np.sqrt( 

287 YF26['ephRateRa']**2 + YF26['ephRateDec']**2 

288 ) 

289 self.assertAlmostEqual(rate, YF26['ephRate'], places=10) 

290 

291 def testPhaseAngle(self): 

292 """Phase angle from helio and topo vectors.""" 

293 helio = np.array([ 

294 YF26['helio_x'], YF26['helio_y'], YF26['helio_z'] 

295 ]) 

296 topo = np.array([ 

297 YF26['topo_x'], YF26['topo_y'], YF26['topo_z'] 

298 ]) 

299 cos_pa = np.dot(helio, topo) / ( 

300 np.linalg.norm(helio) * np.linalg.norm(topo) 

301 ) 

302 pa = np.degrees(np.arccos(np.clip(cos_pa, -1, 1))) 

303 self.assertAlmostEqual(pa, YF26['phaseAngle'], places=6) 

304 

305 def testElongation(self): 

306 """Elongation = Sun-object angle as seen from observer.""" 

307 helio = np.array([ 

308 YF26['helio_x'], YF26['helio_y'], YF26['helio_z'] 

309 ]) 

310 topo = np.array([ 

311 YF26['topo_x'], YF26['topo_y'], YF26['topo_z'] 

312 ]) 

313 sun_obs = topo - helio # vector from Sun to observer 

314 cos_elong = np.dot(sun_obs, topo) / ( 

315 np.linalg.norm(sun_obs) * np.linalg.norm(topo) 

316 ) 

317 elong = np.degrees(np.arccos(np.clip(cos_elong, -1, 1))) 

318 self.assertAlmostEqual( 

319 elong, YF26['elongation'], places=4 

320 ) 

321 

322 def testHelioRangeRate(self): 

323 """helioRangeRate = (v . r) / |r|.""" 

324 r = np.array([ 

325 YF26['helio_x'], YF26['helio_y'], YF26['helio_z'] 

326 ]) 

327 # Velocities are km/s, positions are AU 

328 # helioRangeRate is in km/s 

329 v = np.array([ 

330 YF26['helio_vx'], YF26['helio_vy'], 

331 YF26['helio_vz'], 

332 ]) 

333 r_km = r * AU_KM 

334 rdot = np.dot(r_km, v) / np.linalg.norm(r_km) 

335 self.assertAlmostEqual( 

336 rdot, YF26['helioRangeRate'], places=4 

337 ) 

338 

339 

340class TestMatching(lsst.utils.tests.TestCase): 

341 """Test KDTree spatial matching logic.""" 

342 

343 def setUp(self): 

344 self.task = SolarSystemAssociationTask() 

345 

346 def testExactMatch(self): 

347 """Source at same position as SSO should match.""" 

348 sso_ra = np.array([180.0]) 

349 sso_dec = np.array([0.0]) 

350 dia_ra = np.array([180.0]) 

351 dia_dec = np.array([0.0]) 

352 

353 dia_xyz = self.task._radec_to_xyz(dia_ra, dia_dec) 

354 tree = cKDTree(dia_xyz) 

355 

356 sso_xyz = self.task._radec_to_xyz(sso_ra, sso_dec) 

357 dist, idx = tree.query(sso_xyz, k=1) 

358 self.assertEqual(idx[0], 0) 

359 self.assertAlmostEqual(dist[0], 0.0, places=10) 

360 

361 def testNoMatchBeyondRadius(self): 

362 """Sources beyond matching radius should not match.""" 

363 max_arcsec = self.task.config.maxDistArcSeconds 

364 sso_ra = np.array([180.0]) 

365 sso_dec = np.array([0.0]) 

366 # Source well beyond the matching radius 

367 dia_ra = np.array([180.0 + 10 * max_arcsec / 3600]) 

368 dia_dec = np.array([0.0]) 

369 

370 dia_xyz = self.task._radec_to_xyz(dia_ra, dia_dec) 

371 tree = cKDTree(dia_xyz) 

372 

373 sso_xyz = self.task._radec_to_xyz(sso_ra, sso_dec) 

374 max_rad = np.radians(max_arcsec / 3600) 

375 dist, idx = tree.query( 

376 sso_xyz, k=1, 

377 distance_upper_bound=max_rad, 

378 ) 

379 self.assertTrue(np.isinf(dist[0])) 

380 

381 def testNearestNeighbor(self): 

382 """Closer source should be preferred over farther one.""" 

383 sso_ra = np.array([180.0]) 

384 sso_dec = np.array([0.0]) 

385 # Two sources: one 0.2", one 0.5" away 

386 dia_ra = np.array([ 

387 180.0 + 0.2 / 3600, 180.0 + 0.5 / 3600 

388 ]) 

389 dia_dec = np.array([0.0, 0.0]) 

390 

391 dia_xyz = self.task._radec_to_xyz(dia_ra, dia_dec) 

392 tree = cKDTree(dia_xyz) 

393 

394 sso_xyz = self.task._radec_to_xyz(sso_ra, sso_dec) 

395 dist, idx = tree.query(sso_xyz, k=1) 

396 self.assertEqual(idx[0], 0) # closer source 

397 

398 

399class TestChebyshevEphemeris(lsst.utils.tests.TestCase): 

400 """Test Chebyshev polynomial ephemeris evaluation.""" 

401 

402 def testChebvalEvaluation(self): 

403 """Evaluate a known Chebyshev polynomial.""" 

404 # f(x) = 3 + 2*T1(x) + 0.5*T2(x) 

405 # At x=0: T0=1, T1=0, T2=-1 -> f(0) = 3 + 0 - 0.5 = 2.5 

406 coeffs = [3.0, 2.0, 0.5] 

407 self.assertAlmostEqual(chebval(0, coeffs), 2.5) 

408 # At x=1: T0=1, T1=1, T2=1 -> f(1) = 3 + 2 + 0.5 = 5.5 

409 self.assertAlmostEqual(chebval(1, coeffs), 5.5) 

410 

411 def testChebvalDerivative(self): 

412 """Velocity from Chebyshev derivative of position.""" 

413 # Position: p(x) = 1 + x + x² (in Chebyshev basis) 

414 # In Chebyshev: x = T1, x² = (T0 + T2)/2 

415 # So p(x) = 1.5*T0 + T1 + 0.5*T2 

416 coeffs = np.array([1.5, 1.0, 0.5]) 

417 poly = Chebyshev(coeffs) 

418 dpoly = poly.deriv() 

419 

420 # dp/dx = 1 + 2x 

421 # At x=0: dp/dx = 1 

422 self.assertAlmostEqual(dpoly(0), 1.0, places=10) 

423 # At x=0.5: dp/dx = 2 

424 self.assertAlmostEqual(dpoly(0.5), 2.0, places=10) 

425 

426 

427class TestSorchaPath(lsst.utils.tests.TestCase): 

428 """Test Sorcha ephemeris path computations.""" 

429 

430 def testTopoFromHelioAndObserver(self): 

431 """Topo position = helio position - observer position.""" 

432 # Use YF26 vectors: topo = helio - observer 

433 # observer = helio - topo 

434 obs_x = YF26['helio_x'] - YF26['topo_x'] 

435 obs_y = YF26['helio_y'] - YF26['topo_y'] 

436 obs_z = YF26['helio_z'] - YF26['topo_z'] 

437 

438 topo_x = YF26['helio_x'] - obs_x 

439 topo_y = YF26['helio_y'] - obs_y 

440 topo_z = YF26['helio_z'] - obs_z 

441 

442 self.assertAlmostEqual(topo_x, YF26['topo_x'], places=12) 

443 self.assertAlmostEqual(topo_y, YF26['topo_y'], places=12) 

444 self.assertAlmostEqual(topo_z, YF26['topo_z'], places=12) 

445 

446 def testKmToAuConversion(self): 

447 """Verify km to AU conversion factor.""" 

448 # 1 AU in km 

449 self.assertAlmostEqual( 

450 1.0 * AU_KM, 1.496e8, places=-2 

451 ) 

452 # Round-trip: AU -> km -> AU 

453 dist_au = 2.17 

454 dist_km = dist_au * AU_KM 

455 self.assertAlmostEqual( 

456 dist_km / AU_KM, dist_au, places=10 

457 ) 

458 

459 

460class MemoryTester(lsst.utils.tests.MemoryTestCase): 

461 pass 

462 

463 

464def setup_module(module): 

465 lsst.utils.tests.init() 

466 

467 

468if __name__ == "__main__": 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true

469 lsst.utils.tests.init() 

470 unittest.main()