Coverage for python / lsst / analysis / tools / actions / scalar / scalarActions.py: 33%
199 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
# Names exported by ``from <module> import *``. Only the concrete scalar
# actions are listed; the ``ScalarFromVectorAction`` base class and the
# private ``_dataToArray`` helper are not part of this tuple.
__all__ = (
    "MedianAction",
    "MeanAction",
    "StdevAction",
    "ValueAction",
    "SigmaMadAction",
    "CountAction",
    "CountUniqueAction",
    "ApproxFloor",
    "FracThreshold",
    "MaxAction",
    "MinAction",
    "FracInRange",
    "FracNan",
    "SumAction",
    "MedianHistAction",
    "IqrHistAction",
    "DivideScalar",
    "RmsAction",
)
45import logging
46import operator
47from math import nan
48from typing import cast
50import numpy as np
51from lsst.pex.config import ChoiceField, Field
52from lsst.pex.config.configurableActions import ConfigurableActionField
54from ...interfaces import KeyedData, KeyedDataSchema, Scalar, ScalarAction, Vector
55from ...math import nanMax, nanMean, nanMedian, nanMin, nanSigmaMad, nanStd
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
def _dataToArray(data):
    """Return ``data`` as a `numpy.ndarray`.

    The DLPack exchange protocol (`numpy.from_dlpack`) is tried first so that
    tensor-like inputs go through that protocol. When that attempt raises
    `AttributeError` or `BufferError`, the input is handed to `numpy.array`
    instead.

    Parameters
    ----------
    data
        Object to convert.

    Returns
    -------
    array : `numpy.ndarray`
        Array produced by whichever conversion succeeded.
    """
    try:
        converted = np.from_dlpack(data)
    except (AttributeError, BufferError):
        # Fall back to the generic array constructor.
        converted = np.array(data)
    return converted
class ScalarFromVectorAction(ScalarAction):
    """Base for actions that reduce one vector to a single statistic.

    This class supplies the ``vectorKey`` configuration field and the input
    schema derived from it; the subclasses in this module provide
    ``__call__``.
    """

    vectorKey = Field[str]("Key of Vector to compute statistic from.")

    def getInputSchema(self) -> KeyedDataSchema:
        """Return a one-entry schema pairing ``vectorKey`` with `Vector`."""
        entry = (self.vectorKey, Vector)
        return (entry,)
class MedianAction(ScalarFromVectorAction):
    """Compute the median of the selected entries of a vector.

    The vector is looked up under ``vectorKey`` (formatted with the call's
    keyword arguments) and restricted by the action's mask. An empty
    selection yields NaN without calling ``nanMedian``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        selected = _dataToArray(column)[mask]
        if not selected.size:
            return np.nan
        return nanMedian(selected)
class MeanAction(ScalarFromVectorAction):
    """Compute the mean of the selected entries of a vector.

    The vector is looked up under ``vectorKey`` (formatted with the call's
    keyword arguments) and restricted by the action's mask. An empty
    selection yields NaN without calling ``nanMean``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        selected = _dataToArray(column)[mask]
        if not selected.size:
            return np.nan
        return nanMean(selected)
class StdevAction(ScalarFromVectorAction):
    """Compute the standard deviation of the selected entries of a vector.

    The masked vector is passed straight to ``nanStd``; no special handling
    of an empty selection is done here.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        selected = _dataToArray(column)[mask]
        return nanStd(selected)
class RmsAction(ScalarFromVectorAction):
    """Compute the root mean square of the selected entries of a vector.

    Unlike `StdevAction`, the mean is not subtracted before squaring. NaN
    entries are removed before the statistic is evaluated.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        selected = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        # Keep only the entries that are not NaN.
        kept = selected[~np.isnan(selected)]
        meanSquare = np.mean(kept**2)
        return np.sqrt(meanSquare)
class ValueAction(ScalarFromVectorAction):
    """Return the first element of a vector as a float.

    The action's mask is not applied; element ``0`` of the stored vector is
    used directly.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        key = self.vectorKey.format(**kwargs)
        first = data[key][0]
        return cast(Scalar, float(first))
class SigmaMadAction(ScalarFromVectorAction):
    """Compute the sigma MAD of the selected entries of a vector.

    The masked vector is passed straight to ``nanSigmaMad``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        selected = _dataToArray(column)[mask]
        return nanSigmaMad(selected)
