Coverage for python / lsst / analysis / tools / actions / vector / vectorActions.py: 46%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:09 +0000

1# This file is part of analysis_tools. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ( 

24 "LoadVector", 

25 "DownselectVector", 

26 "MultiCriteriaDownselectVector", 

27 "ConvertFluxToMag", 

28 "ConvertUnits", 

29 "CalcSn", 

30 "ColorDiff", 

31 "ColorError", 

32 "MagDiff", 

33 "ExtinctionCorrectedMagDiff", 

34 "PerGroupStatistic", 

35 "ResidualWithPerGroupStatistic", 

36 "RAcosDec", 

37 "AngularSeparation", 

38 "IsMatchedObjectSameClass", 

39) 

40 

41import logging 

42from typing import Optional, cast 

43 

44import numpy as np 

45import pandas as pd 

46from astropy import units as u 

47from astropy.coordinates import SkyCoord 

48from lsst.pex.config import DictField, Field 

49from lsst.pex.config.configurableActions import ConfigurableActionField, ConfigurableActionStructField 

50 

51from ...interfaces import KeyedData, KeyedDataSchema, Vector, VectorAction 

52from ...math import divide, fluxToMag, log10 

53from .selectors import VectorSelector 

54 

55_LOG = logging.getLogger(__name__) 

56 

57# Basic vectorActions 

58 

59 

60class LoadVector(VectorAction): 

61 """Load and return a Vector from KeyedData.""" 

62 

63 vectorKey = Field[str](doc="Key of vector which should be loaded") 

64 

65 def getInputSchema(self) -> KeyedDataSchema: 

66 return ((self.vectorKey, Vector),) 

67 

68 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

69 return np.array(cast(Vector, data[self.vectorKey.format(**kwargs)])) 

70 

71 

72class DownselectVector(VectorAction): 

73 """Get a vector from KeyedData, apply specified selector, return the 

74 shorter Vector. 

75 """ 

76 

77 vectorKey = Field[str](doc="column key to load from KeyedData") 

78 

79 selector = ConfigurableActionField[VectorAction]( 

80 doc="Action which returns a selection mask", default=VectorSelector 

81 ) 

82 

83 def getInputSchema(self) -> KeyedDataSchema: 

84 yield (self.vectorKey, Vector) 

85 yield from cast(VectorAction, self.selector).getInputSchema() 

86 

87 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

88 # Explicitly comparing to True makes this work even if the selector 

89 # returns an int or other non-boolean typed array 

90 mask = np.array(cast(VectorAction, self.selector)(data, **kwargs) == True, dtype=bool) # noqa: E712 

91 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask] 

92 

93 

94class MultiCriteriaDownselectVector(VectorAction): 

95 """Get a vector from KeyedData, apply specified set of selectors with AND 

96 logic, and return the shorter Vector. 

97 """ 

98 

99 vectorKey = Field[str](doc="column key to load from KeyedData") 

100 

101 selectors = ConfigurableActionStructField[VectorAction]( 

102 doc="Selectors for selecting rows, will be AND together", 

103 ) 

104 

105 def getInputSchema(self) -> KeyedDataSchema: 

106 yield (self.vectorKey, Vector) 

107 for action in self.selectors: 

108 yield from action.getInputSchema() 

109 

110 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

111 mask: Optional[Vector] = None 

112 for selector in self.selectors: 

113 subMask = selector(data, **kwargs) 

114 if mask is None: 

115 mask = subMask 

116 else: 

117 mask *= subMask # type: ignore 

118 return cast(Vector, data[self.vectorKey.format(**kwargs)])[mask] 

119 

120 

121# Astronomical vectorActions 

122 

123 

124class CalcSn(VectorAction): 

125 """Calculate the signal-to-noise ratio from a single flux vector.""" 

126 

127 fluxType = Field[str](doc="Flux type (vector key) to calculate the S/N.", default="{band}_psfFlux") 

128 uncertaintySuffix = Field[str]( 

129 doc="Suffix to add to fluxType to specify the uncertainty column", default="Err" 

130 ) 

131 

132 def getInputSchema(self) -> KeyedDataSchema: 

133 yield self.fluxType, Vector 

134 yield f"{self.fluxType}{self.uncertaintySuffix}", Vector 

135 

136 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

137 signal = np.array(data[self.fluxType.format(**kwargs)]) 

138 noise = np.array(data[f"{self.fluxType}{self.uncertaintySuffix}".format(**kwargs)]) 

