Coverage for python / lsst / analysis / tools / actions / scalar / scalarActions.py: 34%

223 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:32 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

# Public API of this module: one configurable scalar-valued action per
# statistic. Keep this tuple in sync with the class definitions below.
__all__ = (
    "MedianAction",
    "MeanAction",
    "StdevAction",
    "ValueAction",
    "SigmaMadAction",
    "CountAction",
    "CountUniqueAction",
    "ApproxFloor",
    "FracThreshold",
    "MaxAction",
    "MinAction",
    "FracInRange",
    "FracNan",
    "SumAction",
    "MedianHistAction",
    "IqrHistAction",
    "DivideScalar",
    "RmsAction",
    "MedianGradientAction",
)

45 

46import logging 

47import operator 

48from math import nan 

49from typing import cast 

50 

51import numpy as np 

52from scipy.optimize import curve_fit 

53from scipy.stats import binned_statistic 

54 

55from lsst.pex.config import ChoiceField, Field 

56from lsst.pex.config.configurableActions import ConfigurableActionField 

57 

58from ...interfaces import KeyedData, KeyedDataSchema, Scalar, ScalarAction, Vector 

59from ...math import nanMax, nanMean, nanMedian, nanMin, nanSigmaMad, nanStd 

60 

61log = logging.getLogger(__name__) 

62 

63 

64def _dataToArray(data): 

65 """Convert input data into a numpy array using the appropriate 

66 protocol. `np.from_dlpack` is used for Tensor-like arrays 

67 where possible. 

68 """ 

69 try: 

70 return np.from_dlpack(data) 

71 except (AttributeError, BufferError): 

72 return np.array(data) 

73 

74 

class ScalarFromVectorAction(ScalarAction):
    """Calculates a statistic from a single vector."""

    # Key naming the input vector in the KeyedData mapping; subclasses format
    # it with ``**kwargs`` (e.g. band substitution) before lookup.
    vectorKey = Field[str]("Key of Vector to compute statistic from.")

    def getInputSchema(self) -> KeyedDataSchema:
        # Declare the single vector input this action consumes.
        return ((self.vectorKey, Vector),)

82 

83 

class MedianAction(ScalarFromVectorAction):
    """Compute the NaN-ignoring median of the masked input vector.

    Returns NaN when the selection is empty.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        selection = self.getMask(**kwargs)
        vec = _dataToArray(data[self.vectorKey.format(**kwargs)])[selection]
        if vec.size == 0:
            return np.nan
        return nanMedian(vec)

93 

94 

class MeanAction(ScalarFromVectorAction):
    """Compute the NaN-ignoring mean of the masked input vector.

    Returns NaN when the selection is empty.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        selection = self.getMask(**kwargs)
        vec = _dataToArray(data[self.vectorKey.format(**kwargs)])[selection]
        if vec.size == 0:
            return np.nan
        return nanMean(vec)

104 

105 

class StdevAction(ScalarFromVectorAction):
    """Compute the NaN-ignoring standard deviation of the masked input
    vector."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        vec = _dataToArray(data[key])[self.getMask(**kwargs)]
        return nanStd(vec)

112 

113 

class RmsAction(ScalarFromVectorAction):
    """Calculates the root mean square of the given data (without subtracting
    the mean as in StdevAction).

    Returns NaN when the masked selection is empty or all-NaN, consistent
    with MedianAction and MeanAction.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        vector = vector[~np.isnan(vector)]

        # Explicit empty guard: np.mean of an empty array emits a
        # "mean of empty slice" RuntimeWarning before yielding nan; return
        # nan directly like the other scalar actions do.
        if vector.size == 0:
            return np.nan

        return np.sqrt(np.mean(vector**2))

124 

125 

