Coverage for python / lsst / analysis / tools / actions / keyedData / stellarLocusFit.py: 10%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:09 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("StellarLocusFitAction",) 

25 

26from typing import cast 

27 

28import numpy as np 

29import scipy.odr as scipyODR 

30from lsst.pex.config import DictField 

31from lsst.pipe.base import Struct 

32 

33from ...interfaces import KeyedData, KeyedDataAction, KeyedDataSchema, Scalar, Vector 

34from ...math import sigmaMad 

35 

36 

37def _stellarLocusFit(xs, ys, mags, paramDict): 

38 """Make a fit to the stellar locus. 

39 

40 Parameters 

41 ---------- 

42 xs : `numpy.ndarray` [`float`] 

43 The color on the xaxis. 

44 ys : `numpy.ndarray` [`float`] 

45 The color on the yaxis. 

46 mags : `numpy.ndarray` [`float`] 

47 The magnitude of the reference band flux (in mag). 

48 paramDict : `dict` [`str`, `float`] 

49 A dictionary of parameters for line fitting: 

50 

51 ``"xMin"`` 

52 The minimum x edge of the box to use for initial fitting (`float`). 

53 ``"xMax"`` 

54 The maximum x edge of the box to use for initial fitting (`float`). 

55 ``"yMin"`` 

56 The minimum y edge of the box to use for initial fitting (`float`). 

57 ``"yMax"`` 

58 The maximum y edge of the box to use for initial fitting (`float`). 

59 ``"mHW"`` 

60 The hardwired gradient for the fit (`float`). 

61 ``"bHw"`` 

62 The hardwired intercept of the fit (`float`). 

63 ``"nSigmaToClip1"`` 

64 The number of sigma perpendicular to the fit to clip in the initial 

65 fitting loop (`float`). This should probably be stricter than the 

66 final iteration (i.e. nSigmaToClip1 < nSigmaToClip2). 

67 ``"nSigmaToClip2"`` 

68 The number of sigma perpendicular to the fit to clip in the final 

69 fitting loop (`float`). 

70 ``"minObjectForFit"`` 

71 Minimum number of objects surviving cuts to attempt fit. If not 

72 met, return NANs for values in ``fitParams`` (`int`). 

73 

74 Returns 

75 ------- 

76 fitParams : `dict` 

77 A dictionary of the calculated fit parameters: 

78 

79 ``"bPerpMin"`` 

80 The intercept of the perpendicular line that goes through xMin 

81 (`float`). 

82 ``"bPerpMax"`` 

83 The intercept of the perpendicular line that goes through xMax 

84 (`float`). 

85 ``"mODR"`` 

86 The gradient from the final round of fitting (`float`). 

87 ``"bODR"`` 

88 The intercept from the final round of fitting (`float`). 

89 ``"mPerp"`` 

90 The gradient of the line perpendicular to the line from the final 

91 fit (`float`). 

92 ``"fitPoints"`` 

93 A boolean list indicating which points were used in the final fit 

94 (`list` [`bool`]). 

95 

96 Notes 

97 ----- 

98 The code does two rounds of fitting, the first is initiated using the 

99 fixed values given in ``paramDict`` and is done using an Orthogonal 

100 Distance Regression (ODR) fit to the points defined by the box with limits 

101 defined by the keys: xMin, xMax, yMin, and yMax. Once this fitting has been 

102 done a perpendicular bisector is calculated at either end of the line and 

103 only points that fall within these lines are used to recalculate the fit. 

104 We also perform clipping of points perpendicular to the fit line that have 

105 distances that deviate more than nSigmaToClip1/2 (for an initial and final 

106 iteration) from the fit. 

107 """ 

108 fitParams = {} 

109 # Initial subselection of points to use for the fit 

110 # Check for nans/infs 

111 goodPoints = np.isfinite(xs) & np.isfinite(ys) & np.isfinite(mags) 

112 

