Coverage for python / lsst / analysis / tools / actions / vector / vectorActions.py: 46%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:53 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ( 

24 "LoadVector", 

25 "DownselectVector", 

26 "MultiCriteriaDownselectVector", 

27 "ConvertFluxToMag", 

28 "ConvertUnits", 

29 "CalcSn", 

30 "ColorDiff", 

31 "ColorError", 

32 "MagDiff", 

33 "ExtinctionCorrectedMagDiff", 

34 "PerGroupStatistic", 

35 "ResidualWithPerGroupStatistic", 

36 "RAcosDec", 

37 "AngularSeparation", 

38 "IsMatchedObjectSameClass", 

39) 

40 

41import logging 

42from typing import cast 

43 

44import numpy as np 

45import pandas as pd 

46from astropy import units as u 

47from astropy.coordinates import SkyCoord 

48 

49from lsst.pex.config import DictField, Field 

50from lsst.pex.config.configurableActions import ConfigurableActionField, ConfigurableActionStructField 

51 

52from ...interfaces import KeyedData, KeyedDataSchema, Vector, VectorAction 

53from ...math import divide, fluxToMag, log10 

54from .selectors import VectorSelector 

55 

56_LOG = logging.getLogger(__name__) 

57 

58# Basic vectorActions 

59 

60 

61class LoadVector(VectorAction): 

62 """Load and return a Vector from KeyedData.""" 

63 

64 vectorKey = Field[str](doc="Key of vector which should be loaded") 

65 

66 def getInputSchema(self) -> KeyedDataSchema: 

67 return ((self.vectorKey, Vector),) 

68 

69 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

70 return np.array(cast(Vector, data[self.vectorKey.format(**kwargs)])) 

71 

72 

73class DownselectVector(VectorAction): 

74 """Get a vector from KeyedData, apply specified selector, return the 

75 shorter Vector. 

76 """ 

77 

78 vectorKey = Field[str](doc="column key to load from KeyedData") 

79 

80 selector = ConfigurableActionField[VectorAction]( 

81 doc="Action which returns a selection mask", default=VectorSelector 

82 ) 

83 

84 def getInputSchema(self) -> KeyedDataSchema: 

85 yield (self.vectorKey, Vector) 

86 yield from cast(VectorAction, self.selector).getInputSchema() 

87 

88 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

89 # Explicitly comparing to True makes this work even if the selector 

90 # returns an int or other non-boolean typed array 

91 mask = np.array(cast(VectorAction, self.selector)(data, **kwargs) == True, dtype=bool) # noqa: E712 

92 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask] 

93 

94 

95class MultiCriteriaDownselectVector(VectorAction): 

96 """Get a vector from KeyedData, apply specified set of selectors with AND 

97 logic, and return the shorter Vector. 

98 """ 

99 

100 vectorKey = Field[str](doc="column key to load from KeyedData") 

101 

102 selectors = ConfigurableActionStructField[VectorAction]( 

103 doc="Selectors for selecting rows, will be AND together", 

104 ) 

105 

106 def getInputSchema(self) -> KeyedDataSchema: 

107 yield (self.vectorKey, Vector) 

108 for action in self.selectors: 

109 yield from action.getInputSchema() 

110 

111 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

112 mask: Vector | None = None 

113 for selector in self.selectors: 

114 subMask = selector(data, **kwargs) 

115 if mask is None: 

116 mask = subMask 

117 else: 

118 mask *= subMask # type: ignore 

119 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask] 

120 

121 

122# Astronomical vectorActions 

123 

124 

125class CalcSn(VectorAction): 

126 """Calculate the signal-to-noise ratio from a single flux vector.""" 

127 

128 fluxType = Field[str](doc="Flux type (vector key) to calculate the S/N.", default="{band}_psfFlux") 

129 uncertaintySuffix = Field[str]( 

130 doc="Suffix to add to fluxType to specify the uncertainty column", default="Err" 

131 ) 

132 

133 def getInputSchema(self) -> KeyedDataSchema: 

134 yield self.fluxType, Vector 

135 yield f"{self.fluxType}{self.uncertaintySuffix}", Vector 

136 

137 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

138 signal = np.array(data[self.fluxType.format(**kwargs)]) 

139 noise = np.array(data[f"{self.fluxType}{self.uncertaintySuffix}".format(**kwargs)]) 

