Coverage for tests / test_photfit.py: 23%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:38 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23import numpy as np 

24import lsst.utils.tests 

25from lsst.pipe.tasks.ssp.photfit import HG12_model, fitHG12 

26 

27# Independently calculated test vectors for H=17.30, G12=0.42 

28# 15 observations spanning phase angles 0.8-60 degrees 

29H_TRUE = 17.30 

30G12_TRUE = 0.42 

31 

32PHASE_ANGLE = np.array([ 

33 0.8, 1.5, 2.3, 3.5, 5.0, 

34 7.5, 10.0, 13.0, 16.0, 20.0, 

35 25.0, 30.0, 38.0, 48.0, 60.0, 

36]) 

37RDIST = np.array([ 

38 2.33, 2.31, 2.29, 2.27, 2.25, 

39 2.23, 2.21, 2.20, 2.18, 2.16, 

40 2.14, 2.12, 2.10, 2.08, 2.06, 

41]) 

42TDIST = np.array([ 

43 1.42, 1.38, 1.34, 1.30, 1.26, 

44 1.23, 1.20, 1.18, 1.16, 1.15, 

45 1.14, 1.15, 1.18, 1.24, 1.33, 

46]) 

47MAG_EXPECTED = np.array([ 

48 20.03700074, 20.01538407, 19.99152984, 19.98201741, 19.97201618, 

49 20.00200255, 20.01813960, 20.07342182, 20.11401822, 20.19891977, 

50 20.30457878, 20.43928577, 20.69068120, 21.05570108, 21.53526150, 

51]) 

52MAG_SIGMA = np.full_like(MAG_EXPECTED, 0.03) 

53 

54 

55class TestHG12Model(lsst.utils.tests.TestCase): 

56 

57 def testHG12ModelEvaluation(self): 

58 """Verify HG12_model reproduces independently calculated 

59 apparent magnitudes. 

60 """ 

61 phase_rad = np.deg2rad(PHASE_ANGLE) 

62 reduced_mag = HG12_model(phase_rad, [H_TRUE, G12_TRUE]) 

63 dmag = 5.0 * np.log10(TDIST * RDIST) 

64 apparent = reduced_mag + dmag 

65 

66 np.testing.assert_allclose( 

67 apparent, MAG_EXPECTED, atol=1e-6, 

68 err_msg="HG12_model does not reproduce expected mags", 

69 ) 

70 

71 def testFitHG12FreeG12(self): 

72 """Fit noiseless data with free G12 and verify parameter 

73 recovery. 

74 """ 

75 result = fitHG12( 

76 MAG_EXPECTED, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

77 ) 

78 H_fit, G12_fit, H_err, G_err, HG_cov, chi2dof, nobs = result 

79 

80 self.assertAlmostEqual(H_fit, H_TRUE, places=4) 

81 self.assertAlmostEqual(G12_fit, G12_TRUE, places=4) 

82 self.assertTrue(np.isfinite(H_err) and H_err > 0) 

83 self.assertTrue(np.isfinite(G_err) and G_err > 0) 

84 self.assertLess(chi2dof, 1e-6) 

85 self.assertEqual(nobs, 15) 

86 

87 def testFitHG12FixedG12(self): 

88 """Fit with fixedG12 matching the true value.""" 

89 result = fitHG12( 

90 MAG_EXPECTED, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

91 fixedG12=G12_TRUE, 

92 ) 

93 H_fit, G12_fit, H_err, G_err, HG_cov, chi2dof, nobs = result 

94 

95 self.assertAlmostEqual(H_fit, H_TRUE, places=4) 

96 self.assertEqual(G12_fit, G12_TRUE) 

97 self.assertTrue(np.isfinite(H_err) and H_err > 0) 

98 self.assertTrue(np.isnan(G_err)) 

99 self.assertTrue(np.isnan(HG_cov)) 

100 self.assertLess(chi2dof, 1e-6) 

101 self.assertEqual(nobs, 15) 

102 

103 def testFitHG12FixedG12Wrong(self): 

104 """Fit with wrong fixedG12 — should succeed but with worse 

105 chi2 and shifted H. 

106 """ 

