Coverage for python / lsst / pipe / tasks / ssp / photfit.py: 18%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 08:30 +0000

1import numpy as np 

2from collections import namedtuple 

3from scipy.interpolate import CubicSpline 

4from scipy.optimize import leastsq, least_squares 

5import warnings 

6 

7HG12FitResult = namedtuple( 

8 "HG12FitResult", 

9 ["H", "G12", "H_err", "G12_err", "HG_cov", "chi2dof", "nobs"], 

10) 

11 

12# Constants 

13 

14A = [3.332, 1.862] 

15B = [0.631, 1.218] 

16C = [0.986, 0.238] 

17 

18# values taken from sbpy for convenience 

19 

20alpha_12 = np.deg2rad([7.5, 30.0, 60, 90, 120, 150]) 

21 

22phi_1_sp = [7.5e-1, 3.3486016e-1, 1.3410560e-1, 5.1104756e-2, 2.1465687e-2, 3.6396989e-3] 

23phi_1_derivs = [-1.9098593, -9.1328612e-2] 

24 

25phi_2_sp = [9.25e-1, 6.2884169e-1, 3.1755495e-1, 1.2716367e-1, 2.2373903e-2, 1.6505689e-4] 

26phi_2_derivs = [-5.7295780e-1, -8.6573138e-8] 

27 

28alpha_3 = np.deg2rad([0.0, 0.3, 1.0, 2.0, 4.0, 8.0, 12.0, 20.0, 30.0]) 

29 

30phi_3_sp = [ 

31 1.0, 

32 8.3381185e-1, 

33 5.7735424e-1, 

34 4.2144772e-1, 

35 2.3174230e-1, 

36 1.0348178e-1, 

37 6.1733473e-2, 

38 1.6107006e-2, 

39 0.0, 

40] 

41 

42phi_3_derivs = [-1.0630097, 0] 

43 

44 

45phi_1 = CubicSpline(alpha_12, phi_1_sp, bc_type=((1, phi_1_derivs[0]), (1, phi_1_derivs[1]))) 

46phi_2 = CubicSpline(alpha_12, phi_2_sp, bc_type=((1, phi_2_derivs[0]), (1, phi_2_derivs[1]))) 

47phi_3 = CubicSpline(alpha_3, phi_3_sp, bc_type=((1, phi_3_derivs[0]), (1, phi_3_derivs[1]))) 

48 

49 

50def HG_model(phase, params): 

51 sin_a = np.sin(phase) 

52 tan_ah = np.tan(phase / 2) 

53 

54 W = np.exp(-90.56 * tan_ah * tan_ah) 

55 scale_sina = sin_a / (0.119 + 1.341 * sin_a - 0.754 * sin_a * sin_a) 

56 

57 phi_1_S = 1 - C[0] * scale_sina 

58 phi_2_S = 1 - C[1] * scale_sina 

59 

60 phi_1_L = np.exp(-A[0] * np.power(tan_ah, B[0])) 

61 phi_2_L = np.exp(-A[1] * np.power(tan_ah, B[1])) 

62 

63 phi_1 = W * phi_1_S + (1 - W) * phi_1_L 

64 phi_2 = W * phi_2_S + (1 - W) * phi_2_L 

65 return params[0] - 2.5 * np.log10((1 - params[1]) * phi_1 + (params[1]) * phi_2) 

66 

67 

68def HG1G2_model(phase, params): 

69 phi_1_ev = phi_1(phase) 

70 phi_2_ev = phi_2(phase) 

71 phi_3_ev = phi_3(phase) 

72 

73 msk = phase < 7.5 * np.pi / 180 

74 

75 phi_1_ev[msk] = 1 - 6 * phase[msk] / np.pi 

76 phi_2_ev[msk] = 1 - 9 * phase[msk] / (5 * np.pi) 

77 

78 phi_3_ev[phase > np.pi / 6] = 0 

79 

80 return params[0] - 2.5 * np.log10( 

81 params[1] * phi_1_ev + params[2] * phi_2_ev + (1 - params[1] - params[2]) * phi_3_ev 

82 ) 

83 

84 

85def HG12_model(phase, params): 

86 if params[1] >= 0.2: 

87 G1 = +0.9529 * params[1] + 0.02162 

88 G2 = -0.6125 * params[1] + 0.5572 

89 else: 

90 G1 = +0.7527 * params[1] + 0.06164 

91 G2 = -0.9612 * params[1] + 0.6270 

92 

93 return HG1G2_model(phase, [params[0], G1, G2]) 

94 

95 

96def HG12star_model(phase, params): 

97 G1 = 0 + params[1] * 0.84293649 

98 G2 = 0.53513350 - params[1] * 0.53513350 

99 

