Coverage for python / lsst / analysis / tools / actions / scalar / scalarActions.py: 33%

199 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:53 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

# Public names exported by this module; each entry is a scalar action class
# defined below.
__all__ = (
    "MedianAction",
    "MeanAction",
    "StdevAction",
    "ValueAction",
    "SigmaMadAction",
    "CountAction",
    "CountUniqueAction",
    "ApproxFloor",
    "FracThreshold",
    "MaxAction",
    "MinAction",
    "FracInRange",
    "FracNan",
    "SumAction",
    "MedianHistAction",
    "IqrHistAction",
    "DivideScalar",
    "RmsAction",
)

44 

45import logging 

46import operator 

47from math import nan 

48from typing import cast 

49 

50import numpy as np 

51from lsst.pex.config import ChoiceField, Field 

52from lsst.pex.config.configurableActions import ConfigurableActionField 

53 

54from ...interfaces import KeyedData, KeyedDataSchema, Scalar, ScalarAction, Vector 

55from ...math import nanMax, nanMean, nanMedian, nanMin, nanSigmaMad, nanStd 

56 

# Module-level logger; DivideScalar uses it to warn about division by zero.
log = logging.getLogger(__name__)

58 

59 

60def _dataToArray(data): 

61 """Convert input data into a numpy array using the appropriate 

62 protocol. `np.from_dlpack` is used for Tensor-like arrays 

63 where possible. 

64 """ 

65 try: 

66 return np.from_dlpack(data) 

67 except (AttributeError, BufferError): 

68 return np.array(data) 

69 

70 

class ScalarFromVectorAction(ScalarAction):
    """Base class for actions that compute a statistic from a single vector.

    Subclasses implement ``__call__``; this class only supplies the
    ``vectorKey`` configuration field and the matching input schema.
    """

    vectorKey = Field[str]("Key of Vector to compute statistic from.")

    def getInputSchema(self) -> KeyedDataSchema:
        """Return the one-entry schema naming the configured vector."""
        entry = (self.vectorKey, Vector)
        return (entry,)

78 

79 

class MedianAction(ScalarFromVectorAction):
    """Calculates the median of the given data.

    The median is computed with ``nanMedian`` over the masked vector. NaN is
    returned when the masked vector has no elements.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        # Guard the empty case explicitly rather than passing it to nanMedian.
        if selected.size:
            return nanMedian(selected)
        return np.nan

89 

90 

class MeanAction(ScalarFromVectorAction):
    """Calculates the mean of the given data.

    The mean is computed with ``nanMean`` over the masked vector. NaN is
    returned when the masked vector has no elements.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        # Guard the empty case explicitly rather than passing it to nanMean.
        if selected.size:
            return nanMean(selected)
        return np.nan

100 

101 

class StdevAction(ScalarFromVectorAction):
    """Calculates the standard deviation of the given data.

    The masked vector is passed directly to ``nanStd``; no separate handling
    of an empty vector is done here.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        return nanStd(selected)

108 

109 

class RmsAction(ScalarFromVectorAction):
    """Calculates the root mean square of the given data.

    Unlike `StdevAction`, the mean is not subtracted first. NaN entries are
    dropped from the masked vector before the statistic is computed.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        not_nan = selected[~np.isnan(selected)]
        mean_square = np.mean(not_nan**2)
        return np.sqrt(mean_square)

120 

121 

class ValueAction(ScalarFromVectorAction):
    """Extracts the first value from a vector.

    The element at index 0 is converted to ``float``. No mask is applied.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = data[self.vectorKey.format(**kwargs)]
        first = float(vector[0])
        return cast(Scalar, first)

127 

128 

class SigmaMadAction(ScalarFromVectorAction):
    """Calculates the sigma mad of the given data.

    The masked vector is passed directly to ``nanSigmaMad``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        return nanSigmaMad(selected)

136 

137 

