Coverage for python / lsst / analysis / tools / actions / scalar / scalarActions.py: 34%
223 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 09:07 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
# Public API of this module: every scalar action exported for use in
# analysis_tools configuration.
__all__ = (
    "MedianAction",
    "MeanAction",
    "StdevAction",
    "ValueAction",
    "SigmaMadAction",
    "CountAction",
    "CountUniqueAction",
    "ApproxFloor",
    "FracThreshold",
    "MaxAction",
    "MinAction",
    "FracInRange",
    "FracNan",
    "SumAction",
    "MedianHistAction",
    "IqrHistAction",
    "DivideScalar",
    "RmsAction",
    "MedianGradientAction",
)
46import logging
47import operator
48from math import nan
49from typing import cast
51import numpy as np
52from scipy.optimize import curve_fit
53from scipy.stats import binned_statistic
55from lsst.pex.config import ChoiceField, Field
56from lsst.pex.config.configurableActions import ConfigurableActionField
58from ...interfaces import KeyedData, KeyedDataSchema, Scalar, ScalarAction, Vector
59from ...math import nanMax, nanMean, nanMedian, nanMin, nanSigmaMad, nanStd
61log = logging.getLogger(__name__)
64def _dataToArray(data):
65 """Convert input data into a numpy array using the appropriate
66 protocol. `np.from_dlpack` is used for Tensor-like arrays
67 where possible.
68 """
69 try:
70 return np.from_dlpack(data)
71 except (AttributeError, BufferError):
72 return np.array(data)
class ScalarFromVectorAction(ScalarAction):
    """Calculates a statistic from a single vector."""

    # Key of the vector in the KeyedData; may contain str.format placeholders
    # that subclasses resolve from the call kwargs (e.g. band names).
    vectorKey = Field[str]("Key of Vector to compute statistic from.")

    def getInputSchema(self) -> KeyedDataSchema:
        # This action family consumes exactly one input vector.
        return ((self.vectorKey, Vector),)
class MedianAction(ScalarFromVectorAction):
    """Calculates the median of the given data."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        selected = vector[self.getMask(**kwargs)]
        # An empty selection has no median; report NaN.
        if selected.size == 0:
            return np.nan
        return nanMedian(selected)
class MeanAction(ScalarFromVectorAction):
    """Calculates the mean of the given data."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        selected = vector[self.getMask(**kwargs)]
        # An empty selection has no mean; report NaN.
        if selected.size == 0:
            return np.nan
        return nanMean(selected)
class StdevAction(ScalarFromVectorAction):
    """Calculates the standard deviation of the given data."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        # nanStd ignores NaN entries in the masked selection.
        return nanStd(vector[self.getMask(**kwargs)])
class RmsAction(ScalarFromVectorAction):
    """Calculates the root mean square of the given data (without subtracting
    the mean as in StdevAction).
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        vector = vector[~np.isnan(vector)]
        # Guard the empty (or all-NaN) case: np.mean on an empty array emits
        # a RuntimeWarning before yielding NaN. Return NaN explicitly,
        # consistent with MedianAction and MeanAction.
        if vector.size == 0:
            return np.nan

        return np.sqrt(np.mean(vector**2))
class ValueAction(ScalarFromVectorAction):
    """Extracts the first value from a vector."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        # Only element [0] is read; no mask is applied for this action.
        vector = data[self.vectorKey.format(**kwargs)]
        return cast(Scalar, float(vector[0]))
class SigmaMadAction(ScalarFromVectorAction):
    """Calculates the sigma mad of the given data."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        # Robust scatter estimate (sigma-equivalent MAD), NaNs ignored.
        return nanSigmaMad(vector[self.getMask(**kwargs)])
class CountAction(ScalarAction):
    """Performs count actions, with threshold-based filtering.

    The operator is specified as a string, for example, "lt", "le", "ge",
    "gt", "ne", and "eq" for the mathematical operations <, <=, >=, >, !=,
    and == respectively. To count non-NaN values, only pass the column name
    as vector key. To count NaN values, pass threshold = nan (from math.nan).
    Optionally to configure from a YAML file, pass "threshold: !!float nan".
    To compute the number of elements with values less than a given threshold,
    use op="le".
    """

    vectorKey = Field[str]("Key of Vector to count")
    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "ne": "not equal to a given value",
            "eq": "equal to a given value",
            "gt": "greater than threshold",
        },
        default="ne",
    )
    threshold = Field[float](doc="Threshold to apply.", default=nan)

    def getInputSchema(self) -> KeyedDataSchema:
        return ((self.vectorKey, Vector),)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        arr = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]

        # A NaN threshold selects the NaN-counting modes. This must be
        # tested with isnan(): the previous ``self.threshold == nan`` was
        # always False (IEEE 754 NaN never compares equal), which made the
        # NaN-counting branch unreachable and returned 0 for op="eq".
        if np.isnan(self.threshold):
            if self.op == "eq":
                # Count number of NaNs
                result = np.isnan(arr).sum()
                return cast(Scalar, int(result))
            elif self.op == "ne":
                # Count number of non-NaNs
                result = arr.size - np.isnan(arr).sum()
                return cast(Scalar, int(result))
            else:
                # Ordering comparisons against NaN are meaningless.
                raise ValueError("Invalid operator for counting NaNs.")
        # Count for given threshold ignoring all NaNs
        else:
            values = arr[~np.isnan(arr)]
            return cast(
                Scalar,
                int(np.sum(getattr(operator, self.op)(values, self.threshold))),
            )