113 fitPoints = ( 

114 goodPoints 

115 & (xs > paramDict["xMin"]) 

116 & (xs < paramDict["xMax"]) 

117 & (ys > paramDict["yMin"]) 

118 & (ys < paramDict["yMax"]) 

119 ) 

120 if sum(fitPoints) < paramDict["minObjectForFit"]: 

121 fitParams = _setFitParamsNans(fitParams, fitPoints, paramDict) 

122 return fitParams 

123 

124 linear = scipyODR.polynomial(1) 

125 

126 fitData = scipyODR.Data(xs[fitPoints], ys[fitPoints]) 

127 odr = scipyODR.ODR(fitData, linear, beta0=[paramDict["bFixed"], paramDict["mFixed"]]) 

128 params = odr.run() 

129 mODR0 = float(params.beta[1]) 

130 bODR0 = float(params.beta[0]) 

131 mPerp0 = -1.0 / mODR0 

132 

133 # Loop twice over the fit and include sigma clipping of points 

134 # perpendicular to the fit line (stricter on first iteration). 

135 for nSigmaToClip in [paramDict["nSigmaToClip1"], paramDict["nSigmaToClip2"]]: 

136 # Having found the initial fit calculate perpendicular ends. 

137 # When the gradient is really steep we need to use the 

138 # y limits of the fit line rather than the x ones. 

139 if np.abs(mODR0) > 1: 

140 yPerpMin = paramDict["yMin"] 

141 xPerpMin = (yPerpMin - bODR0) / mODR0 

142 yPerpMax = paramDict["yMax"] 

143 xPerpMax = (yPerpMax - bODR0) / mODR0 

144 else: 

145 yPerpMin = mODR0 * paramDict["xMin"] + bODR0 

146 xPerpMin = paramDict["xMin"] 

147 yPerpMax = mODR0 * paramDict["xMax"] + bODR0 

148 xPerpMax = paramDict["xMax"] 

149 

150 bPerpMin = yPerpMin - mPerp0 * xPerpMin 

151 bPerpMax = yPerpMax - mPerp0 * xPerpMax 

152 

153 # Use these perpendicular lines to choose the data and refit. 

154 fitPoints = (ys > mPerp0 * xs + bPerpMin) & (ys < mPerp0 * xs + bPerpMax) 

155 if sum(fitPoints) < paramDict["minObjectForFit"]: 

156 fitParams = _setFitParamsNans(fitParams, fitPoints, paramDict) 

157 return fitParams 

158 

159 # Compute two points along the line (making sure not to extrapolate 

160 # way off the plot limits, especially for the near vertical fits). 

161 if np.abs(mODR0) > 1: 

162 p1 = np.array([1.0, mODR0 + bODR0]) 

163 p2 = np.array([(1.0 - bODR0) / mODR0, 1.0]) 

164 else: 

165 p1 = np.array([0, bODR0]) 

166 p2 = np.array([-bODR0 / mODR0, 0]) 

167 if np.abs(sum(p1 - p2)) < 1e-12: # p1 and p2 must be different. 

168 if np.abs(mODR0) > 1: 

169 p2 = np.array([(1.5 - bODR0) / mODR0, 1.5]) 

170 else: 

171 p2 = np.array([(1.0 - bODR0) / mODR0, 1.0]) 

172 

173 # Sigma clip points based on perpendicular distance (in mmag) to 

174 # current fit. 

175 fitDists = np.array(perpDistance(p1, p2, zip(xs[fitPoints], ys[fitPoints]))) * 1000 

176 clippedStats = calcQuartileClippedStats(fitDists, nSigmaToClip=nSigmaToClip) 

177 allDists = np.array(perpDistance(p1, p2, zip(xs, ys))) * 1000 

178 keep = np.abs(allDists) <= clippedStats.clipValue 

179 fitPoints &= keep 

180 if sum(fitPoints) < paramDict["minObjectForFit"]: 