100 return HG1G2_model(phase, [params[0], G1, G2]) 

101 

102 

103def chi2(params, mag, phase, mag_err, model): 

104 pred = model(phase, params) 

105 return (mag - pred) / mag_err 

106 

107 

108def fit(mag, phase, sigma, model=HG12_model, params=[0.1]): 

109 phase = np.deg2rad(phase) 

110 

111 sol = leastsq(chi2, [mag[0]] + params, (mag, phase, sigma, model), full_output=True) 

112 

113 return sol 

114 

115 

116def fitHG12( 

117 mag, magSigma, phaseAngle, tdist, rdist, 

118 fixedG12=None, magSigmaFloor=0.0, nSigmaClip=None, 

119): 

120 """Fit the HG12 phase curve model (Muinonen et al. 2010). 

121 

122 Fits absolute magnitude H (and optionally the slope parameter 

123 G12) to apparent magnitude observations at known phase angles 

124 and distances. 

125 

126 Parameters 

127 ---------- 

128 mag : array_like 

129 Apparent magnitudes. 

130 magSigma : array_like 

131 Magnitude uncertainties (1-sigma). 

132 phaseAngle : array_like 

133 Phase angles in degrees. 

134 tdist : array_like 

135 Topocentric (observer-target) distances in AU. 

136 rdist : array_like 

137 Heliocentric (sun-target) distances in AU. 

138 fixedG12 : float or None, optional 

139 If set, fix G12 to this value and only fit H. 

140 If None (default), both H and G12 are fit. 

141 magSigmaFloor : float, optional 

142 Systematic error floor (mag) added in quadrature to 

143 ``magSigma`` before fitting. Default is 0.0. 

144 nSigmaClip : float or None, optional 

145 If set, perform outlier rejection: an initial robust fit 

146 (soft_l1 loss) followed by sigma clipping at this 

147 threshold, then a final linear least-squares refit on the 

148 clipped data. If None (default), no clipping is performed. 

149 

150 Returns 

151 ------- 

152 result : `HG12FitResult` 

153 Named tuple with fields: 

154 

155 ``H`` 

156 Best-fit absolute magnitude. 

157 ``G12`` 

158 Best-fit (or fixed) slope parameter. 

159 ``H_err`` 

160 Uncertainty on H from the covariance matrix. 

161 ``G12_err`` 

162 Uncertainty on G12 (NaN if ``fixedG12`` is set). 

163 ``HG_cov`` 

164 H-G12 covariance (NaN if ``fixedG12`` is set). 

165 ``chi2dof`` 

166 Reduced chi-squared of the fit. 

167 ``nobs`` 

168 Number of observations used (after clipping). 

169 

170 On failure, all float fields are NaN and ``nobs`` is 0. 

171 """ 

172 nobsv = len(mag) 

173 

174 if nobsv == 0: 

175 return HG12FitResult(*(np.nan,) * 6, nobs=0) 

176 

177 # ensure these are plain ndarrays 

178 (mag, magSigma, phaseAngle, tdist, rdist) = map( 

179 np.asarray, (mag, magSigma, phaseAngle, tdist, rdist) 

180 ) 

181 

182 # add systematic error floor in quadrature 

183 if magSigmaFloor > 0: 

184 magSigma = np.sqrt(magSigma**2 + magSigmaFloor**2) 

185 

186 # filter to finite magnitudes and positive errors 

187 good = ( 

188 np.isfinite(mag) & np.isfinite(magSigma) 

189 & (magSigma > 0) 

190 ) 

191 mag = mag[good] 

192 magSigma = magSigma[good] 

193 phaseAngle = phaseAngle[good] 

194 tdist = tdist[good] 

195 rdist = rdist[good] 

196 nobsv = len(mag) 

197 

198 if nobsv == 0: 

199 return HG12FitResult(*(np.nan,) * 6, nobs=0) 

200 

201 # correct the mag to 1AU distance 

202 dmag = -5.0 * np.log10(tdist * rdist) 

203 mag = mag + dmag 

204 

205 if fixedG12 is not None: 

206 def model(phase, params): 

207 return HG12_model(phase, [params[0], fixedG12]) 

208 

209 nparams = 1 

210 else: 

211 model = HG12_model 

212 nparams = 2 

213 

214 phase_rad = np.deg2rad(phaseAngle) 

215 x0 = np.array( 

216 [mag[0]] + ([] if fixedG12 is not None else [0.1]) 

217 ) 

218 

219 def residuals(params): 

220 return (mag - model(phase_rad, params)) / magSigma 

221 

222 # fit, suppressing warnings 

223 with warnings.catch_warnings(): 

224 warnings.simplefilter("ignore") 

225 