class CountUniqueAction(ScalarFromVectorAction):
    """Counts the number of unique rows in a given column."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        selected = vector[self.getMask(**kwargs)]
        return cast(Scalar, np.unique(selected).size)
class ApproxFloor(ScalarFromVectorAction):
    """Returns the median of the top tenth of the sorted input values.

    The input is sorted in ascending order (NaNs sort to the end), so the
    slice ``values[-x:]`` covers the *largest* ten percent of the values;
    any NaNs landing in that slice are ignored by ``nanMedian``. If fewer
    than ten values are present, ``x`` is 0 and ``values[-0:]`` is the whole
    array, so the median of everything is returned.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = np.sort(_dataToArray(data[self.vectorKey.format(**kwargs)])[mask], axis=None)  # type: ignore
        # Ten percent of the sample size, floored to an integer.
        x = values.size // 10
        return nanMedian(values[-x:])
class FracThreshold(ScalarFromVectorAction):
    """Compute the fraction of a distribution above or below a threshold.

    The operator is specified as a string, for example,
    "lt", "le", "ge", "gt" for the mathematical operations <, <=, >=, >. To
    compute the fraction of elements with values less than a given threshold,
    use op="le".
    """

    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "gt": "greater than threshold",
        },
    )
    threshold = Field[float](doc="Threshold to apply.")
    percent = Field[bool](doc="Express result as percentage", default=False)
    relative_to_median = Field[bool](doc="Calculate threshold relative to the median?", default=False)
    use_absolute_value = Field[bool](
        doc=(
            "Calculate threshold after taking absolute value. If relative_to_median"
            " is true the absolute value will be applied after the median is subtracted"
        ),
        default=False,
    )

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        # Drop NaNs; the denominator is the count of finite-comparable values.
        values = values[~np.isnan(values)]
        n_values = values.size
        if n_values == 0:
            return np.nan
        # Optionally re-center on the median so the threshold is an offset
        # from it rather than an absolute value.
        if self.relative_to_median:
            offset = nanMedian(values)
            if np.isfinite(offset):
                values = values - offset
        if self.use_absolute_value:
            values = np.abs(values)
        compare = getattr(operator, self.op)
        fraction = float(np.sum(compare(values, self.threshold)) / n_values)  # type: ignore
        if self.percent:
            fraction = 100.0 * fraction
        return cast(Scalar, fraction)
class MaxAction(ScalarFromVectorAction):
    """Returns the maximum of the given data."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        # NaNs are ignored by nanMax.
        return nanMax(vector[self.getMask(**kwargs)])
class MinAction(ScalarFromVectorAction):
    """Returns the minimum of the given data."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        # NaNs are ignored by nanMin.
        return nanMin(vector[self.getMask(**kwargs)])
class FracInRange(ScalarFromVectorAction):
    """Compute the fraction of a distribution that is between specified
    minimum and maximum values, and is not NaN.
    """

    maximum = Field[float](doc="The maximum value", default=np.nextafter(np.inf, 0.0))
    minimum = Field[float](doc="The minimum value", default=np.nextafter(-np.inf, 0.0))
    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        # Guard the empty-selection case: the division below would
        # otherwise raise ZeroDivisionError. Return NaN, consistent with
        # FracThreshold.
        if nvalues == 0:
            return np.nan
        values = values[np.logical_not(np.isnan(values))]
        # Half-open interval [minimum, maximum); the denominator is the
        # total count including NaNs, so NaNs lower the fraction.
        sel_range = (values >= self.minimum) & (values < self.maximum)
        result = cast(
            Scalar,
            float(values[sel_range].size / nvalues),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result
class FracNan(ScalarFromVectorAction):
    """Compute the fraction of vector entries that are NaN."""

    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        # Guard the empty-selection case: the division below would
        # otherwise raise ZeroDivisionError. Return NaN, consistent with
        # FracThreshold.
        if nvalues == 0:
            return np.nan
        values = values[np.isnan(values)]
        result = cast(
            Scalar,
            float(values.size / nvalues),  # type: ignore
        )
        if self.percent:
            return 100.0 * result
        else:
            return result
class SumAction(ScalarFromVectorAction):
    """Returns the sum of all values in the column."""

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        vector = _dataToArray(data[self.vectorKey.format(**kwargs)])
        # nansum treats NaN entries as zero (and returns 0.0 for empty input).
        return cast(Scalar, np.nansum(vector[self.getMask(**kwargs)]))
class MedianHistAction(ScalarAction):
    """Calculates the median of the given histogram data."""

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        return ((self.histKey, Vector), (self.midKey, Vector))

    def histMedian(self, hist, bin_mid):
        """Calculates the median of a histogram with binned values

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array
        bin_mid : `numpy.ndarray`
            Bin midpoints array

        Returns
        -------
        median : `float`
            Median of histogram with binned values
        """
        csum = np.cumsum(hist)
        # Midpoint of the first bin whose cumulative count reaches half
        # the total count.
        idx = np.searchsorted(csum, csum[-1] / 2)
        return bin_mid[idx]

    def __call__(self, data: KeyedData, **kwargs):
        freq = _dataToArray(data[self.histKey.format(**kwargs)])
        if freq.size == 0:
            return np.nan
        centers = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histMedian(freq, centers)))