181 fitParams = _setFitParamsNans(fitParams, fitPoints, paramDict) 

182 return fitParams 

183 fitData = scipyODR.Data(xs[fitPoints], ys[fitPoints]) 

184 odr = scipyODR.ODR(fitData, linear, beta0=[bODR0, mODR0]) 

185 params = odr.run() 

186 mODR0 = params.beta[1] 

187 bODR0 = params.beta[0] 

188 

189 fitParams["bPerpMin"] = bPerpMin 

190 fitParams["bPerpMax"] = bPerpMax 

191 

192 fitParams["mODR"] = float(params.beta[1]) 

193 fitParams["bODR"] = float(params.beta[0]) 

194 

195 fitParams["mPerp"] = -1.0 / fitParams["mODR"] 

196 fitParams["goodPoints"] = goodPoints 

197 fitParams["fitPoints"] = fitPoints 

198 fitParams["paramDict"] = paramDict 

199 

200 return fitParams 

201 

202 

203def _setFitParamsNans(fitParams, fitPoints, paramDict): 

204 fitParams["bPerpMin"] = np.nan 

205 fitParams["bPerpMax"] = np.nan 

206 fitParams["mODR"] = np.nan 

207 fitParams["bODR"] = np.nan 

208 fitParams["mPerp"] = np.nan 

209 fitParams["goodPoints"] = np.nan 

210 fitParams["fitPoints"] = fitPoints 

211 fitParams["paramDict"] = paramDict 

212 return fitParams 

213 

214 

215def perpDistance(p1, p2, points): 

216 """Calculate the perpendicular distance to a line from a point. 

217 

218 Parameters 

219 ---------- 

220 p1 : `numpy.ndarray` [`float`] 

221 A point on the line. 

222 p2 : `numpy.ndarray` [`float`] 

223 Another point on the line. 

224 points : `zip` [(`float`, `float`)] 

225 The points to calculate the distance to. 

226 

227 Returns 

228 ------- 

229 dists : `numpy.ndarray` [`float`] 

230 The distances from the line to the points. Uses the cross 

231 product to work this out. 

232 """ 

233 if sum(p2 - p1) == 0: 

234 raise ValueError(f"Must supply two different points for p1, p2. Got {p1}, {p2}") 

235 points = list(points) 

236 if len(points) == 0: 

237 raise ValueError("Must provied a non-empty zip() list of points.") 

238 

239 # Recommendation from numpy docs for 2d cross product. 

240 def cross2d(x, y): 

241 return x[..., 0] * y[..., 1] - x[..., 1] * y[..., 0] 

242 

243 dists = cross2d(p2 - p1, points - p1) / np.linalg.norm(p2 - p1) 

244 

245 return dists 

246 

247 

248def calcQuartileClippedStats(dataArray, nSigmaToClip=3.0): 

249 """Calculate the quartile-based clipped statistics of a data array. 

250 

251 The difference between quartiles[2] and quartiles[0] is the interquartile 

252 distance. 0.74*interquartileDistance is an estimate of standard deviation 

253 so, in the case that ``dataArray`` has an approximately Gaussian 

254 distribution, this is equivalent to nSigma clipping. 

255 

256 Parameters 

257 ---------- 

258 dataArray : `list` or `numpy.ndarray` [`float`] 

259 List or array containing the values for which the quartile-based 

260 clipped statistics are to be calculated. 

261 nSigmaToClip : `float`, optional 

262 Number of \"sigma\" outside of which to clip data when computing the 

263 statistics. 

264 

265 Returns 

266 ------- 

267 result : `lsst.pipe.base.Struct` 

268 The quartile-based clipped statistics with ``nSigmaToClip`` clipping. 

269 Atributes are: 

270 

271 ``median`` 

272 The median of the full ``dataArray`` (`float`). 

273 ``mean`` 

274 The quartile-based clipped mean (`float`). 

275 ``stdDev`` 

276 The quartile-based clipped standard deviation (`float`). 

277 ``rms`` 

278 The quartile-based clipped root-mean-squared (`float`). 

279 ``clipValue`` 

280 The value outside of which to clip the data before computing the 

281 statistics (`float`). 

282 ``goodArray`` 

283 A boolean array indicating which data points in ``dataArray`` were 

284 used in the calculation of the statistics, where `False` indicates 

285 a clipped datapoint (`numpy.ndarray` of `bool`). 

286 """ 