class ValueAction(ScalarFromVectorAction):
    """Extracts the first value from a vector."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        # No mask is applied here; the raw vector's first element is taken.
        key = self.vectorKey.format(**kwargs)
        first = data[key][0]
        return cast(Scalar, float(first))

131 

132 

class SigmaMadAction(ScalarFromVectorAction):
    """Compute the sigma-MAD (median absolute deviation scaled to sigma)
    of the masked input vector."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        vec = _dataToArray(data[key])[self.getMask(**kwargs)]
        return nanSigmaMad(vec)

140 

141 

class CountAction(ScalarAction):
    """Performs count actions, with threshold-based filtering.
    The operator is specified as a string, for example, "lt", "le", "ge",
    "gt", "ne", and "eq" for the mathematical operations <, <=, >=, >, !=,
    and == respectively. To count non-NaN values, only pass the column name
    as vector key. To count NaN values, pass threshold = nan (from math.nan).
    Optionally to configure from a YAML file, pass "threshold: !!float nan".
    To compute the number of elements with values less than a given threshold,
    use op="le".
    """

    vectorKey = Field[str]("Key of Vector to count")
    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "ne": "not equal to a given value",
            "eq": "equal to a given value",
            "gt": "greater than threshold",
        },
        default="ne",
    )
    threshold = Field[float](doc="Threshold to apply.", default=nan)

    def getInputSchema(self) -> KeyedDataSchema:
        return ((self.vectorKey, Vector),)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        arr = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]

        # Count NaNs and non-NaNs.
        # NOTE: NaN never compares equal to anything (IEEE 754), so the
        # NaN-counting mode must be detected with isnan; the previous
        # ``self.threshold == nan`` test was always False, making op="eq"
        # with a NaN threshold silently return 0 instead of the NaN count.
        if np.isnan(self.threshold):
            if self.op == "eq":
                # Count number of NaNs
                result = np.isnan(arr).sum()
                return cast(Scalar, int(result))
            elif self.op == "ne":
                # Count number of non-NaNs
                result = arr.size - np.isnan(arr).sum()
                return cast(Scalar, int(result))
            else:
                raise ValueError("Invalid operator for counting NaNs.")
        # Count for given threshold ignoring all NaNs
        else:
            result = arr[~np.isnan(arr)]
            result = cast(
                Scalar,
                int(np.sum(getattr(operator, self.op)(result, self.threshold))),
            )
            return result

195 

196 

class CountUniqueAction(ScalarFromVectorAction):
    """Counts the number of unique rows in a given column."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        vec = _dataToArray(data[key])[self.getMask(**kwargs)]
        n_unique = len(np.unique(vec))
        return cast(Scalar, n_unique)

205 

206 

class ApproxFloor(ScalarFromVectorAction):
    """Return the NaN-ignoring median of the largest tenth of the sorted
    input values.

    Notes
    -----
    The values are sorted ascending and the top ``size // 10`` entries are
    taken (``values[-x:]``), so this estimates the upper end ("floor" in the
    survey-depth sense, e.g. faintest magnitudes) of the distribution.
    When fewer than 10 values are present, ``x`` is 0 and ``values[-0:]``
    is the whole array, i.e. a plain nan-median. Returns NaN for empty
    input.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = np.sort(_dataToArray(data[self.vectorKey.format(**kwargs)])[mask], axis=None)  # type: ignore
        if values.size == 0:
            # Consistent with MedianAction: empty selection yields NaN.
            return np.nan
        x = values.size // 10
        return nanMedian(values[-x:])

215 

216 