class CountAction(ScalarAction):
    """Count the entries of a vector that satisfy a threshold comparison.

    The operator is specified as a string: "lt", "le", "ge", "gt", "ne" and
    "eq" stand for <, <=, >=, >, != and == respectively. For example, use
    ``op="lt"`` to count the elements strictly less than the threshold and
    ``op="le"`` to also include elements equal to it.

    A NaN threshold selects NaN counting instead of a numeric comparison:

    - ``op="ne"`` (the default, together with the default NaN threshold)
      counts the non-NaN entries, so configuring only ``vectorKey`` counts
      the non-NaN values;
    - ``op="eq"`` counts the NaN entries;
    - any other operator raises `ValueError`.

    To set a NaN threshold from a YAML file use ``threshold: !!float nan``.

    For a non-NaN threshold, NaN entries are dropped before the comparison.
    """

    vectorKey = Field[str]("Key of Vector to count")
    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "ne": "not equal to a given value",
            "eq": "equal to a given value",
            "gt": "greater than threshold",
        },
        default="ne",
    )
    threshold = Field[float](doc="Threshold to apply.", default=nan)

    def getInputSchema(self) -> KeyedDataSchema:
        """Return a one-entry schema pairing ``vectorKey`` with `Vector`."""
        return ((self.vectorKey, Vector),)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the count as an `int`.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that holds the vector under the formatted ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and forwarded to ``getMask``.

        Returns
        -------
        count : `Scalar`
            Number of selected entries satisfying the configured condition.

        Raises
        ------
        ValueError
            Raised if the threshold is NaN and ``op`` is neither "eq" nor
            "ne".
        """
        mask = self.getMask(**kwargs)
        arr = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        isNan = np.isnan(arr)

        # NaN never compares equal to anything, itself included, so the
        # threshold has to be tested with isnan(); ``threshold == nan`` is
        # always False and would make this branch unreachable.
        if np.isnan(self.threshold):
            nNan = int(isNan.sum())
            if self.op == "eq":
                # Count number of NaNs
                return cast(Scalar, nNan)
            if self.op == "ne":
                # Count number of non-NaNs
                return cast(Scalar, int(arr.size) - nNan)
            raise ValueError("Invalid operator for counting NaNs.")

        # Count for given threshold ignoring all NaNs
        compare = getattr(operator, self.op)
        nPassing = np.sum(compare(arr[~isNan], self.threshold))
        return cast(Scalar, int(nPassing))
class CountUniqueAction(ScalarFromVectorAction):
    """Count the distinct values among the selected entries of a vector.

    Distinct values are determined by `numpy.unique`.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        distinct = np.unique(_dataToArray(column)[mask])
        return cast(Scalar, distinct.size)
class ApproxFloor(ScalarFromVectorAction):
    """Return the median of the tail of the sorted, selected vector entries.

    The masked vector is flattened and sorted in ascending order, and the
    median of its last ``size // 10`` elements is returned. When there are
    fewer than ten elements ``size // 10`` is zero, and the slice ``[-0:]``
    then spans the whole sorted array.

    NOTE(review): this was previously documented as "the median of the
    lowest ten values of the sorted input", but the slice is taken from the
    end of an ascending sort and its length is a tenth of the input --
    confirm which behaviour is intended.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        selected = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        ordered = np.sort(selected, axis=None)  # type: ignore
        tailLength = ordered.size // 10
        tail = ordered[-tailLength:]
        return nanMedian(tail)
class FracThreshold(ScalarFromVectorAction):
    """Compute the fraction of a distribution above or below a threshold.

    The operator is specified as a string: "lt", "le", "ge" and "gt" stand
    for <, <=, >= and > respectively. For example, use ``op="lt"`` for the
    fraction of elements strictly less than the threshold and ``op="le"``
    to also include elements equal to it.

    NaN entries are removed first and the fraction is taken relative to the
    number of remaining entries. If nothing remains, NaN is returned.
    """

    op = ChoiceField[str](
        doc="Operator name string.",
        allowed={
            "lt": "less than threshold",
            "le": "less than or equal to threshold",
            "ge": "greater than or equal to threshold",
            "gt": "greater than threshold",
        },
    )
    threshold = Field[float](doc="Threshold to apply.")
    percent = Field[bool](doc="Express result as percentage", default=False)
    relative_to_median = Field[bool](doc="Calculate threshold relative to the median?", default=False)
    use_absolute_value = Field[bool](
        doc=(
            "Calculate threshold after taking absolute value. If relative_to_median"
            " is true the absolute value will be applied after the median is subtracted"
        ),
        default=False,
    )

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the fraction (or percentage) of entries passing the test.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that holds the vector under the formatted ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and forwarded to ``getMask``.

        Returns
        -------
        result : `Scalar`
            Fraction of non-NaN selected entries satisfying
            ``op(value, threshold)``, multiplied by 100 if ``percent`` is
            set; NaN when there are no non-NaN entries.
        """
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        values = values[np.logical_not(np.isnan(values))]
        n_values = values.size
        if n_values == 0:
            return np.nan

        # With relative_to_median the values (not the threshold) are shifted
        # by the median, which is equivalent to comparing against
        # median + threshold.
        if self.relative_to_median:
            offset = nanMedian(values)
            if np.isfinite(offset):
                # Subtract out of place: an in-place ``-=`` cannot store a
                # float result back into an integer array and raises a
                # casting error for integer-valued vectors.
                values = values - offset
        if self.use_absolute_value:
            values = np.abs(values)

        compare = getattr(operator, self.op)
        result = cast(
            Scalar,
            float(np.sum(compare(values, self.threshold)) / n_values),  # type: ignore
        )
        return 100.0 * result if self.percent else result