139 return divide(signal, noise) 

140 

141 

142class ColorDiff(VectorAction): 

143 """Calculate the difference between two colors from flux actions.""" 

144 

145 color1_flux1 = ConfigurableActionField[VectorAction](doc="Action providing first color's first flux") 

146 color1_flux2 = ConfigurableActionField[VectorAction](doc="Action providing first color's second flux") 

147 color2_flux1 = ConfigurableActionField[VectorAction](doc="Action providing second color's first flux") 

148 color2_flux2 = ConfigurableActionField[VectorAction](doc="Action providing second color's second flux") 

149 returnMillimags = Field[bool](doc="Whether to return color_diff in millimags (mags if not)", default=True) 

150 

151 def getInputSchema(self) -> KeyedDataSchema: 

152 yield from self.color1_flux1.getInputSchema() 

153 yield from self.color1_flux2.getInputSchema() 

154 yield from self.color2_flux1.getInputSchema() 

155 yield from self.color2_flux2.getInputSchema() 

156 

157 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

158 color_diff = -2.5 * log10( 

159 divide( 

160 self.color1_flux1(data, **kwargs) * self.color2_flux2(data, **kwargs), 

161 self.color1_flux2(data, **kwargs) * self.color2_flux1(data, **kwargs), 

162 ) 

163 ) 

164 

165 if self.returnMillimags: 

166 color_diff *= 1000 

167 

168 return color_diff 

169 

170 

171class ColorError(VectorAction): 

172 """Calculate the error in a color from two different flux error columns.""" 

173 

174 flux_err1 = ConfigurableActionField[VectorAction](doc="Action providing error for first flux") 

175 flux_err2 = ConfigurableActionField[VectorAction](doc="Action providing error for second flux") 

176 returnMillimags = Field[bool](doc="Whether to return color_err in millimags (mags if not)", default=True) 

177 

178 def getInputSchema(self) -> KeyedDataSchema: 

179 yield from self.flux_err1.getInputSchema() 

180 yield from self.flux_err2.getInputSchema() 

181 

182 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

183 flux_err1 = self.flux_err1(data, **kwargs) 

184 flux_err2 = self.flux_err2(data, **kwargs) 

185 color_err = (2.5 / np.log(10)) * np.hypot(flux_err1, flux_err2) 

186 

187 if self.returnMillimags: 

188 color_err *= 1000 

189 

190 return color_err 

191 

192 

193class ConvertFluxToMag(VectorAction): 

194 """Turn nanojanskys into magnitudes.""" 

195 

196 vectorKey = Field[str](doc="Key of flux vector to convert to mags") 

197 fluxUnit = Field[str](doc="Astropy unit of flux vector", default="nJy") 

198 returnMillimags = Field[bool](doc="Use millimags or not?", default=False) 

199 

200 def getInputSchema(self) -> KeyedDataSchema: 

201 return ((self.vectorKey, Vector),) 

202 

203 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

204 return fluxToMag( 

205 cast(Vector, data[self.vectorKey.format(**kwargs)]), 

206 flux_unit=self.fluxUnit, 

207 return_millimags=self.returnMillimags, 

208 ) 

209 

210 

211class ConvertUnits(VectorAction): 

212 """Convert the units of a vector.""" 

213 

214 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector) 

215 inUnit = Field[str](doc="input Astropy unit") 

216 outUnit = Field[str](doc="output Astropy unit") 

217 

218 def getInputSchema(self) -> KeyedDataSchema: 

219 return tuple(self.buildAction.getInputSchema()) 

220 

221 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

222 dataWithUnit = self.buildAction(data, **kwargs) * u.Unit(self.inUnit) 

223 return dataWithUnit.to(self.outUnit).value 

224 

225 

226class MagDiff(VectorAction): 

227 """Calculate the difference between two magnitudes; 

228 each magnitude is derived from a flux column. 

229 

230 Notes 

231 ----- 

232 The flux columns need to be in units (specifiable in 

233 the fluxUnits1 and 2 config options) that can be converted 

234 to janskys. This action doesn't have any calibration 

235 information and assumes that the fluxes are already 

236 calibrated. 

237 """ 

238 

239 col1 = Field[str](doc="Column to subtract from") 

240 fluxUnits1 = Field[str](doc="Units for col1", default="nanojansky") 

241 col2 = Field[str](doc="Column to subtract") 

242 fluxUnits2 = Field[str](doc="Units for col2", default="nanojansky") 