287 quartiles = np.percentile(dataArray, [25, 50, 75]) 

288 assert len(quartiles) == 3 

289 median = quartiles[1] 

290 interQuartileDistance = quartiles[2] - quartiles[0] 

291 clipValue = nSigmaToClip * 0.74 * interQuartileDistance 

292 good = np.abs(dataArray - median) <= clipValue 

293 quartileClippedMean = dataArray[good].mean() 

294 quartileClippedStdDev = dataArray[good].std() 

295 quartileClippedRms = np.sqrt(np.mean(dataArray[good] ** 2)) 

296 

297 return Struct( 

298 median=median, 

299 mean=quartileClippedMean, 

300 stdDev=quartileClippedStdDev, 

301 rms=quartileClippedRms, 

302 clipValue=clipValue, 

303 goodArray=good, 

304 ) 

305 

306 

307class StellarLocusFitAction(KeyedDataAction): 

308 r"""Determine Stellar Locus fit parameters from given input `Vector`\ s.""" 

309 

310 stellarLocusFitDict = DictField[str, float]( 

311 doc="The parameters to use for the stellar locus fit. For xMin/Max, yMin/Max, " 

312 "and m/bFixed, the default parameters are examples and are not generally useful " 

313 "for any of the fits, so should be updated in the PlotAction definition in the " 

314 "atools directory. The dict needs to contain xMin/xMax/yMin/yMax which are the " 

315 "limits of the initial point selection box for fitting the stellar locus, mFixed " 

316 "and bFixed are meant to represent the intercept and gradient of a canonical fit " 

317 "for a given dataset (and should be derived from data). They are used here as an " 

318 "initial guess for the fitting. nSigmaToClip1/2 set the number of sigma to clip " 

319 "perpendicular the fit in the first and second fit iterations after the initial " 

320 "guess and point selection fit. minObjectForFit sets a minimum number of points " 

321 "deemed suitable for inclusion in the fit in order to bother attempting the fit.", 

322 default={ 

323 "xMin": 0.1, 

324 "xMax": 0.2, 

325 "yMin": 0.1, 

326 "yMax": 0.2, 

327 "mHW": 0.5, 

328 "bHW": 0.0, 

329 "nSigmaToClip1": 3.5, 

330 "nSigmaToClip2": 5.0, 

331 "minObjectForFit": 7, 

332 }, 

333 ) 

334 

335 def getInputSchema(self) -> KeyedDataSchema: 

336 return (("x", Vector), ("y", Vector)) 

337 

338 def getOutputSchema(self) -> KeyedDataSchema: 

339 value = ( 

340 (f"{self.identity or ''}_sigmaMAD", Scalar), 

341 (f"{self.identity or ''}_median", Scalar), 

342 ) 

343 return value 

344 

345 def __call__(self, data: KeyedData, **kwargs) -> KeyedData: 

346 xs = cast(Vector, data["x"]) 

347 ys = cast(Vector, data["y"]) 

348 mags = cast(Vector, data["mag"]) 

349 

350 fitParams = _stellarLocusFit(xs, ys, mags, self.stellarLocusFitDict) 

351 # Bail out if there were not enough points to fit. 

352 for value in fitParams.values(): 

353 if isinstance(value, float): 

354 if np.isnan(value): 

355 fitParams[f"{self.identity or ''}_sigmaMAD"] = np.nan 

356 fitParams[f"{self.identity or ''}_median"] = np.nan 

357 return fitParams 