class MaxAction(ScalarFromVectorAction):
    """Compute the maximum of the selected entries of a vector.

    The masked vector is passed straight to ``nanMax``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        selected = _dataToArray(column)[mask]
        return nanMax(selected)
class MinAction(ScalarFromVectorAction):
    """Compute the minimum of the selected entries of a vector.

    The masked vector is passed straight to ``nanMin``.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        selected = _dataToArray(column)[mask]
        return nanMin(selected)
class FracInRange(ScalarFromVectorAction):
    """Compute the fraction of a distribution that is between specified
    minimum and maximum values, and is not NaN.

    The range is half-open: an entry is counted when
    ``minimum <= value < maximum``. The denominator is the number of
    selected entries including NaNs, so NaN entries lower the fraction.
    If no entries are selected, NaN is returned.
    """

    maximum = Field[float](doc="The maximum value", default=np.nextafter(np.inf, 0.0))
    minimum = Field[float](doc="The minimum value", default=np.nextafter(-np.inf, 0.0))
    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the fraction (or percentage) of entries inside the range.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that holds the vector under the formatted ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and forwarded to ``getMask``.

        Returns
        -------
        result : `Scalar`
            Fraction of selected entries within the range, multiplied by
            100 if ``percent`` is set; NaN when no entries are selected.
        """
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        if nvalues == 0:
            # An empty selection has no defined fraction; return NaN (as
            # FracThreshold does) instead of raising ZeroDivisionError.
            return np.nan
        values = values[np.logical_not(np.isnan(values))]
        sel_range = (values >= self.minimum) & (values < self.maximum)
        result = cast(
            Scalar,
            float(values[sel_range].size / nvalues),  # type: ignore
        )
        return 100.0 * result if self.percent else result
class FracNan(ScalarFromVectorAction):
    """Compute the fraction of vector entries that are NaN.

    The fraction is taken over the entries selected by the action's mask.
    If no entries are selected, NaN is returned.
    """

    percent = Field[bool](doc="Express result as percentage", default=False)

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Return the fraction (or percentage) of selected entries that are NaN.

        Parameters
        ----------
        data : `KeyedData`
            Mapping that holds the vector under the formatted ``vectorKey``.
        **kwargs
            Used to format ``vectorKey`` and forwarded to ``getMask``.

        Returns
        -------
        result : `Scalar`
            Fraction of selected entries that are NaN, multiplied by 100 if
            ``percent`` is set; NaN when no entries are selected.
        """
        mask = self.getMask(**kwargs)
        values = _dataToArray(data[self.vectorKey.format(**kwargs)])[mask]
        nvalues = values.size
        if nvalues == 0:
            # An empty selection has no defined fraction; return NaN
            # instead of raising ZeroDivisionError.
            return np.nan
        n_nan = values[np.isnan(values)].size
        result = cast(
            Scalar,
            float(n_nan / nvalues),  # type: ignore
        )
        return 100.0 * result if self.percent else result
