Coverage for python / lsst / analysis / tools / actions / keyedData / stellarLocusFit.py: 10%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:55 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ("StellarLocusFitAction",) 

25 

26from typing import cast 

27 

28import numpy as np 

29import scipy.odr as scipyODR 

30 

31from lsst.pex.config import DictField 

32from lsst.pipe.base import Struct 

33 

34from ...interfaces import KeyedData, KeyedDataAction, KeyedDataSchema, Scalar, Vector 

35from ...math import sigmaMad 

36 

37 

38def _stellarLocusFit(xs, ys, mags, paramDict): 

39 """Make a fit to the stellar locus. 

40 

41 Parameters 

42 ---------- 

43 xs : `numpy.ndarray` [`float`] 

44 The color on the xaxis. 

45 ys : `numpy.ndarray` [`float`] 

46 The color on the yaxis. 

47 mags : `numpy.ndarray` [`float`] 

48 The magnitude of the reference band flux (in mag). 

49 paramDict : `dict` [`str`, `float`] 

50 A dictionary of parameters for line fitting: 

51 

52 ``"xMin"`` 

53 The minimum x edge of the box to use for initial fitting (`float`). 

54 ``"xMax"`` 

55 The maximum x edge of the box to use for initial fitting (`float`). 

56 ``"yMin"`` 

57 The minimum y edge of the box to use for initial fitting (`float`). 

58 ``"yMax"`` 

59 The maximum y edge of the box to use for initial fitting (`float`). 

60 ``"mHW"`` 

61 The hardwired gradient for the fit (`float`). 

62 ``"bHw"`` 

63 The hardwired intercept of the fit (`float`). 

64 ``"nSigmaToClip1"`` 

65 The number of sigma perpendicular to the fit to clip in the initial 

66 fitting loop (`float`). This should probably be stricter than the 

67 final iteration (i.e. nSigmaToClip1 < nSigmaToClip2). 

68 ``"nSigmaToClip2"`` 

69 The number of sigma perpendicular to the fit to clip in the final 

70 fitting loop (`float`). 

71 ``"minObjectForFit"`` 

72 Minimum number of objects surviving cuts to attempt fit. If not 

73 met, return NANs for values in ``fitParams`` (`int`). 

74 

75 Returns 

76 ------- 

77 fitParams : `dict` 

78 A dictionary of the calculated fit parameters: 

79 

80 ``"bPerpMin"`` 

81 The intercept of the perpendicular line that goes through xMin 

82 (`float`). 

83 ``"bPerpMax"`` 

84 The intercept of the perpendicular line that goes through xMax 

85 (`float`). 

86 ``"mODR"`` 

87 The gradient from the final round of fitting (`float`). 

88 ``"bODR"`` 

89 The intercept from the final round of fitting (`float`). 

90 ``"mPerp"`` 

91 The gradient of the line perpendicular to the line from the final 

92 fit (`float`). 

93 ``"fitPoints"`` 

94 A boolean list indicating which points were used in the final fit 

95 (`list` [`bool`]). 

96 

97 Notes 

98 ----- 

99 The code does two rounds of fitting, the first is initiated using the 

100 fixed values given in ``paramDict`` and is done using an Orthogonal 

101 Distance Regression (ODR) fit to the points defined by the box with limits 

102 defined by the keys: xMin, xMax, yMin, and yMax. Once this fitting has been 

103 done a perpendicular bisector is calculated at either end of the line and 

104 only points that fall within these lines are used to recalculate the fit. 

105 We also perform clipping of points perpendicular to the fit line that have 

106 distances that deviate more than nSigmaToClip1/2 (for an initial and final 

107 iteration) from the fit. 

108 """ 

109 fitParams = {} 

110 # Initial subselection of points to use for the fit 

111 # Check for nans/infs 

112 goodPoints = np.isfinite(xs) & np.isfinite(ys) & np.isfinite(mags) 

113 