class IqrHistAction(ScalarAction):
    """Calculates the interquartile range of the given histogram data."""

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        return ((self.histKey, Vector), (self.midKey, Vector))

    def histIqr(self, hist, bin_mid):
        """Calculates the interquartile range of a histogram with binned values

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array
        bin_mid : `numpy.ndarray`
            Bin midpoints array

        Returns
        -------
        iqr : `float`
            Inter-quartile range of histogram with binned values
        """
        csum = np.cumsum(hist)
        # Midpoints of the bins containing the 25th and 75th percentile
        # counts; their difference approximates the IQR.
        lower = bin_mid[np.searchsorted(csum, csum[-1] / 4)]
        upper = bin_mid[np.searchsorted(csum, (3 / 4) * csum[-1])]
        return upper - lower

    def __call__(self, data: KeyedData, **kwargs):
        freq = _dataToArray(data[self.histKey.format(**kwargs)])
        if freq.size == 0:
            return np.nan
        centers = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histIqr(freq, centers)))
class DivideScalar(ScalarAction):
    """Calculate (A/B) for scalars."""

    actionA = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar A")
    actionB = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar B")

    def getInputSchema(self) -> KeyedDataSchema:
        yield from self.actionA.getInputSchema()
        yield from self.actionB.getInputSchema()

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the result of A/B.

        Parameters
        ----------
        data : `KeyedData`

        Returns
        -------
        result : `Scalar`
            The result of dividing A by B.
        """
        numerator = self.actionA(data, **kwargs)
        denominator = self.actionB(data, **kwargs)
        # Ordinary case first; the remaining branches handle division by zero.
        if denominator != 0:
            return numerator / denominator
        if numerator == 0:
            # 0/0 is undefined; report NaN.
            log.warning("Both numerator and denominator are zero! Returning NaN.")
            return np.nan
        # x/0 for x != 0: signed infinity.
        value = np.sign(numerator) * np.inf
        log.warning("Non-zero scalar divided by zero! Returning %f.", value)
        return value
class MedianGradientAction(ScalarAction):
    """Calculate the gradient of a running median"""

    lowerBinLimit = Field[float](doc="Percentile of data to start the bining at", default=5.0)
    upperBinLimit = Field[float](doc="Percentile of data to end the binning at", default=95.0)
    nBins = Field[int](doc="Number of bins to use for running median", default=50)
    xsVectorKey = Field[str]("Key of Vector that gives the x location of the points.")
    ysVectorKey = Field[str]("Key of the Vector to compute the statistic from.")

    def getInputSchema(self) -> KeyedDataSchema:
        return ((self.xsVectorKey, Vector), (self.ysVectorKey, Vector))

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the gradient of the running median.

        Parameters
        ----------
        data : `KeyedData`

        Returns
        -------
        result : `Scalar`
            The gradient of the running median
        """
        xs = _dataToArray(data[self.xsVectorKey.format(**kwargs)])
        ys = _dataToArray(data[self.ysVectorKey.format(**kwargs)])

        # Restrict to the central percentile range in x so outliers at
        # either extreme do not dominate the fit.
        lowerLim = np.nanpercentile(xs, self.lowerBinLimit)
        upperLim = np.nanpercentile(xs, self.upperBinLimit)

        # Also drop any point that is non-finite in either coordinate.
        use = (xs > lowerLim) & (xs < upperLim) & np.isfinite(xs) & np.isfinite(ys)

        # Need more than two usable points to bin and fit a line.
        if np.sum(use) > 2:
            meds, binEdges, _ = binned_statistic(xs[use], ys[use], statistic="median", bins=self.nBins)
        else:
            return np.nan

        # Straight-line model; the fitted slope m is the reported gradient.
        def func(x, m, b):
            return m * x + b

        # Empty bins yield NaN medians; exclude them from the fit.
        # NOTE(review): the x positions are the left bin edges
        # (binEdges[:-1]) rather than bin centers — a constant half-bin
        # offset that shifts the intercept but not the fitted slope.
        finiteMeds = np.isfinite(meds)
        popt, _ = curve_fit(func, binEdges[:-1][finiteMeds], meds[finiteMeds])

        return popt[0]