243 returnMillimags = Field[bool](doc="Use millimags or not?", default=True) 

244 

245 def getInputSchema(self) -> KeyedDataSchema: 

246 return ((self.col1, Vector), (self.col2, Vector)) 

247 

248 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

249 mag1 = fluxToMag(data[self.col1.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits1)) 

250 mag2 = fluxToMag(data[self.col2.format(**kwargs)], flux_unit=u.Unit(self.fluxUnits2)) 

251 magDiff = mag1 - mag2 

252 if self.returnMillimags: 

253 magDiff *= 1000.0 

254 return magDiff 

255 

256 

257class ExtinctionCorrectedMagDiff(VectorAction): 

258 """Compute the difference between two magnitudes and correct for extinction 

259 By default bands are derived from the <band>_ prefix on flux columns, 

260 per the naming convention in the Object Table: 

261 e.g. the band of 'g_psfFlux' is 'g'. If column names follow another 

262 convention, bands can alternatively be supplied via the band1 or band2 

263 config parameters. 

264 If band1 and band2 are supplied, the flux column names are ignored. 

265 """ 

266 

267 magDiff = ConfigurableActionField[VectorAction]( 

268 doc="Action that returns a difference in magnitudes", default=MagDiff 

269 ) 

270 ebvCol = Field[str](doc="E(B-V) Column Name", default="ebv") 

271 band1 = Field[str]( 

272 doc="Optional band for magDiff.col1. Supersedes column name prefix", 

273 optional=True, 

274 default=None, 

275 ) 

276 band2 = Field[str]( 

277 doc="Optional band for magDiff.col2. Supersedes column name prefix", 

278 optional=True, 

279 default=None, 

280 ) 

281 extinctionCoeffs = DictField[str, float]( 

282 doc="Dictionary of extinction coefficients for conversion from E(B-V) to extinction, A_band." 

283 "Key must be the band", 

284 optional=True, 

285 default=None, 

286 ) 

287 

288 def getInputSchema(self) -> KeyedDataSchema: 

289 return self.magDiff.getInputSchema() + ((self.ebvCol, Vector),) 

290 

291 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

292 diff = self.magDiff(data, **kwargs) 

293 if not self.extinctionCoeffs: 

294 _LOG.debug("No extinction Coefficients. Not applying extinction correction") 

295 return diff 

296 

297 col1Band = self.band1 if self.band1 else self.magDiff.col1.split("_")[0] 

298 col2Band = self.band2 if self.band2 else self.magDiff.col2.split("_")[0] 

299 

300 # Return plain MagDiff with warning if either coeff not found 

301 for band in (col1Band, col2Band): 

302 if band not in self.extinctionCoeffs: 

303 _LOG.warning( 

304 "%s band not found in coefficients dictionary: %s. Not applying extinction correction", 

305 band, 

306 self.extinctionCoeffs, 

307 ) 

308 return diff 

309 

310 av1: float = self.extinctionCoeffs[col1Band] 

311 av2: float = self.extinctionCoeffs[col2Band] 

312 

313 ebv = data[self.ebvCol] 

314 # Ignore type until a more complete Vector protocol 

315 correction = np.array((av1 - av2) * ebv) * u.mag # type: ignore 

316 

317 if self.magDiff.returnMillimags: 

318 correction = correction.to(u.mmag) 

319 

320 return np.array(diff - correction.value) 

321 

322 

323class RAcosDec(VectorAction): 

324 """Construct a vector of RA*cos(Dec) in order to have commensurate values 

325 between RA and Dec.""" 

326 

327 raKey = Field[str](doc="RA coordinate", default="coord_ra") 

328 decKey = Field[str](doc="Dec coordinate", default="coord_dec") 

329 

330 def getInputSchema(self) -> KeyedDataSchema: 

331 return ((self.decKey, Vector), (self.raKey, Vector)) 

332 

333 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

334 ra = np.array(data[self.raKey]) 

335 dec = np.array(data[self.decKey]) 

336 return ra * np.cos((dec * u.degree).to(u.radian).value) 

337 

338 

339class AngularSeparation(VectorAction): 

340 """Calculate the angular separation between two coordinate positions.""" 

341 

342 raKey_A = Field[str](doc="RA coordinate for position A", default="coord_ra") 

343 decKey_A = Field[str](doc="Dec coordinate for position A", default="coord_dec") 

344 raKey_B = Field[str](doc="RA coordinate for position B", default="coord_ra") 

345 decKey_B = Field[str](doc="Dec coordinate for position B", default="coord_dec") 