class CountAction(ScalarAction):
    """Performs count actions, with threshold-based filtering.

    The operator is specified as a string, for example, "lt", "le", "ge",
    "gt", "ne", and "eq" for the mathematical operations <, <=, >=, >, !=,
    and == respectively. To count non-NaN values, only pass the column name
    as vector key. To count NaN values, pass threshold = nan (from math.nan)
    together with op="eq". Optionally to configure from a YAML file, pass
    "threshold: !!float nan". To compute the number of elements with values
    less than or equal to a given threshold, use op="le".

    Raises
    ------
    ValueError
        Raised by ``__call__`` if the threshold is NaN and the operator is
        anything other than "eq" or "ne".
    """

    vectorKey = Field[str]("Key of Vector to count")
    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "ne": "not equal to a given value",
            "eq": "equal to a given value",
            "gt": "greater than threshold",
        },
        default="ne",
    )
    threshold = Field[float](doc="Threshold to apply.", default=nan)

    def getInputSchema(self) -> KeyedDataSchema:
        """Return the one-entry schema naming the configured vector."""
        return ((self.vectorKey, Vector),)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Count the masked vector entries that satisfy the configured test.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that contains the vector stored under ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and passed to ``getMask``.

        Returns
        -------
        count : `Scalar`
            Number of matching entries, as an `int`.
        """
        mask = self.getMask(**kwargs)
        arr = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        is_nan = np.isnan(arr)

        # NaN never compares equal to anything, itself included, so an
        # equality test against nan cannot detect a NaN threshold; isnan can.
        if np.isnan(self.threshold):
            if self.op == "eq":
                # Count number of NaNs
                return cast(Scalar, int(is_nan.sum()))
            elif self.op == "ne":
                # Count number of non-NaNs
                return cast(Scalar, int(arr.size - is_nan.sum()))
            else:
                raise ValueError("Invalid operator for counting NaNs.")

        # Count for given threshold ignoring all NaNs
        compare = getattr(operator, self.op)
        return cast(Scalar, int(np.sum(compare(arr[~is_nan], self.threshold))))

191 

192 

class CountUniqueAction(ScalarFromVectorAction):
    """Counts the number of unique rows in a given column.

    The count is the size of `numpy.unique` applied to the masked vector.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        distinct = np.unique(selected)
        return cast(Scalar, distinct.size)

201 

202 