140 return divide(signal, noise) 

141 

142 

143class ColorDiff(VectorAction): 

144 """Calculate the difference between two colors from flux actions.""" 

145 

146 color1_flux1 = ConfigurableActionField[VectorAction](doc="Action providing first color's first flux") 

147 color1_flux2 = ConfigurableActionField[VectorAction](doc="Action providing first color's second flux") 

148 color2_flux1 = ConfigurableActionField[VectorAction](doc="Action providing second color's first flux") 

149 color2_flux2 = ConfigurableActionField[VectorAction](doc="Action providing second color's second flux") 

150 returnMillimags = Field[bool](doc="Whether to return color_diff in millimags (mags if not)", default=True) 

151 

152 def getInputSchema(self) -> KeyedDataSchema: 

153 yield from self.color1_flux1.getInputSchema() 

154 yield from self.color1_flux2.getInputSchema() 

155 yield from self.color2_flux1.getInputSchema() 

156 yield from self.color2_flux2.getInputSchema() 

157 

158 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

159 color_diff = -2.5 * log10( 

160 divide( 

161 self.color1_flux1(data, **kwargs) * self.color2_flux2(data, **kwargs), 

162 self.color1_flux2(data, **kwargs) * self.color2_flux1(data, **kwargs), 

163 ) 

164 ) 

165 

166 if self.returnMillimags: 

167 color_diff *= 1000 

168 

169 return color_diff 

170 

171 

172class ColorError(VectorAction): 

173 """Calculate the error in a color from two different flux error columns.""" 

174 

175 flux_err1 = ConfigurableActionField[VectorAction](doc="Action providing error for first flux") 

176 flux_err2 = ConfigurableActionField[VectorAction](doc="Action providing error for second flux") 

177 returnMillimags = Field[bool](doc="Whether to return color_err in millimags (mags if not)", default=True) 

178 

179 def getInputSchema(self) -> KeyedDataSchema: 

180 yield from self.flux_err1.getInputSchema() 

181 yield from self.flux_err2.getInputSchema() 

182 

183 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

184 flux_err1 = self.flux_err1(data, **kwargs) 

185 flux_err2 = self.flux_err2(data, **kwargs) 

186 color_err = (2.5 / np.log(10)) * np.hypot(flux_err1, flux_err2) 

187 

188 if self.returnMillimags: 

189 color_err *= 1000 

190 

191 return color_err 

192 

193 

194class ConvertFluxToMag(VectorAction): 

195 """Turn nanojanskys into magnitudes.""" 

196 

197 vectorKey = Field[str](doc="Key of flux vector to convert to mags") 

198 fluxUnit = Field[str](doc="Astropy unit of flux vector", default="nJy") 

199 returnMillimags = Field[bool](doc="Use millimags or not?", default=False) 

200 

201 def getInputSchema(self) -> KeyedDataSchema: 

202 return ((self.vectorKey, Vector),) 

203 

204 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

205 return fluxToMag( 

206 cast(Vector, data[self.vectorKey.format(**kwargs)]), 

207 flux_unit=self.fluxUnit, 

208 return_millimags=self.returnMillimags, 

209 ) 

210 

211 

212class ConvertUnits(VectorAction): 

213 """Convert the units of a vector.""" 

214 

215 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector) 

216 inUnit = Field[str](doc="input Astropy unit") 

217 outUnit = Field[str](doc="output Astropy unit") 

218 

219 def getInputSchema(self) -> KeyedDataSchema: 

220 return tuple(self.buildAction.getInputSchema()) 

221 

222 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

223 dataWithUnit = self.buildAction(data, **kwargs) * u.Unit(self.inUnit) 

224 return dataWithUnit.to(self.outUnit).value 

225 

226 

227class MagDiff(VectorAction): 

228 """Calculate the difference between two magnitudes; 

229 each magnitude is derived from a flux column. 

230 

231 Notes 

232 ----- 

233 The flux columns need to be in units (specifiable in 

234 the fluxUnits1 and 2 config options) that can be converted 

235 to janskys. This action doesn't have any calibration 

236 information and assumes that the fluxes are already 

237 calibrated. 

238 """ 

239 

240 col1 = Field[str](doc="Column to subtract from") 

241 fluxUnits1 = Field[str](doc="Units for col1", default="nanojansky") 

