Coverage for tests/test_ssoAssociationTask.py: 23%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-23 08:46 +0000

1# This file is part of ap_association. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import numpy as np 

23from astropy.table import Table 

24import astropy.table as tb 

25import astropy.units as u 

26import unittest 

27 

28from lsst.pipe.tasks.ssoAssociation import SolarSystemAssociationTask 

29import astshim as ast 

30import lsst.geom as geom 

31import lsst.afw.geom as afwGeom 

32import lsst.meas.base.tests as measTests 

33import lsst.utils.tests 

34 

35 

class TestSolarSystemAssociation(unittest.TestCase):
    """Tests of ``SolarSystemAssociationTask`` run against a simulated
    exposure, a table of DiaSources, and a one-row solar system object table.
    """

    def setUp(self):
        """Create the exposure, DiaSource table, and solar system object
        table shared by the tests.
        """
        # Simulated exposure with fake sources placed along the pixel
        # diagonal (x == y), ``xyLoc`` pixels apart.
        self.nSources = 10
        self.xyLoc = 100
        lowerLeft = geom.Point2I(0, 0)
        dimensions = geom.Extent2I(1024, 1153)
        self.bbox = geom.Box2I(lowerLeft, dimensions)
        fakeData = measTests.TestDataset(self.bbox)
        for idx in range(self.nSources):
            pixelOffset = idx*self.xyLoc
            fakeData.addSource(100000.0, geom.Point2D(pixelOffset, pixelOffset))

        schema = fakeData.makeMinimalSchema()
        for flagName in ("base_PixelFlags_flag", "base_PixelFlags_flag_offimage"):
            schema.addField(flagName, type="Flag")
        self.exposure, sourceCat = fakeData.realize(10.0, schema, randomSeed=1234)

        # Sky coordinates are assigned from the WCS the exposure was realized
        # with, i.e. before the distorted WCS below is installed.
        originalWcs = self.exposure.getWcs()
        for record in sourceCat:
            record.setCoord(originalWcs.pixelToSky(record.getCentroid()))

        # Non-invertible WCS to test robustness to distortions.
        # Coefficients transform (x - 1e-7*x^3 -> x, y -> y); see docs for PolyMap.
        polyCoeffs = np.array([
            [-1.0e-7, 1, 3, 0],
            [1.0, 1, 1, 0],
            [1.0, 2, 0, 1],
        ])
        pixelDistortion = afwGeom.TransformPoint2ToPoint2(
            ast.PolyMap(polyCoeffs, 2, options="IterInverse=1"))
        distortedWcs = afwGeom.makeModifiedWcs(pixelDistortion,
                                               self.exposure.wcs,
                                               modifyActualPixels=False)
        self.exposure.setWcs(distortedWcs)

        # Convert the catalog to the table layout the task is given:
        # ra/dec in degrees plus the extra per-source columns set below.
        diaSources = sourceCat.asAstropy()
        diaSources.rename_columns(["coord_ra", "coord_dec"], ["ra", "dec"])
        for coordName in ("ra", "dec"):
            diaSources[coordName] = np.rad2deg(diaSources[coordName]) * u.deg
        nRows = len(diaSources)
        diaSources["midpointMjdTai"] = (diaSources["ra"]*0 + 65000.0) * u.d
        diaSources["extendedness"] = np.zeros(nRows)
        diaSources["band"] = np.full(nRows, 'r', dtype='U1')
        diaSources["psfFlux"] = np.ones(nRows) * 3. * u.nJy
        diaSources["psfFluxErr"] = np.ones(nRows) * 0.1 * u.nJy
        # Extra row at (ra, dec) = (45, 45), the same position as the
        # ephemeris of the solar system object defined below.
        ssoPositionRow = Table([[45], [45]], names=["ra", "dec"])
        diaSources = tb.vstack([diaSources, ssoPositionRow])
        diaSources["ssObjectId"] = 0
        diaSources["diaSourceId"] = list(range(len(diaSources)))
        self.testDiaSources = diaSources

        # One-row solar system object table with its ephemeris at
        # (45, 45) deg.
        ssObjects = Table()
        ssObjects["ObjID"] = ["K15MK4Q"]
        ssObjects["MPCORB_unpacked_primary_provisional_designation"] = ssObjects["ObjID"]

        ssObjects["ssObjectId"] = [21164728253101137]
        ssObjects["ephRa"] = [45 * u.deg]
        ssObjects["ephDec"] = [45 * u.deg]
        ssObjects["tmin"] = [-1 * u.d]
        ssObjects["tmax"] = [1 * u.d]
        # Single-coefficient polynomial columns, added in this order.
        # NOTE(review): presumably constant observer/object position
        # polynomials; (1, 1, sqrt(2)) lies along ra = dec = 45 deg.
        polyCoefficients = {
            "obs_x_poly": 0,
            "obs_y_poly": 0,
            "obs_z_poly": 0,
            "obj_x_poly": 1,
            "obj_y_poly": 1,
            "obj_z_poly": 2 ** 0.5,
        }
        for columnName, coefficient in polyCoefficients.items():
            ssObjects[columnName] = [np.array([coefficient])]
        ssObjects["Err(arcsec)"] = np.ones(len(ssObjects)) * u.arcsec
        ssObjects["trailedSourceMagTrue"] = 22
        self.testSsObjects = ssObjects

    def test_run(self):
        """Test that association and id assignment work as expected.
        """
        task = SolarSystemAssociationTask()
        results = task.run(self.testDiaSources,
                           self.testSsObjects,
                           self.exposure.visitInfo,
                           self.exposure.getBBox(),
                           self.exposure.wcs)
        associated = results.ssoAssocDiaSources
        # Exactly one DiaSource, the one at (45, 45), should be associated
        # and carry the solar system object's id.
        self.assertEqual(len(associated), 1)
        self.assertEqual(associated['ra'][0], 45.0)
        self.assertEqual(associated['dec'][0], 45.0)
        self.assertEqual(associated['ssObjectId'][0], 21164728253101137)

    def test_mask(self):
        """Test that masking against the CCD bounding box works as expected.
        """
        task = SolarSystemAssociationTask()
        bbox = self.exposure.getBBox()
        wcs = self.exposure.wcs

        # Test with all objects inside the ccd: nothing should be removed.
        insideOnly = task._maskToCcdRegion(self.testSsObjects, bbox, wcs, 1.0)
        self.assertEqual(len(insideOnly), len(self.testSsObjects))

        # Add new SolarSystemObjects outside of the bbox and test that they
        # are excluded.
        outsideCoords = [
            (150, 80),
            (150, -80),
            # Coordinates are chosen so that the inverse WCS erroneously maps
            # them to inside the box (to (74.5, 600.6) instead of around
            # (1745, 600.6)).
            (44.91215199831453, 45.001331943391406),
        ]
        outsideObjects = tb.vstack([self.testSsObjects[:1] for _ in outsideCoords])
        for rowIdx, (ra, dec) in enumerate(outsideCoords):
            outsideObjects[rowIdx]["ephRa"] = ra
            outsideObjects[rowIdx]["ephDec"] = dec

        combined = tb.vstack([self.testSsObjects, outsideObjects])
        remaining = task._maskToCcdRegion(combined, bbox, wcs, 1.0)
        self.assertEqual(len(remaining), len(self.testSsObjects))

143 

144 

class MemoryTester(lsst.utils.tests.MemoryTestCase):
    """Subclass of ``lsst.utils.tests.MemoryTestCase`` that adds nothing of
    its own; all behavior comes from the base class.
    """

147 

148 

def setup_module(module):
    """Initialize the LSST test utilities by calling
    ``lsst.utils.tests.init``.

    Parameters
    ----------
    module
        Not used by this function.
        NOTE(review): presumably the module object passed by the test
        runner's module-level setup hook; confirm against the runner in use.
    """
    lsst.utils.tests.init()

151 

152 

# Direct execution as a script: initialize the LSST test utilities
# ourselves, then hand control to unittest.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()