114 fitPoints = ( 

115 goodPoints 

116 & (xs > paramDict["xMin"]) 

117 & (xs < paramDict["xMax"]) 

118 & (ys > paramDict["yMin"]) 

119 & (ys < paramDict["yMax"]) 

120 ) 

121 if sum(fitPoints) < paramDict["minObjectForFit"]: 

122 fitParams = _setFitParamsNans(fitParams, fitPoints, paramDict) 

123 return fitParams 

124 

125 linear = scipyODR.polynomial(1) 

126 

127 fitData = scipyODR.Data(xs[fitPoints], ys[fitPoints]) 

128 odr = scipyODR.ODR(fitData, linear, beta0=[paramDict["bFixed"], paramDict["mFixed"]]) 

129 params = odr.run() 

130 mODR0 = float(params.beta[1]) 

131 bODR0 = float(params.beta[0]) 

132 mPerp0 = -1.0 / mODR0 

133 

134 # Loop twice over the fit and include sigma clipping of points 

135 # perpendicular to the fit line (stricter on first iteration). 

136 for nSigmaToClip in [paramDict["nSigmaToClip1"], paramDict["nSigmaToClip2"]]: 

137 # Having found the initial fit calculate perpendicular ends. 

138 # When the gradient is really steep we need to use the 

139 # y limits of the fit line rather than the x ones. 

140 if np.abs(mODR0) > 1: 

141 yPerpMin = paramDict["yMin"] 

142 xPerpMin = (yPerpMin - bODR0) / mODR0 

143 yPerpMax = paramDict["yMax"] 

144 xPerpMax = (yPerpMax - bODR0) / mODR0 

145 else: 

146 yPerpMin = mODR0 * paramDict["xMin"] + bODR0 

147 xPerpMin = paramDict["xMin"] 

148 yPerpMax = mODR0 * paramDict["xMax"] + bODR0 

149 xPerpMax = paramDict["xMax"] 

150 

151 bPerpMin = yPerpMin - mPerp0 * xPerpMin 

152 bPerpMax = yPerpMax - mPerp0 * xPerpMax 

153 

154 # Use these perpendicular lines to choose the data and refit. 

155 fitPoints = (ys > mPerp0 * xs + bPerpMin) & (ys < mPerp0 * xs + bPerpMax) 

156 if sum(fitPoints) < paramDict["minObjectForFit"]: 

157 fitParams = _setFitParamsNans(fitParams, fitPoints, paramDict) 

158 return fitParams 

159 

160 # Compute two points along the line (making sure not to extrapolate 

161 # way off the plot limits, especially for the near vertical fits). 

162 if np.abs(mODR0) > 1: 

163 p1 = np.array([1.0, mODR0 + bODR0]) 

164 p2 = np.array([(1.0 - bODR0) / mODR0, 1.0]) 

165 else: 

166 p1 = np.array([0, bODR0]) 

167 p2 = np.array([-bODR0 / mODR0, 0]) 

168 if np.abs(sum(p1 - p2)) < 1e-12: # p1 and p2 must be different. 

169 if np.abs(mODR0) > 1: 

170 p2 = np.array([(1.5 - bODR0) / mODR0, 1.5]) 

171 else: 

172 p2 = np.array([(1.0 - bODR0) / mODR0, 1.0]) 

173 

174 # Sigma clip points based on perpendicular distance (in mmag) to 

175 # current fit. 

176 fitDists = np.array(perpDistance(p1, p2, zip(xs[fitPoints], ys[fitPoints]))) * 1000 

177 clippedStats = calcQuartileClippedStats(fitDists, nSigmaToClip=nSigmaToClip) 

178 allDists = np.array(perpDistance(p1, p2, zip(xs, ys))) * 1000 

179 keep = np.abs(allDists) <= clippedStats.clipValue 

180 fitPoints &= keep 

181 if sum(fitPoints) < paramDict["minObjectForFit"]: 