358 fitPoints = fitParams["fitPoints"] 

359 

360 if np.fabs(self.stellarLocusFitDict["mFixed"]) > 1: 

361 ysFitLineFixed = np.array([self.stellarLocusFitDict["yMin"], self.stellarLocusFitDict["yMax"]]) 

362 xsFitLineFixed = (ysFitLineFixed - self.stellarLocusFitDict["bFixed"]) / self.stellarLocusFitDict[ 

363 "mFixed" 

364 ] 

365 ysFitLine = np.array([self.stellarLocusFitDict["yMin"], self.stellarLocusFitDict["yMax"]]) 

366 xsFitLine = (ysFitLine - fitParams["bODR"]) / fitParams["mODR"] 

367 

368 else: 

369 xsFitLineFixed = np.array([self.stellarLocusFitDict["xMin"], self.stellarLocusFitDict["xMax"]]) 

370 ysFitLineFixed = ( 

371 self.stellarLocusFitDict["mFixed"] * xsFitLineFixed + self.stellarLocusFitDict["bFixed"] 

372 ) 

373 xsFitLine = [self.stellarLocusFitDict["xMin"], self.stellarLocusFitDict["xMax"]] 

374 ysFitLine = np.array( 

375 [ 

376 fitParams["mODR"] * xsFitLine[0] + fitParams["bODR"], 

377 fitParams["mODR"] * xsFitLine[1] + fitParams["bODR"], 

378 ] 

379 ) 

380 

381 # Calculate the distances to that line. 

382 # Need two points to characterize the lines we want to get the 

383 # distances to. 

384 p1 = np.array([xsFitLine[0], ysFitLine[0]]) 

385 p2 = np.array([xsFitLine[1], ysFitLine[1]]) 

386 

387 # Convert this to mmag. 

388 dists = np.array(perpDistance(p1, p2, zip(xs[fitPoints], ys[fitPoints]))) * 1000 

389 

390 # Now we have the information for the perpendicular line we 

391 # can use it to calculate the points at the ends of the 

392 # perpendicular lines that intersect at the box edges. 

393 if np.fabs(self.stellarLocusFitDict["mFixed"]) > 1: 

394 xMid = (self.stellarLocusFitDict["yMin"] - fitParams["bODR"]) / fitParams["mODR"] 

395 xs = np.array([xMid - 0.5, xMid, xMid + 0.5]) 

396 ys = fitParams["mPerp"] * xs + fitParams["bPerpMin"] 

397 else: 

398 xs = np.array( 

399 [ 

400 self.stellarLocusFitDict["xMin"] - 0.2, 

401 self.stellarLocusFitDict["xMin"], 

402 self.stellarLocusFitDict["xMin"] + 0.2, 

403 ] 

404 ) 

405 ys = xs * fitParams["mPerp"] + fitParams["bPerpMin"] 

406 

407 if np.fabs(self.stellarLocusFitDict["mFixed"]) > 1: 

408 xMid = (self.stellarLocusFitDict["yMax"] - fitParams["bODR"]) / fitParams["mODR"] 

409 xs = np.array([xMid - 0.5, xMid, xMid + 0.5]) 

410 ys = fitParams["mPerp"] * xs + fitParams["bPerpMax"] 

411 else: 

412 xs = np.array( 

413 [ 

414 self.stellarLocusFitDict["xMax"] - 0.2, 

415 self.stellarLocusFitDict["xMax"], 

416 self.stellarLocusFitDict["xMax"] + 0.2, 

417 ] 

418 ) 

419 ys = xs * fitParams["mPerp"] + fitParams["bPerpMax"] 

420 

421 fit_sigma, fit_med = (sigmaMad(dists), np.median(dists)) if len(dists) else (np.nan, np.nan) 

422 fitParams[f"{self.identity or ''}_sigmaMAD"] = fit_sigma 

423 fitParams[f"{self.identity or ''}_median"] = fit_med 

424 

425 return fitParams