226 if nSigmaClip is not None and nobsv > nparams + 1: 

227 # Stage 1: robust fit with soft_l1 loss 

228 sol_robust = least_squares( 

229 residuals, x0, loss='soft_l1', f_scale=1.0, 

230 ) 

231 if not sol_robust.success: 

232 return HG12FitResult(*(np.nan,) * 6, nobs=0) 

233 

234 # Sigma clipping on residuals from robust fit 

235 resid = residuals(sol_robust.x) 

236 keep = np.abs(resid) < nSigmaClip 

237 mag = mag[keep] 

238 magSigma = magSigma[keep] 

239 phase_rad = phase_rad[keep] 

240 nobsv = len(mag) 

241 

242 if nobsv <= nparams: 

243 return HG12FitResult(*(np.nan,) * 6, nobs=0) 

244 

245 # Redefine residuals for clipped data 

246 def residuals(params): 

247 return ( 

248 (mag - model(phase_rad, params)) / magSigma 

249 ) 

250 

251 x0 = sol_robust.x 

252 

253 # Final fit (linear loss for proper chi2/covariance) 

254 sol = least_squares(residuals, x0, loss='linear') 

255 

256 if not sol.success: 

257 return HG12FitResult(*(np.nan,) * 6, nobs=0) 

258 

259 # Extract results 

260 chi2_total = np.sum(sol.fun ** 2) 

261 

262 # Covariance from Jacobian: cov = inv(J^T J) 

263 J = sol.jac 

264 try: 

265 cov = np.linalg.inv(J.T @ J) 

266 except np.linalg.LinAlgError: 

267 return HG12FitResult(*(np.nan,) * 6, nobs=0) 

268 

269 H = sol.x[0] 

270 H_err = np.sqrt(cov[0, 0]) 

271 

272 if fixedG12 is not None: 

273 G = fixedG12 

274 G_err = np.nan 

275 HG_cov = np.nan 

276 else: 

277 G = sol.x[1] 

278 G_err = np.sqrt(cov[1, 1]) 

279 HG_cov = cov[0, 1] 

280 

281 return HG12FitResult( 

282 H=H, G12=G, H_err=H_err, G12_err=G_err, 

283 HG_cov=HG_cov, 

284 chi2dof=chi2_total / (nobsv - nparams), 

285 nobs=nobsv, 

286 ) 

287 

288 

289#################### 

290 

291 

292def phase_angle_deg(r_obj_sun, r_obs_sun): 

293 """ 

294 Compute phase angle (Sun–Object–Observer) in degrees. 

295 

296 Parameters 

297 ---------- 

298 r_obj_sun : array, shape (3,) or (3, N) 

299 Object position vector wrt Sun (Sun → object). 

300 r_obs_sun : array, shape (3,) or (3, N) 

301 Observer position vector wrt Sun (Sun → observer). 

302 

303 Returns 

304 ------- 

305 float or ndarray 

306 Phase angle(s) in degrees, in [0, 180]. 

307 """ 

308 r_obj_sun = np.asarray(r_obj_sun) 

309 r_obs_sun = np.asarray(r_obs_sun) 

310 

311 # Vectors at the object 

312 v_sun = -r_obj_sun # object → Sun 

313 v_obs = r_obs_sun - r_obj_sun # object → observer 

314 

315 # Dot products and norms along axis 0 

316 dot = np.sum(v_sun * v_obs, axis=0) 

317 norm_sun = np.linalg.norm(v_sun, axis=0) 

318 norm_obs = np.linalg.norm(v_obs, axis=0) 

319 

320 cosang = dot / (norm_sun * norm_obs) 

321 cosang = np.clip(cosang, -1.0, 1.0) 

322 

323 return np.degrees(np.arccos(cosang)) 

324 

325 

326def hg_V_mag(H, G, r, delta, phase_deg): 

327 """ 

328 Compute apparent V magnitude from the IAU H–G system. 

329 

330 Parameters 

331 ---------- 

332 H : float or ndarray 

333 Absolute magnitude (V-band). 

334 G : float or ndarray 

335 Slope parameter. 

336 r : float or ndarray 

337 Heliocentric distance in AU. 

338 delta : float or ndarray 

339 Observer distance (Δ) in AU. 

340 phase_deg : float or ndarray 

341 Phase angle in degrees. 

342 """ 

343 a = np.radians(phase_deg) / 2.0 

344 

345 # Phase functions 

346 phi1 = np.exp(-3.33 * np.tan(a) ** 0.63) 

347 phi2 = np.exp(-1.87 * np.tan(a) ** 1.22) 

348 

349 phi = (1 - G) * phi1 + G * phi2 

350 

351 return H + 5 * np.log10(r * delta) - 2.5 * np.log10(phi)