Coverage for python / lsst / analysis / tools / actions / vector / vectorActions.py: 46%
212 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:09 +0000
1# This file is part of analysis_tools.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = (
24 "LoadVector",
25 "DownselectVector",
26 "MultiCriteriaDownselectVector",
27 "ConvertFluxToMag",
28 "ConvertUnits",
29 "CalcSn",
30 "ColorDiff",
31 "ColorError",
32 "MagDiff",
33 "ExtinctionCorrectedMagDiff",
34 "PerGroupStatistic",
35 "ResidualWithPerGroupStatistic",
36 "RAcosDec",
37 "AngularSeparation",
38 "IsMatchedObjectSameClass",
39)
41import logging
42from typing import Optional, cast
44import numpy as np
45import pandas as pd
46from astropy import units as u
47from astropy.coordinates import SkyCoord
48from lsst.pex.config import DictField, Field
49from lsst.pex.config.configurableActions import ConfigurableActionField, ConfigurableActionStructField
51from ...interfaces import KeyedData, KeyedDataSchema, Vector, VectorAction
52from ...math import divide, fluxToMag, log10
53from .selectors import VectorSelector
55_LOG = logging.getLogger(__name__)
57# Basic vectorActions
60class LoadVector(VectorAction):
61 """Load and return a Vector from KeyedData."""
63 vectorKey = Field[str](doc="Key of vector which should be loaded")
65 def getInputSchema(self) -> KeyedDataSchema:
66 return ((self.vectorKey, Vector),)
68 def __call__(self, data: KeyedData, **kwargs) -> Vector:
69 return np.array(cast(Vector, data[self.vectorKey.format(**kwargs)]))
72class DownselectVector(VectorAction):
73 """Get a vector from KeyedData, apply specified selector, return the
74 shorter Vector.
75 """
77 vectorKey = Field[str](doc="column key to load from KeyedData")
79 selector = ConfigurableActionField[VectorAction](
80 doc="Action which returns a selection mask", default=VectorSelector
81 )
83 def getInputSchema(self) -> KeyedDataSchema:
84 yield (self.vectorKey, Vector)
85 yield from cast(VectorAction, self.selector).getInputSchema()
87 def __call__(self, data: KeyedData, **kwargs) -> Vector:
88 # Explicitly comparing to True makes this work even if the selector
89 # returns an int or other non-boolean typed array
90 mask = np.array(cast(VectorAction, self.selector)(data, **kwargs) == True, dtype=bool) # noqa: E712
91 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask]
94class MultiCriteriaDownselectVector(VectorAction):
95 """Get a vector from KeyedData, apply specified set of selectors with AND
96 logic, and return the shorter Vector.
97 """
99 vectorKey = Field[str](doc="column key to load from KeyedData")
101 selectors = ConfigurableActionStructField[VectorAction](
102 doc="Selectors for selecting rows, will be AND together",
103 )
105 def getInputSchema(self) -> KeyedDataSchema:
106 yield (self.vectorKey, Vector)
107 for action in self.selectors:
108 yield from action.getInputSchema()
110 def __call__(self, data: KeyedData, **kwargs) -> Vector:
111 mask: Optional[Vector] = None
112 for selector in self.selectors:
113 subMask = selector(data, **kwargs)
114 if mask is None:
115 mask = subMask
116 else:
117 mask *= subMask # type: ignore
118 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask]
121# Astronomical vectorActions
124class CalcSn(VectorAction):
125 """Calculate the signal-to-noise ratio from a single flux vector."""
127 fluxType = Field[str](doc="Flux type (vector key) to calculate the S/N.", default="{band}_psfFlux")
128 uncertaintySuffix = Field[str](
129 doc="Suffix to add to fluxType to specify the uncertainty column", default="Err"
130 )
132 def getInputSchema(self) -> KeyedDataSchema:
133 yield self.fluxType, Vector
134 yield f"{self.fluxType}{self.uncertaintySuffix}", Vector
136 def __call__(self, data: KeyedData, **kwargs) -> Vector:
137 signal = np.array(data[self.fluxType.format(**kwargs)])
138 noise = np.array(data[f"{self.fluxType}{self.uncertaintySuffix}".format(**kwargs)])
139 return divide(signal, noise)
142class ColorDiff(VectorAction):
143 """Calculate the difference between two colors from flux actions."""
145 color1_flux1 = ConfigurableActionField[VectorAction](doc="Action providing first color's first flux")
146 color1_flux2 = ConfigurableActionField[VectorAction](doc="Action providing first color's second flux")
147 color2_flux1 = ConfigurableActionField[VectorAction](doc="Action providing second color's first flux")
148 color2_flux2 = ConfigurableActionField[VectorAction](doc="Action providing second color's second flux")
149 returnMillimags = Field[bool](doc="Whether to return color_diff in millimags (mags if not)", default=True)
151 def getInputSchema(self) -> KeyedDataSchema:
152 yield from self.color1_flux1.getInputSchema()
153 yield from self.color1_flux2.getInputSchema()
154 yield from self.color2_flux1.getInputSchema()
155 yield from self.color2_flux2.getInputSchema()
157 def __call__(self, data: KeyedData, **kwargs) -> Vector:
158 color_diff = -2.5 * log10(
159 divide(
160 self.color1_flux1(data, **kwargs) * self.color2_flux2(data, **kwargs),
161 self.color1_flux2(data, **kwargs) * self.color2_flux1(data, **kwargs),
162 )
163 )
165 if self.returnMillimags:
166 color_diff *= 1000
168 return color_diff
171class ColorError(VectorAction):
172 """Calculate the error in a color from two different flux error columns."""
174 flux_err1 = ConfigurableActionField[VectorAction](doc="Action providing error for first flux")
175 flux_err2 = ConfigurableActionField[VectorAction](doc="Action providing error for second flux")
176 returnMillimags = Field[bool](doc="Whether to return color_err in millimags (mags if not)", default=True)
178 def getInputSchema(self) -> KeyedDataSchema:
179 yield from self.flux_err1.getInputSchema()
180 yield from self.flux_err2.getInputSchema()
182 def __call__(self, data: KeyedData, **kwargs) -> Vector:
183 flux_err1 = self.flux_err1(data, **kwargs)
184 flux_err2 = self.flux_err2(data, **kwargs)
185 color_err = (2.5 / np.log(10)) * np.hypot(flux_err1, flux_err2)
187 if self.returnMillimags:
188 color_err *= 1000
190 return color_err
193class ConvertFluxToMag(VectorAction):
194 """Turn nanojanskys into magnitudes."""
196 vectorKey = Field[str](doc="Key of flux vector to convert to mags")
197 fluxUnit = Field[str](doc="Astropy unit of flux vector", default="nJy")
198 returnMillimags = Field[bool](doc="Use millimags or not?", default=False)
200 def getInputSchema(self) -> KeyedDataSchema:
201 return ((self.vectorKey, Vector),)
203 def __call__(self, data: KeyedData, **kwargs) -> Vector:
204 return fluxToMag(
205 cast(Vector, data[self.vectorKey.format(**kwargs)]),
206 flux_unit=self.fluxUnit,
207 return_millimags=self.returnMillimags,
208 )
211class ConvertUnits(VectorAction):
212 """Convert the units of a vector."""
214 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector)
215 inUnit = Field[str](doc="input Astropy unit")
216 outUnit = Field[str](doc="output Astropy unit")
218 def getInputSchema(self) -> KeyedDataSchema:
219 return tuple(self.buildAction.getInputSchema())
221 def __call__(self, data: KeyedData, **kwargs) -> Vector:
222 dataWithUnit = self.buildAction(data, **kwargs) * u.Unit(self.inUnit)
223 return dataWithUnit.to(self.outUnit).value
226class MagDiff(VectorAction):
227 """Calculate the difference between two magnitudes;
228 each magnitude is derived from a flux column.
230 Notes
231 -----
232 The flux columns need to be in units (specifiable in
233 the fluxUnits1 and 2 config options) that can be converted
234 to janskys. This action doesn't have any calibration
235 information and assumes that the fluxes are already
236 calibrated.
237 """
239 col1 = Field[str](doc="Column to subtract from")
240 fluxUnits1 = Field[str](doc="Units for col1", default="nanojansky")
241 col2 = Field[str](doc="Column to subtract")
242 fluxUnits2 = Field[str](doc="Units for col2", default="nanojansky")
243 returnMillimags = Field[bool](doc="Use millimags or not?", default=True)
245 def getInputSchema(self) -> KeyedDataSchema:
246 return ((self.col1, Vector), (self.col2, Vector))
248 def __call__(self, data: KeyedData, **kwargs) -> Vector:
249 mag1 = fluxToMag(data[self.col1.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits1))
250 mag2 = fluxToMag(data[self.col2.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits2))
251 magDiff = mag1 - mag2
252 if self.returnMillimags:
253 magDiff *= 1000.0
254 return magDiff
257class ExtinctionCorrectedMagDiff(VectorAction):
258 """Compute the difference between two magnitudes and correct for extinction
259 By default bands are derived from the <band>_ prefix on flux columns,
260 per the naming convention in the Object Table:
261 e.g. the band of 'g_psfFlux' is 'g'. If column names follow another
262 convention, bands can alternatively be supplied via the band1 or band2
263 config parameters.
264 If band1 and band2 are supplied, the flux column names are ignored.
265 """
267 magDiff = ConfigurableActionField[VectorAction](
268 doc="Action that returns a difference in magnitudes", default=MagDiff
269 )
270 ebvCol = Field[str](doc="E(B-V) Column Name", default="ebv")
271 band1 = Field[str](
272 doc="Optional band for magDiff.col1. Supersedes column name prefix",
273 optional=True,
274 default=None,
275 )
276 band2 = Field[str](
277 doc="Optional band for magDiff.col2. Supersedes column name prefix",
278 optional=True,
279 default=None,
280 )
281 extinctionCoeffs = DictField[str, float](
282 doc="Dictionary of extinction coefficients for conversion from E(B-V) to extinction, A_band."
283 "Key must be the band",
284 optional=True,
285 default=None,
286 )
288 def getInputSchema(self) -> KeyedDataSchema:
289 return self.magDiff.getInputSchema() + ((self.ebvCol, Vector),)
291 def __call__(self, data: KeyedData, **kwargs) -> Vector:
292 diff = self.magDiff(data, **kwargs)
293 if not self.extinctionCoeffs:
294 _LOG.debug("No extinction Coefficients. Not applying extinction correction")
295 return diff
297 col1Band = self.band1 if self.band1 else self.magDiff.col1.split("_")[0]
298 col2Band = self.band2 if self.band2 else self.magDiff.col2.split("_")[0]
300 # Return plain MagDiff with warning if either coeff not found
301 for band in (col1Band, col2Band):
302 if band not in self.extinctionCoeffs:
303 _LOG.warning(
304 "%s band not found in coefficients dictionary: %s. Not applying extinction correction",
305 band,
306 self.extinctionCoeffs,
307 )
308 return diff
310 av1: float = self.extinctionCoeffs[col1Band]
311 av2: float = self.extinctionCoeffs[col2Band]
313 ebv = data[self.ebvCol]
314 # Ignore type until a more complete Vector protocol
315 correction = np.array((av1 - av2) * ebv) * u.mag # type: ignore
317 if self.magDiff.returnMillimags:
318 correction = correction.to(u.mmag)
320 return np.array(diff - correction.value)
323class RAcosDec(VectorAction):
324 """Construct a vector of RA*cos(Dec) in order to have commensurate values
325 between RA and Dec."""
327 raKey = Field[str](doc="RA coordinate", default="coord_ra")
328 decKey = Field[str](doc="Dec coordinate", default="coord_dec")
330 def getInputSchema(self) -> KeyedDataSchema:
331 return ((self.decKey, Vector), (self.raKey, Vector))
333 def __call__(self, data: KeyedData, **kwargs) -> Vector:
334 ra = np.array(data[self.raKey])
335 dec = np.array(data[self.decKey])
336 return ra * np.cos((dec * u.degree).to(u.radian).value)
339class AngularSeparation(VectorAction):
340 """Calculate the angular separation between two coordinate positions."""
342 raKey_A = Field[str](doc="RA coordinate for position A", default="coord_ra")
343 decKey_A = Field[str](doc="Dec coordinate for position A", default="coord_dec")
344 raKey_B = Field[str](doc="RA coordinate for position B", default="coord_ra")
345 decKey_B = Field[str](doc="Dec coordinate for position B", default="coord_dec")
346 outputUnit = Field[str](doc="Output astropy unit", default="milliarcsecond")
348 def getInputSchema(self) -> KeyedDataSchema:
349 return (
350 (self.decKey_A, Vector),
351 (self.raKey_A, Vector),
352 (self.decKey_B, Vector),
353 (self.raKey_B, Vector),
354 )
356 def __call__(self, data: KeyedData, **kwargs) -> Vector:
357 ra_A = np.array(data[self.raKey_A])
358 dec_A = np.array(data[self.decKey_A])
359 ra_B = np.array(data[self.raKey_B])
360 dec_B = np.array(data[self.decKey_B])
361 coord_A = SkyCoord(ra_A * u.degree, dec_A * u.degree)
362 coord_B = SkyCoord(ra_B * u.degree, dec_B * u.degree)
363 return coord_A.separation(coord_B).to(u.Unit(self.outputUnit)).value
366# Statistical vectorActions
369class PerGroupStatistic(VectorAction):
370 """Compute per-group statistic values and return result as a vector with
371 one element per group. The computed statistic can be any function accepted
372 by pandas DataFrameGroupBy.aggregate passed in as a string function name.
373 """
375 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id")
376 buildAction = ConfigurableActionField[VectorAction](doc="Action to build vector", default=LoadVector)
377 func = Field[str](doc="Name of function to be applied per group")
379 def getInputSchema(self) -> KeyedDataSchema:
380 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),)
382 def __call__(self, data: KeyedData, **kwargs) -> Vector:
383 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": self.buildAction(data, **kwargs)})
384 result = df.groupby("groupKey")["value"].aggregate(self.func)
385 return np.array(result)
388class ResidualWithPerGroupStatistic(VectorAction):
389 """Compute residual between individual elements of group and the per-group
390 statistic."""
392 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id")
393 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector)
394 func = Field[str](doc="Name of function to be applied per group", default="mean")
396 def getInputSchema(self) -> KeyedDataSchema:
397 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),)
399 def __call__(self, data: KeyedData, **kwargs) -> Vector:
400 values = self.buildAction(data, **kwargs)
401 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": values})
402 result = df.groupby("groupKey")["value"].aggregate(self.func)
404 joinedDf = df.join(result, on="groupKey", validate="m:1", lsuffix="_individual", rsuffix="_group")
406 result = joinedDf["value_individual"] - joinedDf["value_group"]
407 return np.array(result)
410class IsMatchedObjectSameClass(VectorAction):
411 """Action to return whether matched objects are the same class."""
413 key_is_ref_galaxy = Field[str](
414 doc="The key of the boolean field selecting reference galaxies",
415 )
416 key_is_ref_star = Field[str](
417 doc="The key of the boolean field selecting reference stars",
418 )
419 key_is_target_galaxy = Field[str](
420 doc="The key of the boolean field selecting observed galaxies",
421 )
422 key_is_target_star = Field[str](
423 doc="The key of the boolean field selecting observed stars",
424 )
426 def __call__(self, data: KeyedData, **kwargs) -> Vector:
427 mask = kwargs.get("mask")
428 is_ref_galaxy = data[self.key_is_ref_galaxy]
429 is_ref_star = data[self.key_is_ref_star]
430 is_target_galaxy = data[self.key_is_target_galaxy]
431 is_target_star = data[self.key_is_target_star]
432 if mask:
433 is_ref_galaxy = is_ref_galaxy[mask]
434 is_ref_star = is_ref_star[mask]
435 is_target_galaxy = is_target_galaxy[mask]
436 is_target_star = is_target_star[mask]
437 return (is_ref_galaxy & is_target_galaxy) | (is_ref_star & is_target_star)
439 def getInputSchema(self) -> KeyedDataSchema:
440 yield self.key_is_ref_galaxy, Vector
441 yield self.key_is_ref_star, Vector
442 yield self.key_is_target_galaxy, Vector
443 yield self.key_is_target_star, Vector