class ApproxFloor(ScalarFromVectorAction):
    """Returns the median of the tail of the sorted input.

    The masked vector is flattened and sorted in ascending order, and
    ``nanMedian`` is applied to its last ``size // 10`` elements. When the
    vector has fewer than ten elements that count is zero, and the slice then
    covers the whole vector.

    NOTE(review): this action was previously described as returning "the
    median of the lowest ten values of the sorted input", but the slice is
    taken from the end of an ascending sort and its length is a tenth of the
    input size. Confirm which behaviour is intended.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        ordered = np.sort(_dataToArray(data[key])[mask], axis=None)  # type: ignore
        tail_length = ordered.size // 10
        # A tail length of zero gives the slice ``[-0:]``, i.e. every element.
        tail = ordered[-tail_length:]
        return nanMedian(tail)

211 

212 

class FracThreshold(ScalarFromVectorAction):
    """Compute the fraction of a distribution above or below a threshold.

    The operator is specified as a string, for example,
    "lt", "le", "ge", "gt" for the mathematical operations <, <=, >=, >. To
    compute the fraction of elements with values less than or equal to a
    given threshold, use op="le".

    NaN entries are removed before anything else is computed, so the
    fraction is relative to the number of non-NaN entries. NaN is returned
    when no non-NaN entries remain.
    """

    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "gt": "greater than threshold",
        },
    )
    threshold = Field[float](doc="Threshold to apply.")
    percent = Field[bool](doc="Express result as percentage", default=False)
    relative_to_median = Field[bool](doc="Calculate threshold relative to the median?", default=False)
    use_absolute_value = Field[bool](
        doc=(
            "Calculate threshold after taking absolute value. If relative_to_median"
            " is true the absolute value will be applied after the median is subtracted"
        ),
        default=False,
    )

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the fraction of non-NaN entries that pass the threshold test.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that contains the vector stored under ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and passed to ``getMask``.

        Returns
        -------
        result : `Scalar`
            Fraction of entries passing the test, multiplied by 100 when
            ``percent`` is set; NaN if there are no non-NaN entries.
        """
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        # Boolean indexing yields a new array, so later in-place arithmetic
        # cannot touch the caller's data.
        values = values[np.logical_not(np.isnan(values))]
        n_values = values.size
        if n_values == 0:
            return np.nan
        # If relative_to_median is set, subtract the median from the values;
        # this is equivalent to testing against median + threshold.
        if self.relative_to_median:
            offset = nanMedian(values)
            if np.isfinite(offset):
                # In-place subtraction of a float from an integer array
                # raises a casting error, so promote such arrays first.
                if not np.issubdtype(values.dtype, np.floating):
                    values = values.astype(float)
                values -= offset
        if self.use_absolute_value:
            values = np.abs(values)
        compare = getattr(operator, self.op)
        result = cast(
            Scalar,
            float(np.sum(compare(values, self.threshold)) / n_values),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result

265 

266 

class MaxAction(ScalarFromVectorAction):
    """Returns the maximum of the given data.

    The masked vector is passed directly to ``nanMax``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        return nanMax(selected)

273 

274 

class MinAction(ScalarFromVectorAction):
    """Returns the minimum of the given data.

    The masked vector is passed directly to ``nanMin``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        return nanMin(selected)

281 

282 

class FracInRange(ScalarFromVectorAction):
    """Compute the fraction of a distribution that is between specified
    minimum and maximum values, and is not NaN.

    The range is half-open: an entry counts when
    ``minimum <= value < maximum``. The denominator is the size of the
    masked vector including NaN entries. NaN is returned when the masked
    vector is empty.
    """

    maximum = Field[float](doc="The maximum value", default=np.nextafter(np.inf, 0.0))
    minimum = Field[float](doc="The minimum value", default=np.nextafter(-np.inf, 0.0))
    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the fraction of entries inside ``[minimum, maximum)``.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that contains the vector stored under ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and passed to ``getMask``.

        Returns
        -------
        result : `Scalar`
            In-range fraction, multiplied by 100 when ``percent`` is set;
            NaN if the masked vector is empty.
        """
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        if nvalues == 0:
            # Nothing to take a fraction of; avoid dividing by zero.
            return np.nan
        values = values[np.logical_not(np.isnan(values))]
        sel_range = (values >= self.minimum) & (values < self.maximum)
        result = cast(
            Scalar,
            float(values[sel_range].size / nvalues),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result

306 

307 

class FracNan(ScalarFromVectorAction):
    """Compute the fraction of vector entries that are NaN.

    The denominator is the size of the masked vector. NaN is returned when
    the masked vector is empty.
    """

    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the fraction of masked vector entries that are NaN.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that contains the vector stored under ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and passed to ``getMask``.

        Returns
        -------
        result : `Scalar`
            NaN fraction, multiplied by 100 when ``percent`` is set; NaN if
            the masked vector is empty.
        """
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        if nvalues == 0:
            # Nothing to take a fraction of; avoid dividing by zero.
            return np.nan
        values = values[np.isnan(values)]
        result = cast(
            Scalar,
            float(values.size / nvalues),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result

326 

327 

class SumAction(ScalarFromVectorAction):
    """Returns the sum of all values in the column.

    The sum is taken with `numpy.nansum` over the masked vector.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        key = self.vectorKey.format(**kwargs)
        selected = _dataToArray(data[key])[mask]
        total = np.nansum(selected)
        return cast(Scalar, total)

335 

336 

class MedianHistAction(ScalarAction):
    """Calculates the median of the given histogram data.

    The histogram is supplied as two vectors: bin frequencies (``histKey``)
    and bin midpoints (``midKey``).
    """

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        """Return the schema entries for the frequency and midpoint vectors."""
        frequency_entry = (self.histKey, Vector)
        midpoint_entry = (self.midKey, Vector)
        return (frequency_entry, midpoint_entry)

    def histMedian(self, hist, bin_mid):
        """Calculates the median of a histogram with binned values

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array
        bin_mid : `numpy.ndarray`
            Bin midpoints array

        Returns
        -------
        median : `float`
            Median of histogram with binned values
        """
        running_total = np.cumsum(hist)
        half_total = running_total[-1] / 2
        # Index of the first bin whose cumulative count reaches half the total.
        position = np.searchsorted(running_total, half_total)
        return bin_mid[position]

    def __call__(self, data: KeyedData, **kwargs):
        """Return the histogram median, or NaN if the frequency vector is
        empty.
        """
        hist = _dataToArray(data[self.histKey.format(**kwargs)])
        if hist.size == 0:
            return np.nan
        bin_mid = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histMedian(hist, bin_mid)))

377 

378 

class IqrHistAction(ScalarAction):
    """Calculates the interquartile range of the given histogram data.

    The histogram is supplied as two vectors: bin frequencies (``histKey``)
    and bin midpoints (``midKey``).
    """

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        """Return the schema entries for the frequency and midpoint vectors."""
        frequency_entry = (self.histKey, Vector)
        midpoint_entry = (self.midKey, Vector)
        return (frequency_entry, midpoint_entry)

    def histIqr(self, hist, bin_mid):
        """Calculates the interquartile range of a histogram with binned values

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array
        bin_mid : `numpy.ndarray`
            Bin midpoints array

        Returns
        -------
        iqr : `float`
            Inter-quartile range of histogram with binned values
        """
        running_total = np.cumsum(hist)
        total = running_total[-1]
        # First bins whose cumulative count reaches 1/4 and 3/4 of the total.
        lower_position = np.searchsorted(running_total, total / 4)
        upper_position = np.searchsorted(running_total, (3 / 4) * total)
        lower_quartile = bin_mid[lower_position]
        upper_quartile = bin_mid[upper_position]
        return upper_quartile - lower_quartile

    def __call__(self, data: KeyedData, **kwargs):
        """Return the histogram interquartile range, or NaN if the frequency
        vector is empty.
        """
        hist = _dataToArray(data[self.histKey.format(**kwargs)])
        if hist.size == 0:
            return np.nan
        bin_mid = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histIqr(hist, bin_mid)))

422 

423 

class DivideScalar(ScalarAction):
    """Calculate (A/B) for scalars.

    A and B are each produced by a configurable scalar action evaluated on
    the same input data.
    """

    actionA = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar A")
    actionB = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar B")

    def getInputSchema(self) -> KeyedDataSchema:
        """Yield the schema entries of action A followed by those of action B."""
        for action in (self.actionA, self.actionB):
            yield from action.getInputSchema()

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the result of A/B.

        A warning is logged when B is zero. In that case the result is NaN
        if A is also zero, and otherwise infinity multiplied by the sign of
        A.

        Parameters
        ----------
        data : `KeyedData`

        Returns
        -------
        result : `Scalar`
            The result of dividing A by B.
        """
        numerator = self.actionA(data, **kwargs)
        denominator = self.actionB(data, **kwargs)

        # Ordinary case first: any denominator that does not equal zero.
        if denominator != 0:
            return numerator / denominator

        if numerator == 0:
            log.warning("Both numerator and denominator are zero! Returning NaN.")
            return np.nan

        value = np.sign(numerator) * np.inf
        log.warning("Non-zero scalar divided by zero! Returning %f.", value)
        return value