182 fitParams = _setFitParamsNans(fitParams, fitPoints, paramDict) 

183 return fitParams 

184 fitData = scipyODR.Data(xs[fitPoints], ys[fitPoints]) 

185 odr = scipyODR.ODR(fitData, linear, beta0=[bODR0, mODR0]) 

186 params = odr.run() 

187 mODR0 = params.beta[1] 

188 bODR0 = params.beta[0] 

189 

190 fitParams["bPerpMin"] = bPerpMin 

191 fitParams["bPerpMax"] = bPerpMax 

192 

193 fitParams["mODR"] = float(params.beta[1]) 

194 fitParams["bODR"] = float(params.beta[0]) 

195 

196 fitParams["mPerp"] = -1.0 / fitParams["mODR"] 

197 fitParams["goodPoints"] = goodPoints 

198 fitParams["fitPoints"] = fitPoints 

199 fitParams["paramDict"] = paramDict 

200 

201 return fitParams 

202 

203 

204def _setFitParamsNans(fitParams, fitPoints, paramDict): 

205 fitParams["bPerpMin"] = np.nan 

206 fitParams["bPerpMax"] = np.nan 

207 fitParams["mODR"] = np.nan 

208 fitParams["bODR"] = np.nan 

209 fitParams["mPerp"] = np.nan 

210 fitParams["goodPoints"] = np.nan 

211 fitParams["fitPoints"] = fitPoints 

212 fitParams["paramDict"] = paramDict 

213 return fitParams 

214 

215 

216def perpDistance(p1, p2, points): 

217 """Calculate the perpendicular distance to a line from a point. 

218 

219 Parameters 

220 ---------- 

221 p1 : `numpy.ndarray` [`float`] 

222 A point on the line. 

223 p2 : `numpy.ndarray` [`float`] 

224 Another point on the line. 

225 points : `zip` [(`float`, `float`)] 

226 The points to calculate the distance to. 

227 

228 Returns 

229 ------- 

230 dists : `numpy.ndarray` [`float`] 

231 The distances from the line to the points. Uses the cross 

232 product to work this out. 

233 """ 

234 if sum(p2 - p1) == 0: 

235 raise ValueError(f"Must supply two different points for p1, p2. Got {p1}, {p2}") 

236 points = list(points) 

237 if len(points) == 0: 

238 raise ValueError("Must provied a non-empty zip() list of points.") 

239 

240 # Recommendation from numpy docs for 2d cross product. 

241 def cross2d(x, y): 

242 return x[..., 0] * y[..., 1] - x[..., 1] * y[..., 0] 

243 

244 dists = cross2d(p2 - p1, points - p1) / np.linalg.norm(p2 - p1) 

245 

246 return dists 

247 

248 

249def calcQuartileClippedStats(dataArray, nSigmaToClip=3.0): 

250 """Calculate the quartile-based clipped statistics of a data array. 

251 

252 The difference between quartiles[2] and quartiles[0] is the interquartile 

253 distance. 0.74*interquartileDistance is an estimate of standard deviation 

254 so, in the case that ``dataArray`` has an approximately Gaussian 

255 distribution, this is equivalent to nSigma clipping. 

256 

257 Parameters 

258 ---------- 

259 dataArray : `list` or `numpy.ndarray` [`float`] 

260 List or array containing the values for which the quartile-based 

261 clipped statistics are to be calculated. 

262 nSigmaToClip : `float`, optional 

263 Number of \"sigma\" outside of which to clip data when computing the 

264 statistics. 

265 

266 Returns 

267 ------- 

268 result : `lsst.pipe.base.Struct` 

269 The quartile-based clipped statistics with ``nSigmaToClip`` clipping. 

270 Atributes are: 

271 

272 ``median`` 

273 The median of the full ``dataArray`` (`float`). 

274 ``mean`` 

275 The quartile-based clipped mean (`float`). 

276 ``stdDev`` 

277 The quartile-based clipped standard deviation (`float`). 

278 ``rms`` 

279 The quartile-based clipped root-mean-squared (`float`). 

280 ``clipValue`` 

281 The value outside of which to clip the data before computing the 

282 statistics (`float`). 

283 ``goodArray`` 

284 A boolean array indicating which data points in ``dataArray`` were 

285 used in the calculation of the statistics, where `False` indicates 

286 a clipped datapoint (`numpy.ndarray` of `bool`). 

287 """ 