class FracThreshold(ScalarFromVectorAction):
    """Compute the fraction of a distribution above or below a threshold.

    The operator is specified as a string, for example,
    "lt", "le", "ge", "gt" for the mathematical operations <, <=, >=, >. To
    compute the fraction of elements with values less than a given threshold,
    use op="le".
    """

    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "gt": "greater than threshold",
        },
    )
    threshold = Field[float](doc="Threshold to apply.")
    percent = Field[bool](doc="Express result as percentage", default=False)
    relative_to_median = Field[bool](doc="Calculate threshold relative to the median?", default=False)
    use_absolute_value = Field[bool](
        doc=(
            "Calculate threshold after taking absolute value. If relative_to_median"
            " is true the absolute value will be applied after the median is subtracted"
        ),
        default=False,
    )

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        vec = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        vec = vec[~np.isnan(vec)]
        total = vec.size
        if total == 0:
            return np.nan
        # Optionally recentre on the median so the threshold is interpreted
        # as an offset from the median rather than an absolute value.
        if self.relative_to_median and total > 0:
            center = nanMedian(vec)
            if np.isfinite(center):
                vec = vec - center
        if self.use_absolute_value:
            vec = np.abs(vec)
        compare = getattr(operator, self.op)
        fraction = cast(
            Scalar,
            float(np.sum(compare(vec, self.threshold)) / total),  # type: ignore
        )
        return 100.0 * fraction if self.percent else fraction

269 

270 

class MaxAction(ScalarFromVectorAction):
    """Return the NaN-ignoring maximum of the masked input vector."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        vec = _dataToArray(data[key])[self.getMask(**kwargs)]
        return nanMax(vec)

277 

278 

class MinAction(ScalarFromVectorAction):
    """Return the NaN-ignoring minimum of the masked input vector."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        vec = _dataToArray(data[key])[self.getMask(**kwargs)]
        return nanMin(vec)

285 

286 