242 col2 = Field[str](doc="Column to subtract") 

243 fluxUnits2 = Field[str](doc="Units for col2", default="nanojansky") 

244 returnMillimags = Field[bool](doc="Use millimags or not?", default=True) 

245 

246 def getInputSchema(self) -> KeyedDataSchema: 

247 return ((self.col1, Vector), (self.col2, Vector)) 

248 

249 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

250 mag1 = fluxToMag(data[self.col1.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits1)) 

251 mag2 = fluxToMag(data[self.col2.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits2)) 

252 magDiff = mag1 - mag2 

253 if self.returnMillimags: 

254 magDiff *= 1000.0 

255 return magDiff 

256 

257 

258class ExtinctionCorrectedMagDiff(VectorAction): 

259 """Compute the difference between two magnitudes and correct for extinction 

260 By default bands are derived from the <band>_ prefix on flux columns, 

261 per the naming convention in the Object Table: 

262 e.g. the band of 'g_psfFlux' is 'g'. If column names follow another 

263 convention, bands can alternatively be supplied via the band1 or band2 

264 config parameters. 

265 If band1 and band2 are supplied, the flux column names are ignored. 

266 """ 

267 

268 magDiff = ConfigurableActionField[VectorAction]( 

269 doc="Action that returns a difference in magnitudes", default=MagDiff 

270 ) 

271 ebvCol = Field[str](doc="E(B-V) Column Name", default="ebv") 

272 band1 = Field[str]( 

273 doc="Optional band for magDiff.col1. Supersedes column name prefix", 

274 optional=True, 

275 default=None, 

276 ) 

277 band2 = Field[str]( 

278 doc="Optional band for magDiff.col2. Supersedes column name prefix", 

279 optional=True, 

280 default=None, 

281 ) 

282 extinctionCoeffs = DictField[str, float]( 

283 doc="Dictionary of extinction coefficients for conversion from E(B-V) to extinction, A_band." 

284 "Key must be the band", 

285 optional=True, 

286 default=None, 

287 ) 

288 

289 def getInputSchema(self) -> KeyedDataSchema: 

290 return self.magDiff.getInputSchema() + ((self.ebvCol, Vector),) 

291 

292 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

293 diff = self.magDiff(data, **kwargs) 

294 if not self.extinctionCoeffs: 

295 _LOG.debug("No extinction Coefficients. Not applying extinction correction") 

296 return diff 

297 

298 col1Band = self.band1 if self.band1 else self.magDiff.col1.split("_")[0] 

299 col2Band = self.band2 if self.band2 else self.magDiff.col2.split("_")[0] 

300 

301 # Return plain MagDiff with warning if either coeff not found 

302 for band in (col1Band, col2Band): 

303 if band not in self.extinctionCoeffs: 

304 _LOG.warning( 

305 "%s band not found in coefficients dictionary: %s. Not applying extinction correction", 

306 band, 

307 self.extinctionCoeffs, 

308 ) 

309 return diff 

310 

311 av1: float = self.extinctionCoeffs[col1Band] 

312 av2: float = self.extinctionCoeffs[col2Band] 

313 

314 ebv = data[self.ebvCol] 

315 # Ignore type until a more complete Vector protocol 

316 correction = np.array((av1 - av2) * ebv) * u.mag # type: ignore 

317 

318 if self.magDiff.returnMillimags: 

319 correction = correction.to(u.mmag) 

320 

321 return np.array(diff - correction.value) 

322 

323 

324class RAcosDec(VectorAction): 

325 """Construct a vector of RA*cos(Dec) in order to have commensurate values 

326 between RA and Dec.""" 

327 

328 raKey = Field[str](doc="RA coordinate", default="coord_ra") 

329 decKey = Field[str](doc="Dec coordinate", default="coord_dec") 

330 

331 def getInputSchema(self) -> KeyedDataSchema: 

332 return ((self.decKey, Vector), (self.raKey, Vector)) 

333 

334 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

335 ra = np.array(data[self.raKey]) 

336 dec = np.array(data[self.decKey]) 

337 return ra * np.cos((dec * u.degree).to(u.radian).value) 

338 

339 

340class AngularSeparation(VectorAction): 

341 """Calculate the angular separation between two coordinate positions.""" 

342 

343 raKey_A = Field[str](doc="RA coordinate for position A", default="coord_ra") 