288 quartiles = np.percentile(dataArray, [25, 50, 75]) 

289 assert len(quartiles) == 3 

290 median = quartiles[1] 

291 interQuartileDistance = quartiles[2] - quartiles[0] 

292 clipValue = nSigmaToClip * 0.74 * interQuartileDistance 

293 good = np.abs(dataArray - median) <= clipValue 

294 quartileClippedMean = dataArray[good].mean() 

295 quartileClippedStdDev = dataArray[good].std() 

296 quartileClippedRms = np.sqrt(np.mean(dataArray[good] ** 2)) 

297 

298 return Struct( 

299 median=median, 

300 mean=quartileClippedMean, 

301 stdDev=quartileClippedStdDev, 

302 rms=quartileClippedRms, 

303 clipValue=clipValue, 

304 goodArray=good, 

305 ) 

306 

307 

308class StellarLocusFitAction(KeyedDataAction): 

309 r"""Determine Stellar Locus fit parameters from given input `Vector`\ s.""" 

310 

311 stellarLocusFitDict = DictField[str, float]( 

312 doc="The parameters to use for the stellar locus fit. For xMin/Max, yMin/Max, " 

313 "and m/bFixed, the default parameters are examples and are not generally useful " 

314 "for any of the fits, so should be updated in the PlotAction definition in the " 

315 "atools directory. The dict needs to contain xMin/xMax/yMin/yMax which are the " 

316 "limits of the initial point selection box for fitting the stellar locus, mFixed " 

317 "and bFixed are meant to represent the intercept and gradient of a canonical fit " 

318 "for a given dataset (and should be derived from data). They are used here as an " 

319 "initial guess for the fitting. nSigmaToClip1/2 set the number of sigma to clip " 

320 "perpendicular the fit in the first and second fit iterations after the initial " 

321 "guess and point selection fit. minObjectForFit sets a minimum number of points " 

322 "deemed suitable for inclusion in the fit in order to bother attempting the fit.", 

323 default={ 

324 "xMin": 0.1, 

325 "xMax": 0.2, 

326 "yMin": 0.1, 

327 "yMax": 0.2, 

328 "mHW": 0.5, 

329 "bHW": 0.0, 

330 "nSigmaToClip1": 3.5, 

331 "nSigmaToClip2": 5.0, 

332 "minObjectForFit": 7, 

333 }, 

334 ) 

335 

336 def getInputSchema(self) -> KeyedDataSchema: 

337 return (("x", Vector), ("y", Vector)) 

338 

339 def getOutputSchema(self) -> KeyedDataSchema: 

340 value = ( 

341 (f"{self.identity or ''}_sigmaMAD", Scalar), 

342 (f"{self.identity or ''}_median", Scalar), 

343 ) 

344 return value 

345 

346 def __call__(self, data: KeyedData, **kwargs) -> KeyedData: 

347 xs = cast(Vector, data["x"]) 

348 ys = cast(Vector, data["y"]) 

349 mags = cast(Vector, data["mag"]) 

350 

351 fitParams = _stellarLocusFit(xs, ys, mags, self.stellarLocusFitDict) 

352 # Bail out if there were not enough points to fit. 

353 for value in fitParams.values(): 

354 if isinstance(value, float): 

355 if np.isnan(value): 

356 fitParams[f"{self.identity or ''}_sigmaMAD"] = np.nan 

357 fitParams[f"{self.identity or ''}_median"] = np.nan 

358 return fitParams 