107 result_correct = fitHG12( 

108 MAG_EXPECTED, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

109 fixedG12=G12_TRUE, 

110 ) 

111 result_wrong = fitHG12( 

112 MAG_EXPECTED, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

113 fixedG12=0.8, 

114 ) 

115 chi2_correct = result_correct.chi2dof 

116 H_wrong = result_wrong.H 

117 chi2_wrong = result_wrong.chi2dof 

118 

119 # Wrong G12 should produce a noticeably different H 

120 self.assertGreater(abs(H_wrong - H_TRUE), 0.005) 

121 # Wrong G12 should produce larger chi2 

122 self.assertGreater(chi2_wrong, chi2_correct) 

123 

124 def testMagSigmaFloor(self): 

125 """Verify magSigmaFloor reduces chi2 and increases H_err.""" 

126 rng = np.random.RandomState(42) 

127 noisy_mag = MAG_EXPECTED + rng.normal(0, 0.03, len(MAG_EXPECTED)) 

128 

129 result_no_floor = fitHG12( 

130 noisy_mag, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

131 fixedG12=G12_TRUE, 

132 ) 

133 result_with_floor = fitHG12( 

134 noisy_mag, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

135 fixedG12=G12_TRUE, magSigmaFloor=0.05, 

136 ) 

137 chi2_no_floor = result_no_floor.chi2dof 

138 chi2_with_floor = result_with_floor.chi2dof 

139 herr_no_floor = result_no_floor.H_err 

140 herr_with_floor = result_with_floor.H_err 

141 

142 # Floor inflates errors → chi2 should decrease 

143 self.assertLess(chi2_with_floor, chi2_no_floor) 

144 # H_err should increase (or at least not decrease) with floor 

145 self.assertGreaterEqual(herr_with_floor, herr_no_floor * 0.99) 

146 

147 def testFitFailureSingleObs(self): 

148 """Single observation should fail gracefully.""" 

149 # Free fit with 1 obs and 2 params is underdetermined 

150 result = fitHG12( 

151 MAG_EXPECTED[:1], MAG_SIGMA[:1], PHASE_ANGLE[:1], 

152 TDIST[:1], RDIST[:1], fixedG12=0.5, 

153 ) 

154 # With fixedG12, 1 obs and 1 param gives 0 DOF — fit may 

155 # succeed but let's just check it doesn't crash. 

156 self.assertEqual(len(result), 7) 

157 

158 # With 0 observations, should return NaN 

159 result = fitHG12( 

160 np.array([]), np.array([]), np.array([]), 

161 np.array([]), np.array([]), 

162 ) 

163 H, G, H_err, G_err, HG_cov, chi2dof, nobs = result 

164 self.assertTrue(np.isnan(H)) 

165 self.assertEqual(nobs, 0) 

166 

167 def testFitWithNaNMagnitudes(self): 

168 """NaN magnitudes (from negative fluxes) should be filtered 

169 out, not crash the fit. 

170 """ 

171 # Insert NaN values into the test data 

172 mag = MAG_EXPECTED.copy() 

173 sigma = MAG_SIGMA.copy() 

174 phase = PHASE_ANGLE.copy() 

175 tdist = TDIST.copy() 

176 rdist = RDIST.copy() 

177 

178 # Inject 3 NaN magnitudes 

179 mag_nan = np.concatenate([mag, [np.nan, np.nan, np.nan]]) 

180 sig_nan = np.concatenate([sigma, [0.03, 0.03, 0.03]]) 

181 pa_nan = np.concatenate([phase, [10.0, 20.0, 30.0]]) 

182 td_nan = np.concatenate([tdist, [1.3, 1.3, 1.3]]) 

183 rd_nan = np.concatenate([rdist, [2.2, 2.2, 2.2]]) 

184 

185 result = fitHG12( 

186 mag_nan, sig_nan, pa_nan, td_nan, rd_nan, 

187 fixedG12=G12_TRUE, 

188 ) 

189 # Should succeed using the 15 valid observations 

190 self.assertAlmostEqual(result.H, H_TRUE, places=4) 

