Coverage for python / lsst / analysis / tools / actions / vector / vectorActions.py: 46%
212 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:55 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:55 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = (
24 "LoadVector",
25 "DownselectVector",
26 "MultiCriteriaDownselectVector",
27 "ConvertFluxToMag",
28 "ConvertUnits",
29 "CalcSn",
30 "ColorDiff",
31 "ColorError",
32 "MagDiff",
33 "ExtinctionCorrectedMagDiff",
34 "PerGroupStatistic",
35 "ResidualWithPerGroupStatistic",
36 "RAcosDec",
37 "AngularSeparation",
38 "IsMatchedObjectSameClass",
39)
41import logging
42from typing import cast
44import numpy as np
45import pandas as pd
46from astropy import units as u
47from astropy.coordinates import SkyCoord
49from lsst.pex.config import DictField, Field
50from lsst.pex.config.configurableActions import ConfigurableActionField, ConfigurableActionStructField
52from ...interfaces import KeyedData, KeyedDataSchema, Vector, VectorAction
53from ...math import divide, fluxToMag, log10
54from .selectors import VectorSelector
56_LOG = logging.getLogger(__name__)
58# Basic vectorActions
61class LoadVector(VectorAction):
62 """Load and return a Vector from KeyedData."""
64 vectorKey = Field[str](doc="Key of vector which should be loaded")
66 def getInputSchema(self) -> KeyedDataSchema:
67 return ((self.vectorKey, Vector),)
69 def __call__(self, data: KeyedData, **kwargs) -> Vector:
70 return np.array(cast(Vector, data[self.vectorKey.format(**kwargs)]))
73class DownselectVector(VectorAction):
74 """Get a vector from KeyedData, apply specified selector, return the
75 shorter Vector.
76 """
78 vectorKey = Field[str](doc="column key to load from KeyedData")
80 selector = ConfigurableActionField[VectorAction](
81 doc="Action which returns a selection mask", default=VectorSelector
82 )
84 def getInputSchema(self) -> KeyedDataSchema:
85 yield (self.vectorKey, Vector)
86 yield from cast(VectorAction, self.selector).getInputSchema()
88 def __call__(self, data: KeyedData, **kwargs) -> Vector:
89 # Explicitly comparing to True makes this work even if the selector
90 # returns an int or other non-boolean typed array
91 mask = np.array(cast(VectorAction, self.selector)(data, **kwargs) == True, dtype=bool) # noqa: E712
92 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask]
95class MultiCriteriaDownselectVector(VectorAction):
96 """Get a vector from KeyedData, apply specified set of selectors with AND
97 logic, and return the shorter Vector.
98 """
100 vectorKey = Field[str](doc="column key to load from KeyedData")
102 selectors = ConfigurableActionStructField[VectorAction](
103 doc="Selectors for selecting rows, will be AND together",
104 )
106 def getInputSchema(self) -> KeyedDataSchema:
107 yield (self.vectorKey, Vector)
108 for action in self.selectors:
109 yield from action.getInputSchema()
111 def __call__(self, data: KeyedData, **kwargs) -> Vector:
112 mask: Vector | None = None
113 for selector in self.selectors:
114 subMask = selector(data, **kwargs)
115 if mask is None:
116 mask = subMask
117 else:
118 mask *= subMask # type: ignore
119 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask]
122# Astronomical vectorActions
125class CalcSn(VectorAction):
126 """Calculate the signal-to-noise ratio from a single flux vector."""
128 fluxType = Field[str](doc="Flux type (vector key) to calculate the S/N.", default="{band}_psfFlux")
129 uncertaintySuffix = Field[str](
130 doc="Suffix to add to fluxType to specify the uncertainty column", default="Err"
131 )
133 def getInputSchema(self) -> KeyedDataSchema:
134 yield self.fluxType, Vector
135 yield f"{self.fluxType}{self.uncertaintySuffix}", Vector
137 def __call__(self, data: KeyedData, **kwargs) -> Vector:
138 signal = np.array(data[self.fluxType.format(**kwargs)])
139 noise = np.array(data[f"{self.fluxType}{self.uncertaintySuffix}".format(**kwargs)])
140 return divide(signal, noise)
143class ColorDiff(VectorAction):
144 """Calculate the difference between two colors from flux actions."""
146 color1_flux1 = ConfigurableActionField[VectorAction](doc="Action providing first color's first flux")
147 color1_flux2 = ConfigurableActionField[VectorAction](doc="Action providing first color's second flux")
148 color2_flux1 = ConfigurableActionField[VectorAction](doc="Action providing second color's first flux")
149 color2_flux2 = ConfigurableActionField[VectorAction](doc="Action providing second color's second flux")
150 returnMillimags = Field[bool](doc="Whether to return color_diff in millimags (mags if not)", default=True)
152 def getInputSchema(self) -> KeyedDataSchema:
153 yield from self.color1_flux1.getInputSchema()
154 yield from self.color1_flux2.getInputSchema()
155 yield from self.color2_flux1.getInputSchema()
156 yield from self.color2_flux2.getInputSchema()
158 def __call__(self, data: KeyedData, **kwargs) -> Vector:
159 color_diff = -2.5 * log10(
160 divide(
161 self.color1_flux1(data, **kwargs) * self.color2_flux2(data, **kwargs),
162 self.color1_flux2(data, **kwargs) * self.color2_flux1(data, **kwargs),
163 )
164 )
166 if self.returnMillimags:
167 color_diff *= 1000
169 return color_diff
172class ColorError(VectorAction):
173 """Calculate the error in a color from two different flux error columns."""
175 flux_err1 = ConfigurableActionField[VectorAction](doc="Action providing error for first flux")
176 flux_err2 = ConfigurableActionField[VectorAction](doc="Action providing error for second flux")
177 returnMillimags = Field[bool](doc="Whether to return color_err in millimags (mags if not)", default=True)
179 def getInputSchema(self) -> KeyedDataSchema:
180 yield from self.flux_err1.getInputSchema()
181 yield from self.flux_err2.getInputSchema()
183 def __call__(self, data: KeyedData, **kwargs) -> Vector:
184 flux_err1 = self.flux_err1(data, **kwargs)
185 flux_err2 = self.flux_err2(data, **kwargs)
186 color_err = (2.5 / np.log(10)) * np.hypot(flux_err1, flux_err2)
188 if self.returnMillimags:
189 color_err *= 1000
191 return color_err
194class ConvertFluxToMag(VectorAction):
195 """Turn nanojanskys into magnitudes."""
197 vectorKey = Field[str](doc="Key of flux vector to convert to mags")
198 fluxUnit = Field[str](doc="Astropy unit of flux vector", default="nJy")
199 returnMillimags = Field[bool](doc="Use millimags or not?", default=False)
201 def getInputSchema(self) -> KeyedDataSchema:
202 return ((self.vectorKey, Vector),)
204 def __call__(self, data: KeyedData, **kwargs) -> Vector:
205 return fluxToMag(
206 cast(Vector, data[self.vectorKey.format(**kwargs)]),
207 flux_unit=self.fluxUnit,
208 return_millimags=self.returnMillimags,
209 )
212class ConvertUnits(VectorAction):
213 """Convert the units of a vector."""
215 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector)
216 inUnit = Field[str](doc="input Astropy unit")
217 outUnit = Field[str](doc="output Astropy unit")
219 def getInputSchema(self) -> KeyedDataSchema:
220 return tuple(self.buildAction.getInputSchema())
222 def __call__(self, data: KeyedData, **kwargs) -> Vector:
223 dataWithUnit = self.buildAction(data, **kwargs) * u.Unit(self.inUnit)
224 return dataWithUnit.to(self.outUnit).value
227class MagDiff(VectorAction):
228 """Calculate the difference between two magnitudes;
229 each magnitude is derived from a flux column.
231 Notes
232 -----
233 The flux columns need to be in units (specifiable in
234 the fluxUnits1 and 2 config options) that can be converted
235 to janskys. This action doesn't have any calibration
236 information and assumes that the fluxes are already
237 calibrated.
238 """
240 col1 = Field[str](doc="Column to subtract from")
241 fluxUnits1 = Field[str](doc="Units for col1", default="nanojansky")
242 col2 = Field[str](doc="Column to subtract")
243 fluxUnits2 = Field[str](doc="Units for col2", default="nanojansky")
244 returnMillimags = Field[bool](doc="Use millimags or not?", default=True)
246 def getInputSchema(self) -> KeyedDataSchema:
247 return ((self.col1, Vector), (self.col2, Vector))
249 def __call__(self, data: KeyedData, **kwargs) -> Vector:
250 mag1 = fluxToMag(data[self.col1.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits1))
251 mag2 = fluxToMag(data[self.col2.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits2))
252 magDiff = mag1 - mag2
253 if self.returnMillimags:
254 magDiff *= 1000.0
255 return magDiff
258class ExtinctionCorrectedMagDiff(VectorAction):
259 """Compute the difference between two magnitudes and correct for extinction
260 By default bands are derived from the <band>_ prefix on flux columns,
261 per the naming convention in the Object Table:
262 e.g. the band of 'g_psfFlux' is 'g'. If column names follow another
263 convention, bands can alternatively be supplied via the band1 or band2
264 config parameters.
265 If band1 and band2 are supplied, the flux column names are ignored.
266 """
268 magDiff = ConfigurableActionField[VectorAction](
269 doc="Action that returns a difference in magnitudes", default=MagDiff
270 )
271 ebvCol = Field[str](doc="E(B-V) Column Name", default="ebv")
272 band1 = Field[str](
273 doc="Optional band for magDiff.col1. Supersedes column name prefix",
274 optional=True,
275 default=None,
276 )
277 band2 = Field[str](
278 doc="Optional band for magDiff.col2. Supersedes column name prefix",
279 optional=True,
280 default=None,
281 )
282 extinctionCoeffs = DictField[str, float](
283 doc="Dictionary of extinction coefficients for conversion from E(B-V) to extinction, A_band."
284 "Key must be the band",
285 optional=True,
286 default=None,
287 )
289 def getInputSchema(self) -> KeyedDataSchema:
290 return self.magDiff.getInputSchema() + ((self.ebvCol, Vector),)
292 def __call__(self, data: KeyedData, **kwargs) -> Vector:
293 diff = self.magDiff(data, **kwargs)
294 if not self.extinctionCoeffs:
295 _LOG.debug("No extinction Coefficients. Not applying extinction correction")
296 return diff
298 col1Band = self.band1 if self.band1 else self.magDiff.col1.split("_")[0]
299 col2Band = self.band2 if self.band2 else self.magDiff.col2.split("_")[0]
301 # Return plain MagDiff with warning if either coeff not found
302 for band in (col1Band, col2Band):
303 if band not in self.extinctionCoeffs:
304 _LOG.warning(
305 "%s band not found in coefficients dictionary: %s. Not applying extinction correction",
306 band,
307 self.extinctionCoeffs,
308 )
309 return diff
311 av1: float = self.extinctionCoeffs[col1Band]
312 av2: float = self.extinctionCoeffs[col2Band]
314 ebv = data[self.ebvCol]
315 # Ignore type until a more complete Vector protocol
316 correction = np.array((av1 - av2) * ebv) * u.mag # type: ignore
318 if self.magDiff.returnMillimags:
319 correction = correction.to(u.mmag)
321 return np.array(diff - correction.value)
324class RAcosDec(VectorAction):
325 """Construct a vector of RA*cos(Dec) in order to have commensurate values
326 between RA and Dec."""
328 raKey = Field[str](doc="RA coordinate", default="coord_ra")
329 decKey = Field[str](doc="Dec coordinate", default="coord_dec")
331 def getInputSchema(self) -> KeyedDataSchema:
332 return ((self.decKey, Vector), (self.raKey, Vector))
334 def __call__(self, data: KeyedData, **kwargs) -> Vector:
335 ra = np.array(data[self.raKey])
336 dec = np.array(data[self.decKey])
337 return ra * np.cos((dec * u.degree).to(u.radian).value)
340class AngularSeparation(VectorAction):
341 """Calculate the angular separation between two coordinate positions."""
343 raKey_A = Field[str](doc="RA coordinate for position A", default="coord_ra")
344 decKey_A = Field[str](doc="Dec coordinate for position A", default="coord_dec")
345 raKey_B = Field[str](doc="RA coordinate for position B", default="coord_ra")
346 decKey_B = Field[str](doc="Dec coordinate for position B", default="coord_dec")
347 outputUnit = Field[str](doc="Output astropy unit", default="milliarcsecond")
349 def getInputSchema(self) -> KeyedDataSchema:
350 return (
351 (self.decKey_A, Vector),
352 (self.raKey_A, Vector),
353 (self.decKey_B, Vector),
354 (self.raKey_B, Vector),
355 )
357 def __call__(self, data: KeyedData, **kwargs) -> Vector:
358 ra_A = np.array(data[self.raKey_A])
359 dec_A = np.array(data[self.decKey_A])
360 ra_B = np.array(data[self.raKey_B])
361 dec_B = np.array(data[self.decKey_B])
362 coord_A = SkyCoord(ra_A * u.degree, dec_A * u.degree)
363 coord_B = SkyCoord(ra_B * u.degree, dec_B * u.degree)
364 return coord_A.separation(coord_B).to(u.Unit(self.outputUnit)).value
367# Statistical vectorActions
370class PerGroupStatistic(VectorAction):
371 """Compute per-group statistic values and return result as a vector with
372 one element per group. The computed statistic can be any function accepted
373 by pandas DataFrameGroupBy.aggregate passed in as a string function name.
374 """
376 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id")
377 buildAction = ConfigurableActionField[VectorAction](doc="Action to build vector", default=LoadVector)
378 func = Field[str](doc="Name of function to be applied per group")
380 def getInputSchema(self) -> KeyedDataSchema:
381 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),)
383 def __call__(self, data: KeyedData, **kwargs) -> Vector:
384 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": self.buildAction(data, **kwargs)})
385 result = df.groupby("groupKey")["value"].aggregate(self.func)
386 return np.array(result)
389class ResidualWithPerGroupStatistic(VectorAction):
390 """Compute residual between individual elements of group and the per-group
391 statistic."""
393 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id")
394 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector)
395 func = Field[str](doc="Name of function to be applied per group", default="mean")
397 def getInputSchema(self) -> KeyedDataSchema:
398 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),)
400 def __call__(self, data: KeyedData, **kwargs) -> Vector:
401 values = self.buildAction(data, **kwargs)
402 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": values})
403 result = df.groupby("groupKey")["value"].aggregate(self.func)
405 joinedDf = df.join(result, on="groupKey", validate="m:1", lsuffix="_individual", rsuffix="_group")
407 result = joinedDf["value_individual"] - joinedDf["value_group"]
408 return np.array(result)
411class IsMatchedObjectSameClass(VectorAction):
412 """Action to return whether matched objects are the same class."""
414 key_is_ref_galaxy = Field[str](
415 doc="The key of the boolean field selecting reference galaxies",
416 )
417 key_is_ref_star = Field[str](
418 doc="The key of the boolean field selecting reference stars",
419 )
420 key_is_target_galaxy = Field[str](
421 doc="The key of the boolean field selecting observed galaxies",
422 )
423 key_is_target_star = Field[str](
424 doc="The key of the boolean field selecting observed stars",
425 )
427 def __call__(self, data: KeyedData, **kwargs) -> Vector:
428 mask = kwargs.get("mask")
429 is_ref_galaxy = data[self.key_is_ref_galaxy]
430 is_ref_star = data[self.key_is_ref_star]
431 is_target_galaxy = data[self.key_is_target_galaxy]
432 is_target_star = data[self.key_is_target_star]
433 if mask:
434 is_ref_galaxy = is_ref_galaxy[mask]
435 is_ref_star = is_ref_star[mask]
436 is_target_galaxy = is_target_galaxy[mask]
437 is_target_star = is_target_star[mask]
438 return (is_ref_galaxy & is_target_galaxy) | (is_ref_star & is_target_star)
440 def getInputSchema(self) -> KeyedDataSchema:
441 yield self.key_is_ref_galaxy, Vector
442 yield self.key_is_ref_star, Vector
443 yield self.key_is_target_galaxy, Vector
444 yield self.key_is_target_star, Vector