class FracInRange(ScalarFromVectorAction):
    """Compute the fraction of a distribution that is between specified
    minimum and maximum values, and is not NaN.

    The denominator is the full masked selection (including NaNs); the
    numerator is the count of finite values in ``[minimum, maximum)``.
    Returns NaN for an empty selection.
    """

    maximum = Field[float](doc="The maximum value", default=np.nextafter(np.inf, 0.0))
    minimum = Field[float](doc="The minimum value", default=np.nextafter(-np.inf, 0.0))
    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        # Guard against ZeroDivisionError on an empty selection; return NaN
        # like FracThreshold does.
        if nvalues == 0:
            return np.nan
        values = values[np.logical_not(np.isnan(values))]
        # Half-open interval: >= minimum but strictly < maximum.
        sel_range = (values >= self.minimum) & (values < self.maximum)
        result = cast(
            Scalar,
            float(values[sel_range].size / nvalues),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result

310 

311 

class FracNan(ScalarFromVectorAction):
    """Compute the fraction of vector entries that are NaN.

    Returns NaN for an empty selection.
    """

    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        # Guard against ZeroDivisionError on an empty selection; return NaN
        # like FracThreshold does.
        if nvalues == 0:
            return np.nan
        values = values[np.isnan(values)]
        result = cast(
            Scalar,
            float(values.size / nvalues),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result

330 

331 

class SumAction(ScalarFromVectorAction):
    """Return the NaN-ignoring sum of all values in the column.

    An empty or all-NaN selection sums to 0.0 (numpy nansum semantics).
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        vec = _dataToArray(data[key])[self.getMask(**kwargs)]
        total = np.nansum(vec)
        return cast(Scalar, total)

339 

340 

class MedianHistAction(ScalarAction):
    """Calculates the median of the given histogram data."""

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        return (
            (self.histKey, Vector),
            (self.midKey, Vector),
        )

    def histMedian(self, hist, bin_mid):
        """Calculates the median of a histogram with binned values

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array
        bin_mid : `numpy.ndarray`
            Bin midpoints array

        Returns
        -------
        median : `float`
            Median of histogram with binned values
        """
        # The median falls in the first bin whose cumulative count reaches
        # half the total; report that bin's midpoint.
        csum = np.cumsum(hist)
        half_index = np.searchsorted(csum, csum[-1] / 2)
        return bin_mid[half_index]

    def __call__(self, data: KeyedData, **kwargs):
        freq = _dataToArray(data[self.histKey.format(**kwargs)])
        if freq.size == 0:
            return np.nan
        centers = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histMedian(freq, centers)))

381 

382 

class IqrHistAction(ScalarAction):
    """Calculates the interquartile range of the given histogram data."""

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        return (
            (self.histKey, Vector),
            (self.midKey, Vector),
        )

    def histIqr(self, hist, bin_mid):
        """Calculates the interquartile range of a histogram with binned values

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array
        bin_mid : `numpy.ndarray`
            Bin midpoints array

        Returns
        -------
        iqr : `float`
            Inter-quartile range of histogram with binned values
        """
        # Locate the bins holding the 25th and 75th percentiles of the
        # cumulative counts and difference their midpoints.
        csum = np.cumsum(hist)
        lower_index = np.searchsorted(csum, csum[-1] / 4)
        upper_index = np.searchsorted(csum, (3 / 4) * csum[-1])
        return bin_mid[upper_index] - bin_mid[lower_index]

    def __call__(self, data: KeyedData, **kwargs):
        freq = _dataToArray(data[self.histKey.format(**kwargs)])
        if freq.size == 0:
            return np.nan
        centers = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histIqr(freq, centers)))

426 

427 

class DivideScalar(ScalarAction):
    """Calculate (A/B) for scalars."""

    actionA = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar A")
    actionB = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar B")

    def getInputSchema(self) -> KeyedDataSchema:
        yield from self.actionA.getInputSchema()
        yield from self.actionB.getInputSchema()

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the result of A/B.

        Parameters
        ----------
        data : `KeyedData`

        Returns
        -------
        result : `Scalar`
            The result of dividing A by B.
        """
        numerator = self.actionA(data, **kwargs)
        denominator = self.actionB(data, **kwargs)
        # Common case first: a nonzero denominator divides normally.
        if denominator != 0:
            return numerator / denominator
        if numerator == 0:
            log.warning("Both numerator and denominator are zero! Returning NaN.")
            return np.nan
        value = np.sign(numerator) * np.inf
        log.warning("Non-zero scalar divided by zero! Returning %f.", value)
        return value

462 

463 

class MedianGradientAction(ScalarAction):
    """Calculate the gradient of a running median"""

    lowerBinLimit = Field[float](doc="Percentile of data to start the bining at", default=5.0)
    upperBinLimit = Field[float](doc="Percentile of data to end the binning at", default=95.0)
    nBins = Field[int](doc="Number of bins to use for running median", default=50)
    xsVectorKey = Field[str]("Key of Vector that gives the x location of the points.")
    ysVectorKey = Field[str]("Key of the Vector to compute the statistic from.")

    def getInputSchema(self) -> KeyedDataSchema:
        return ((self.xsVectorKey, Vector), (self.ysVectorKey, Vector))

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the gradient of the running median.

        Parameters
        ----------
        data : `KeyedData`

        Returns
        -------
        result : `Scalar`
            The gradient of the running median, or NaN when there are too
            few usable points or too few populated bins to fit a line.
        """

        xs = _dataToArray(data[self.xsVectorKey.format(**kwargs)])
        ys = _dataToArray(data[self.ysVectorKey.format(**kwargs)])

        # Clip to the central percentile range to suppress outliers at the
        # distribution's tails.
        lowerLim = np.nanpercentile(xs, self.lowerBinLimit)
        upperLim = np.nanpercentile(xs, self.upperBinLimit)

        use = (xs > lowerLim) & (xs < upperLim) & np.isfinite(xs) & np.isfinite(ys)

        if np.sum(use) > 2:
            meds, binEdges, _ = binned_statistic(xs[use], ys[use], statistic="median", bins=self.nBins)
        else:
            return np.nan

        def func(x, m, b):
            return m * x + b

        finiteMeds = np.isfinite(meds)
        # Empty bins produce NaN medians; curve_fit needs at least as many
        # points as fit parameters (2) and raises otherwise, so bail out to
        # NaN when too few bins are populated.
        if np.sum(finiteMeds) < 2:
            return np.nan
        popt, _ = curve_fit(func, binEdges[:-1][finiteMeds], meds[finiteMeds])

        # popt = (slope, intercept); the slope is the running-median gradient.
        return popt[0]