359 fitPoints = fitParams["fitPoints"] 

360 

361 if np.fabs(self.stellarLocusFitDict["mFixed"]) > 1: 

362 ysFitLineFixed = np.array([self.stellarLocusFitDict["yMin"], self.stellarLocusFitDict["yMax"]]) 

363 xsFitLineFixed = (ysFitLineFixed - self.stellarLocusFitDict["bFixed"]) / self.stellarLocusFitDict[ 

364 "mFixed" 

365 ] 

366 ysFitLine = np.array([self.stellarLocusFitDict["yMin"], self.stellarLocusFitDict["yMax"]]) 

367 xsFitLine = (ysFitLine - fitParams["bODR"]) / fitParams["mODR"] 

368 

369 else: 

370 xsFitLineFixed = np.array([self.stellarLocusFitDict["xMin"], self.stellarLocusFitDict["xMax"]]) 

371 ysFitLineFixed = ( 

372 self.stellarLocusFitDict["mFixed"] * xsFitLineFixed + self.stellarLocusFitDict["bFixed"] 

373 ) 

374 xsFitLine = [self.stellarLocusFitDict["xMin"], self.stellarLocusFitDict["xMax"]] 

375 ysFitLine = np.array( 

376 [ 

377 fitParams["mODR"] * xsFitLine[0] + fitParams["bODR"], 

378 fitParams["mODR"] * xsFitLine[1] + fitParams["bODR"], 

379 ] 

380 ) 

381 

382 # Calculate the distances to that line. 

383 # Need two points to characterize the lines we want to get the 

384 # distances to. 

385 p1 = np.array([xsFitLine[0], ysFitLine[0]]) 

386 p2 = np.array([xsFitLine[1], ysFitLine[1]]) 

387 

388 # Convert this to mmag. 

389 dists = np.array(perpDistance(p1, p2, zip(xs[fitPoints], ys[fitPoints]))) * 1000 

390 

391 # Now we have the information for the perpendicular line we 

392 # can use it to calculate the points at the ends of the 

393 # perpendicular lines that intersect at the box edges. 

394 if np.fabs(self.stellarLocusFitDict["mFixed"]) > 1: 

395 xMid = (self.stellarLocusFitDict["yMin"] - fitParams["bODR"]) / fitParams["mODR"] 

396 xs = np.array([xMid - 0.5, xMid, xMid + 0.5]) 

397 ys = fitParams["mPerp"] * xs + fitParams["bPerpMin"] 

398 else: 

399 xs = np.array( 

400 [ 

401 self.stellarLocusFitDict["xMin"] - 0.2, 

402 self.stellarLocusFitDict["xMin"], 

403 self.stellarLocusFitDict["xMin"] + 0.2, 

404 ] 

405 ) 

406 ys = xs * fitParams["mPerp"] + fitParams["bPerpMin"] 

407 

408 if np.fabs(self.stellarLocusFitDict["mFixed"]) > 1: 

409 xMid = (self.stellarLocusFitDict["yMax"] - fitParams["bODR"]) / fitParams["mODR"] 

410 xs = np.array([xMid - 0.5, xMid, xMid + 0.5]) 

411 ys = fitParams["mPerp"] * xs + fitParams["bPerpMax"] 

412 else: 

413 xs = np.array( 

414 [ 

415 self.stellarLocusFitDict["xMax"] - 0.2, 

416 self.stellarLocusFitDict["xMax"], 

417 self.stellarLocusFitDict["xMax"] + 0.2, 

418 ] 

419 ) 

420 ys = xs * fitParams["mPerp"] + fitParams["bPerpMax"] 

421 

422 fit_sigma, fit_med = (sigmaMad(dists), np.median(dists)) if len(dists) else (np.nan, np.nan) 

423 fitParams[f"{self.identity or ''}_sigmaMAD"] = fit_sigma 

424 fitParams[f"{self.identity or ''}_median"] = fit_med 

425 

426 return fitParams