344 decKey_A = Field[str](doc="Dec coordinate for position A", default="coord_dec") 

345 raKey_B = Field[str](doc="RA coordinate for position B", default="coord_ra") 

346 decKey_B = Field[str](doc="Dec coordinate for position B", default="coord_dec") 

347 outputUnit = Field[str](doc="Output astropy unit", default="milliarcsecond") 

348 

349 def getInputSchema(self) -> KeyedDataSchema: 

350 return ( 

351 (self.decKey_A, Vector), 

352 (self.raKey_A, Vector), 

353 (self.decKey_B, Vector), 

354 (self.raKey_B, Vector), 

355 ) 

356 

357 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

358 ra_A = np.array(data[self.raKey_A]) 

359 dec_A = np.array(data[self.decKey_A]) 

360 ra_B = np.array(data[self.raKey_B]) 

361 dec_B = np.array(data[self.decKey_B]) 

362 coord_A = SkyCoord(ra_A * u.degree, dec_A * u.degree) 

363 coord_B = SkyCoord(ra_B * u.degree, dec_B * u.degree) 

364 return coord_A.separation(coord_B).to(u.Unit(self.outputUnit)).value 

365 

366 

367# Statistical vectorActions 

368 

369 

370class PerGroupStatistic(VectorAction): 

371 """Compute per-group statistic values and return result as a vector with 

372 one element per group. The computed statistic can be any function accepted 

373 by pandas DataFrameGroupBy.aggregate passed in as a string function name. 

374 """ 

375 

376 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id") 

377 buildAction = ConfigurableActionField[VectorAction](doc="Action to build vector", default=LoadVector) 

378 func = Field[str](doc="Name of function to be applied per group") 

379 

380 def getInputSchema(self) -> KeyedDataSchema: 

381 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),) 

382 

383 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

384 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": self.buildAction(data, **kwargs)}) 

385 result = df.groupby("groupKey")["value"].aggregate(self.func) 

386 return np.array(result) 

387 

388 

389class ResidualWithPerGroupStatistic(VectorAction): 

390 """Compute residual between individual elements of group and the per-group 

391 statistic.""" 

392 

393 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id") 

394 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector) 

395 func = Field[str](doc="Name of function to be applied per group", default="mean") 

396 

397 def getInputSchema(self) -> KeyedDataSchema: 

398 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),) 

399 

400 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

401 values = self.buildAction(data, **kwargs) 

402 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": values}) 

403 result = df.groupby("groupKey")["value"].aggregate(self.func) 

404 

405 joinedDf = df.join(result, on="groupKey", validate="m:1", lsuffix="_individual", rsuffix="_group") 

406 

407 result = joinedDf["value_individual"] - joinedDf["value_group"] 

408 return np.array(result) 

409 

410 

411class IsMatchedObjectSameClass(VectorAction): 

412 """Action to return whether matched objects are the same class.""" 

413 

414 key_is_ref_galaxy = Field[str]( 

415 doc="The key of the boolean field selecting reference galaxies", 

416 ) 

417 key_is_ref_star = Field[str]( 

418 doc="The key of the boolean field selecting reference stars", 

419 ) 

420 key_is_target_galaxy = Field[str]( 

421 doc="The key of the boolean field selecting observed galaxies", 

422 ) 

423 key_is_target_star = Field[str]( 

424 doc="The key of the boolean field selecting observed stars", 

425 ) 

426 

427 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

428 mask = kwargs.get("mask") 

429 is_ref_galaxy = data[self.key_is_ref_galaxy] 

430 is_ref_star = data[self.key_is_ref_star] 

431 is_target_galaxy = data[self.key_is_target_galaxy] 

432 is_target_star = data[self.key_is_target_star] 

433 if mask: 

434 is_ref_galaxy = is_ref_galaxy[mask] 

435 is_ref_star = is_ref_star[mask] 

436 is_target_galaxy = is_target_galaxy[mask] 

437 is_target_star = is_target_star[mask] 

438 return (is_ref_galaxy & is_target_galaxy) | (is_ref_star & is_target_star) 

439 

440 def getInputSchema(self) -> KeyedDataSchema: 

441 yield self.key_is_ref_galaxy, Vector 

442 yield self.key_is_ref_star, Vector 

443 yield self.key_is_target_galaxy, Vector 

444 yield self.key_is_target_star, Vector