346 outputUnit = Field[str](doc="Output astropy unit", default="milliarcsecond") 

347 

348 def getInputSchema(self) -> KeyedDataSchema: 

349 return ( 

350 (self.decKey_A, Vector), 

351 (self.raKey_A, Vector), 

352 (self.decKey_B, Vector), 

353 (self.raKey_B, Vector), 

354 ) 

355 

356 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

357 ra_A = np.array(data[self.raKey_A]) 

358 dec_A = np.array(data[self.decKey_A]) 

359 ra_B = np.array(data[self.raKey_B]) 

360 dec_B = np.array(data[self.decKey_B]) 

361 coord_A = SkyCoord(ra_A * u.degree, dec_A * u.degree) 

362 coord_B = SkyCoord(ra_B * u.degree, dec_B * u.degree) 

363 return coord_A.separation(coord_B).to(u.Unit(self.outputUnit)).value 

364 

365 

366# Statistical vectorActions 

367 

368 

369class PerGroupStatistic(VectorAction): 

370 """Compute per-group statistic values and return result as a vector with 

371 one element per group. The computed statistic can be any function accepted 

372 by pandas DataFrameGroupBy.aggregate passed in as a string function name. 

373 """ 

374 

375 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id") 

376 buildAction = ConfigurableActionField[VectorAction](doc="Action to build vector", default=LoadVector) 

377 func = Field[str](doc="Name of function to be applied per group") 

378 

379 def getInputSchema(self) -> KeyedDataSchema: 

380 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),) 

381 

382 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

383 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": self.buildAction(data, **kwargs)}) 

384 result = df.groupby("groupKey")["value"].aggregate(self.func) 

385 return np.array(result) 

386 

387 

388class ResidualWithPerGroupStatistic(VectorAction): 

389 """Compute residual between individual elements of group and the per-group 

390 statistic.""" 

391 

392 groupKey = Field[str](doc="Column key to use for forming groups", default="isolated_star_id") 

393 buildAction = ConfigurableActionField(doc="Action to build vector", default=LoadVector) 

394 func = Field[str](doc="Name of function to be applied per group", default="mean") 

395 

396 def getInputSchema(self) -> KeyedDataSchema: 

397 return tuple(self.buildAction.getInputSchema()) + ((self.groupKey, Vector),) 

398 

399 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

400 values = self.buildAction(data, **kwargs) 

401 df = pd.DataFrame({"groupKey": data[self.groupKey], "value": values}) 

402 result = df.groupby("groupKey")["value"].aggregate(self.func) 

403 

404 joinedDf = df.join(result, on="groupKey", validate="m:1", lsuffix="_individual", rsuffix="_group") 

405 

406 result = joinedDf["value_individual"] - joinedDf["value_group"] 

407 return np.array(result) 

408 

409 

410class IsMatchedObjectSameClass(VectorAction): 

411 """Action to return whether matched objects are the same class.""" 

412 

413 key_is_ref_galaxy = Field[str]( 

414 doc="The key of the boolean field selecting reference galaxies", 

415 ) 

416 key_is_ref_star = Field[str]( 

417 doc="The key of the boolean field selecting reference stars", 

418 ) 

419 key_is_target_galaxy = Field[str]( 

420 doc="The key of the boolean field selecting observed galaxies", 

421 ) 

422 key_is_target_star = Field[str]( 

423 doc="The key of the boolean field selecting observed stars", 

424 ) 

425 

426 def __call__(self, data: KeyedData, **kwargs) -> Vector: 

427 mask = kwargs.get("mask") 

428 is_ref_galaxy = data[self.key_is_ref_galaxy] 

429 is_ref_star = data[self.key_is_ref_star] 

430 is_target_galaxy = data[self.key_is_target_galaxy] 

431 is_target_star = data[self.key_is_target_star] 

432 if mask: 

433 is_ref_galaxy = is_ref_galaxy[mask] 

434 is_ref_star = is_ref_star[mask] 

435 is_target_galaxy = is_target_galaxy[mask] 

436 is_target_star = is_target_star[mask] 

437 return (is_ref_galaxy & is_target_galaxy) | (is_ref_star & is_target_star) 

438 

439 def getInputSchema(self) -> KeyedDataSchema: 

440 yield self.key_is_ref_galaxy, Vector 

441 yield self.key_is_ref_star, Vector 

442 yield self.key_is_target_galaxy, Vector 

443 yield self.key_is_target_star, Vector