191 self.assertEqual(result.nobs, 15) 

192 

193 def testFitAllNaN(self): 

194 """All-NaN input should return failure, not crash.""" 

195 result = fitHG12( 

196 np.array([np.nan, np.nan]), 

197 np.array([0.03, 0.03]), 

198 np.array([10.0, 20.0]), 

199 np.array([1.3, 1.3]), 

200 np.array([2.2, 2.2]), 

201 ) 

202 self.assertTrue(np.isnan(result.H)) 

203 self.assertEqual(result.nobs, 0) 

204 

205 def testSigmaClipping(self): 

206 """Outliers should be rejected when nSigmaClip is set.""" 

207 # Start with clean noiseless data, add a few outliers 

208 mag = MAG_EXPECTED.copy() 

209 sigma = MAG_SIGMA.copy() 

210 

211 # Add 3 extreme outlier observations 

212 outlier_mag = np.array([25.0, 15.0, 30.0]) 

213 outlier_sigma = np.array([0.03, 0.03, 0.03]) 

214 outlier_phase = np.array([10.0, 20.0, 40.0]) 

215 outlier_tdist = np.array([1.3, 1.2, 1.25]) 

216 outlier_rdist = np.array([2.2, 2.18, 2.1]) 

217 

218 all_mag = np.concatenate([mag, outlier_mag]) 

219 all_sigma = np.concatenate([sigma, outlier_sigma]) 

220 all_phase = np.concatenate([PHASE_ANGLE, outlier_phase]) 

221 all_tdist = np.concatenate([TDIST, outlier_tdist]) 

222 all_rdist = np.concatenate([RDIST, outlier_rdist]) 

223 

224 # Without clipping: fit is pulled by outliers 

225 result_no_clip = fitHG12( 

226 all_mag, all_sigma, all_phase, all_tdist, all_rdist, 

227 fixedG12=G12_TRUE, 

228 ) 

229 

230 # With clipping: outliers removed, H should recover 

231 result_clip = fitHG12( 

232 all_mag, all_sigma, all_phase, all_tdist, all_rdist, 

233 fixedG12=G12_TRUE, nSigmaClip=3.0, 

234 ) 

235 

236 # Clipped result should be closer to true H 

237 self.assertLess( 

238 abs(result_clip.H - H_TRUE), 

239 abs(result_no_clip.H - H_TRUE), 

240 ) 

241 # Should have fewer observations after clipping 

242 self.assertLess(result_clip.nobs, 18) 

243 self.assertGreater(result_clip.nobs, 0) 

244 

245 def testSigmaClipNoOutliers(self): 

246 """Clean data with clipping should keep all points.""" 

247 result = fitHG12( 

248 MAG_EXPECTED, MAG_SIGMA, PHASE_ANGLE, TDIST, RDIST, 

249 fixedG12=G12_TRUE, nSigmaClip=3.0, 

250 ) 

251 self.assertAlmostEqual(result.H, H_TRUE, places=4) 

252 self.assertEqual(result.nobs, 15) 

253 

254 def testDistanceCorrection(self): 

255 """Same (H, G12, phase) at different distances should yield 

256 the same H. 

257 """ 

258 # Create a second set at 2x heliocentric distance 

259 rdist2 = RDIST * 2.0 

260 phase_rad = np.deg2rad(PHASE_ANGLE) 

261 reduced_mag = HG12_model(phase_rad, [H_TRUE, G12_TRUE]) 

262 mag2 = reduced_mag + 5.0 * np.log10(TDIST * rdist2) 

263 sigma2 = np.full_like(mag2, 0.03) 

264 

265 result = fitHG12( 

266 mag2, sigma2, PHASE_ANGLE, TDIST, rdist2, 

267 ) 

268 self.assertAlmostEqual(result.H, H_TRUE, places=4) 

269 

270 

271class MemoryTester(lsst.utils.tests.MemoryTestCase): 

272 pass 

273 

274 

275def setup_module(module): 

276 lsst.utils.tests.init() 

277 

278 

279if __name__ == "__main__": 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true

280 lsst.utils.tests.init() 

281 unittest.main()