class SumAction(ScalarFromVectorAction):
    """Compute the sum of the selected entries of a vector.

    The sum is evaluated with `numpy.nansum`.
    """

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        mask = self.getMask(**kwargs)
        column = data[self.vectorKey.format(**kwargs)]
        total = np.nansum(_dataToArray(column)[mask])
        return cast(Scalar, total)
class MedianHistAction(ScalarAction):
    """Compute the median of data supplied as a histogram.

    The histogram is given as two vectors: the per-bin frequencies
    (``histKey``) and the bin midpoints (``midKey``).
    """

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        """Return the schema entries for the frequency and midpoint vectors."""
        frequencies = (self.histKey, Vector)
        midpoints = (self.midKey, Vector)
        return (frequencies, midpoints)

    def histMedian(self, hist, bin_mid):
        """Locate the median bin of a histogram.

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array.
        bin_mid : `numpy.ndarray`
            Bin midpoints array.

        Returns
        -------
        median : `float`
            Midpoint at the index where half of the total frequency would
            be inserted into the cumulative frequency array.
        """
        running = np.cumsum(hist)
        halfTotal = running[-1] / 2
        position = np.searchsorted(running, halfTotal)
        return bin_mid[position]

    def __call__(self, data: KeyedData, **kwargs):
        hist = _dataToArray(data[self.histKey.format(**kwargs)])
        # An empty histogram has no median; report NaN.
        if hist.size == 0:
            return np.nan
        bin_mid = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histMedian(hist, bin_mid)))
class IqrHistAction(ScalarAction):
    """Compute the interquartile range of data supplied as a histogram.

    The histogram is given as two vectors: the per-bin frequencies
    (``histKey``) and the bin midpoints (``midKey``).
    """

    histKey = Field[str]("Key of frequency Vector")
    midKey = Field[str]("Key of bin midpoints Vector")

    def getInputSchema(self) -> KeyedDataSchema:
        """Return the schema entries for the frequency and midpoint vectors."""
        frequencies = (self.histKey, Vector)
        midpoints = (self.midKey, Vector)
        return (frequencies, midpoints)

    def histIqr(self, hist, bin_mid):
        """Compute the interquartile range of a histogram.

        Parameters
        ----------
        hist : `numpy.ndarray`
            Frequency array.
        bin_mid : `numpy.ndarray`
            Bin midpoints array.

        Returns
        -------
        iqr : `float`
            Difference between the midpoints at the indices where three
            quarters and one quarter of the total frequency would be
            inserted into the cumulative frequency array.
        """
        running = np.cumsum(hist)
        lowerIndex = np.searchsorted(running, running[-1] / 4)
        upperIndex = np.searchsorted(running, (3 / 4) * running[-1])
        lowerQuartile = bin_mid[lowerIndex]
        upperQuartile = bin_mid[upperIndex]
        return upperQuartile - lowerQuartile

    def __call__(self, data: KeyedData, **kwargs):
        hist = _dataToArray(data[self.histKey.format(**kwargs)])
        # An empty histogram has no interquartile range; report NaN.
        if hist.size == 0:
            return np.nan
        bin_mid = _dataToArray(data[self.midKey.format(**kwargs)])
        return cast(Scalar, float(self.histIqr(hist, bin_mid)))
class DivideScalar(ScalarAction):
    """Divide the scalar from one action by the scalar from another (A/B)."""

    actionA = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar A")
    actionB = ConfigurableActionField[ScalarAction](doc="Action which supplies scalar B")

    def getInputSchema(self) -> KeyedDataSchema:
        """Yield the schema entries of ``actionA`` followed by ``actionB``."""
        for action in (self.actionA, self.actionB):
            yield from action.getInputSchema()

    def __call__(self, data: KeyedData, **kwargs) -> Scalar:
        """Evaluate both actions and return A/B.

        Parameters
        ----------
        data : `KeyedData`
            Passed, together with the keyword arguments, to both actions.

        Returns
        -------
        result : `Scalar`
            A divided by B. When B equals zero a warning is logged and the
            result is NaN if A is also zero, otherwise infinity carrying
            the sign of A.
        """
        scalarA = self.actionA(data, **kwargs)
        scalarB = self.actionB(data, **kwargs)

        if scalarB == 0:
            if scalarA == 0:
                log.warning("Both numerator and denominator are zero! Returning NaN.")
                return np.nan
            value = np.sign(scalarA) * np.inf
            log.warning("Non-zero scalar divided by zero! Returning %f.", value)
            return value

        return scalarA / scalarB