Coverage for python / lsst / summit / utils / guiders / tracking.py: 11%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:28 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23__all__ = ["GuiderStarTracker"] 

24 

25import logging 

26from typing import TYPE_CHECKING 

27 

28import numpy as np 

29import pandas as pd 

30 

31from .transformation import convertRoiToCcd, convertToAltaz, convertToFocalPlane 

32 

33if TYPE_CHECKING: 

34 from .reading import GuiderData 

35 

36from .detection import GuiderStarTrackerConfig, buildReferenceCatalog, makeBlankCatalog, trackStarAcrossStamp 

37 

38 

39class GuiderStarTracker: 

40 """ 

41 Class to track stars in the Guider data. 

42 

43 Parameters 

44 ---------- 

45 guiderData : `GuiderData` 

46 GuiderData instance containing guider data and metadata. 

47 config : `GuiderStarTrackerConfig`, optional 

48 Config object with setup and quality control parameters. If None, a new 

49 default GuiderStarTrackerConfig is created per GuiderStarTracker 

50 instance. 

51 """ 

52 

53 def __init__( 

54 self, 

55 guiderData: GuiderData, 

56 config: GuiderStarTrackerConfig | None = None, 

57 ) -> None: 

58 """ 

59 Initialize the GuiderStarTracker with guider data and configuration. 

60 

61 Parameters 

62 ---------- 

63 guiderData : `GuiderData` 

64 GuiderData instance containing guider data and metadata. 

65 config : `GuiderStarTrackerConfig`, optional 

66 Config object with setup and quality control parameters. 

67 """ 

68 self.log = logging.getLogger(__name__) 

69 self.guiderData = guiderData 

70 self.nStamps = len(guiderData) 

71 self.expid = guiderData.expid 

72 self.shape = guiderData[guiderData.detNameMax, 0].shape 

73 

74 # detection and QC parameters from config 

75 if config is None: 

76 config = GuiderStarTrackerConfig() 

77 self.config = config 

78 

79 # initialize outputs 

80 self.blankStars = makeBlankCatalog() 

81 self.columns = list(self.blankStars.columns) 

82 

83 def trackGuiderStars(self, refCatalog: None | pd.DataFrame = None) -> pd.DataFrame: 

84 """ 

85 Track stars across guider exposures using a reference catalog. 

86 

87 Parameters 

88 ---------- 

89 refCatalog : `pd.DataFrame`, optional 

90 Reference catalog with known star positions per detector. 

91 

92 Returns 

93 ------- 

94 stars : `pd.DataFrame` 

95 DataFrame with tracked stars and their properties, 

96 including positions, fluxes, and residual offsets. 

97 """ 

98 if refCatalog is None: 

99 self.log.info("Using self-generated refcat") 

100 _refCatalog = buildReferenceCatalog(self.guiderData, self.log, self.config) 

101 refCatalog = applyQualityCuts(_refCatalog, self.shape, self.config) 

102 

103 if refCatalog.empty: 

104 self.log.warning(f"Reference catalog is empty for {self.expid}. No stars to track.") 

105 return self.blankStars 

106 

107 trackedStarTables = [] 

108 for guiderName in self.guiderData.guiderNames: 

109 refDet = refCatalog.query(f"detector == '{guiderName}'") 

110 if refDet.empty: 

111 self.log.warning(f"No stars found in reference catalog for {guiderName} in {self.expid}.") 

112 continue 

113 

114 table = self._trackStarForOneGuider(refDet, guiderName) 

115 if not table.empty: 

116 trackedStarTables.append(table) 

117 

118 if not trackedStarTables: 

119 self.log.warning(f"No stars tracked for {self.expid} for any guider.") 

120 return self.blankStars 

121 

122 # build the final catalog 

123 trackedStarCatalog = pd.concat(trackedStarTables, ignore_index=True) 

124 

125 # Set unique IDs for all tracked stars 

126 trackedStarCatalog = setUniqueId(self.guiderData, trackedStarCatalog) 

127 

128 # Make the final DataFrame with selected columns 

129 return trackedStarCatalog[self.columns] 

130 

131 def _trackStarForOneGuider(self, refCatalog: pd.DataFrame, guiderName: str) -> pd.DataFrame: 

132 """ 

133 Track stars for a single guider using the reference catalog. 

134 

135 Iteratively tries to track stars starting with the highest SNR, 

136 and falls back to the next best candidates if quality cuts fail. 

137 

138 Parameters 

139 ---------- 

140 refCatalog : `pd.DataFrame` 

141 Reference catalog with known star positions per detector. 

142 guiderName : `str` 

143 Name of the guider to process. 

144 

145 Returns 

146 ------- 

147 stars : `pd.DataFrame` 

148 DataFrame with tracked stars for the specified guider. 

149 """ 

150 gd = self.guiderData 

151 shape = self.shape 

152 cfg = self.config 

153 

154 # Sort by SNR (brightest first) for iterative fallback 

155 refCatalogSorted = refCatalog.sort_values(by="snr", ascending=False).reset_index(drop=True) 

156 

157 stars = None 

158 for idx, row in refCatalogSorted.iterrows(): 

159 refCenter = (row["xroi"], row["yroi"]) 

160 starSnr = row["snr"] 

161 

162 self.log.debug(f"Attempting to track star {idx + 1} (SNR={starSnr:.2f}) for {guiderName}") 

163 

164 # Measure the stars across all stamps for this guider 

165 starStamps = trackStarAcrossStamp(refCenter, gd, guiderName, cfg) 

166 

167 if starStamps.empty: 

168 self.log.debug(f"Star {idx + 1} produced no detections") 

169 continue 

170 

171 # Apply quality cuts to the tracked stars 

172 stars = applyQualityCuts(starStamps, shape, cfg) 

173 

174 # Filter by minimum number of detections 

175 minStampDetections = int(len(gd) * cfg.minValidStampFraction) 

176 mask = stars["stamp"].groupby(stars["detector"]).transform("count") >= minStampDetections 

177 stars = stars[mask].copy() 

178 

179 if not stars.empty: 

180 self.log.debug( 

181 f"Successfully tracked star {idx + 1} (SNR={starSnr:.2f}) " 

182 f"for {guiderName} with {len(stars)} detections" 

183 ) 

184 break 

185 else: 

186 self.log.debug(f"Star {idx + 1} (SNR={starSnr:.2f}) failed quality cuts for {guiderName}") 

187 stars = None 

188 

189 if stars is None or stars.empty: 

190 self.log.warning( 

191 f"No stars passed quality cuts for {guiderName} in {self.expid}. " 

192 f"Tried {len(refCatalogSorted)} candidates." 

193 ) 

194 return self.blankStars 

195 

196 # convert to CCD, focal plane and Alt/Az coordinates 

197 stars = convertToCcdFocalPlaneAltAz(stars, gd, guiderName) 

198 

199 # Compute the rotator angle: theta = np.arctan2(yfp, xfp) 

200 stars = computeRotatorAngle(stars) 

201 

202 # Convert e1, e2 to Alt/Az coordinates 

203 stars = convertEllipticity(stars, gd.camRotAngle) 

204 

205 # Add timestamp and elapsed time 

206 stars = addTimeStamp(stars, gd, guiderName) 

207 

208 # Compute Offsets 

209 stars = computeOffsets(stars) 

210 

211 return stars 

212 

213 

214def addTimeStamp(stars: pd.DataFrame, guiderData: GuiderData, guiderName: str) -> pd.DataFrame: 

215 """ 

216 Add timestamp and elapsed time to the star DataFrame. 

217 

218 Parameters 

219 ---------- 

220 stars : `pd.DataFrame` 

221 DataFrame with star measurements. 

222 guiderData : `GuiderData` 

223 GuiderData instance containing guider data and metadata. 

224 guiderName : `str` 

225 Name of the guider. 

226 

227 Returns 

228 ------- 

229 stars : `pd.DataFrame` 

230 DataFrame with added 'timestamp' and 'elapsed_time' columns. 

231 """ 

232 gd = guiderData 

233 # the stamp are aligned with the index of the timestamps 

234 stampIndex = stars["stamp"].to_numpy() 

235 indices = np.array([ix for ix in stampIndex], dtype=int) 

236 

237 # get the timestamp for each stamp 

238 timeStamp = gd.timestampMap[guiderName][indices] 

239 

240 # inital time is the time of the first stamp of the guider with most stamps 

241 t0 = gd.timestampMap[gd.detNameMax][0] 

242 

243 stars["timestamp"] = timeStamp 

244 stars["elapsed_time"] = np.where(timeStamp.mask, np.nan, (timeStamp - t0).sec) 

245 return stars 

246 

247 

248def convertToCcdFocalPlaneAltAz(stars: pd.DataFrame, guiderData: GuiderData, guiderName: str) -> pd.DataFrame: 

249 """ 

250 Convert star positions to CCD, focal plane, and Alt/Az coordinates. 

251 

252 Parameters 

253 ---------- 

254 stars : `pd.DataFrame` 

255 DataFrame with star measurements in ROI coordinates. 

256 guiderData : `GuiderData` 

257 GuiderData instance containing guider data and metadata. 

258 guiderName : `str` 

259 Name of the guider. 

260 

261 Returns 

262 ------- 

263 stars : `pd.DataFrame` 

264 DataFrame with added columns for CCD, focal plane, 

265 and Alt/Az coordinates. 

266 """ 

267 gd = guiderData 

268 obsTime = gd.obsTime 

269 detNum = gd.getGuiderDetNum(guiderName) 

270 wcs = gd.getWcs(guiderName) 

271 pixelScale = wcs.getPixelScale().asArcseconds() 

272 

273 # Convert to CCD coordinates 

274 stars["xccd"], stars["yccd"] = convertRoiToCcd( 

275 stars["xroi"].to_numpy(), stars["yroi"].to_numpy(), gd, guiderName 

276 ) 

277 

278 # Convert to focal plane coordinates 

279 stars["xfp"], stars["yfp"] = convertToFocalPlane( 

280 stars["xccd"].to_numpy(), stars["yccd"].to_numpy(), detNum 

281 ) 

282 

283 # Convert to Alt/Az coordinates 

284 stars["alt"], stars["az"] = convertToAltaz( 

285 stars["xccd"].to_numpy(), stars["yccd"].to_numpy(), wcs, obsTime 

286 ) 

287 

288 # Convert fwhm to arcsec 

289 stars["fwhm"] = stars["fwhm"] * pixelScale 

290 

291 return stars 

292 

293 

294def applyQualityCuts( 

295 stars: pd.DataFrame, shape: tuple[float, float], config: GuiderStarTrackerConfig 

296) -> pd.DataFrame: 

297 """ 

298 Apply cuts according to min SNR, maximum ellipticity and edge margin. 

299 

300 Parameters 

301 ---------- 

302 stars : `pd.DataFrame` 

303 DataFrame with star measurements. 

304 shape : `tuple[float, float]` 

305 Shape of the ROI (cols, rows). 

306 config : `GuiderStarTrackerConfig` 

307 Configuration object with quality cut parameters. 

308 

309 Returns 

310 ------- 

311 stars : `pd.DataFrame` 

312 DataFrame with stars that passed the quality cuts. 

313 """ 

314 minSnr = config.minSnr 

315 maxEllipticity = config.maxEllipticity 

316 edgeMargin = config.edgeMargin 

317 roiCols, roiRows = shape 

318 

319 # Filter by minimum SNR 

320 mask1 = (stars["snr"] >= minSnr) & (stars["flux"] > 0) & (stars["flux_err"] > 0) 

321 

322 # Filter by minimum number of detections and maximum ellipticity 

323 eabs = np.hypot(stars["e1"], stars["e2"]) 

324 mask3 = (stars["e1"].abs() <= maxEllipticity) & (stars["e1"].abs() <= maxEllipticity) 

325 mask3 &= eabs <= maxEllipticity 

326 

327 # Add edge margin mask 

328 mask4 = ( 

329 (stars["xroi"] >= edgeMargin) 

330 & (stars["xroi"] <= roiRows - edgeMargin) 

331 & (stars["yroi"] >= edgeMargin) 

332 & (stars["yroi"] <= roiCols - edgeMargin) 

333 ) 

334 # Combine all masks 

335 mask = mask1 & mask3 & mask4 

336 return stars[mask].copy() 

337 

338 

339def setUniqueId(guiderData: GuiderData, stars: pd.DataFrame) -> pd.DataFrame: 

340 """ 

341 Assign unique IDs to tracked stars. 

342 

343 Parameters 

344 ---------- 

345 guiderData : `GuiderData` 

346 GuiderData instance containing guider data and metadata. 

347 stars : `pd.DataFrame` 

348 DataFrame with star measurements. 

349 

350 Returns 

351 ------- 

352 stars : `pd.DataFrame` 

353 DataFrame with added 'detid' and 'trackid' columns. 

354 

355 """ 

356 # Create a numeric “global” starid: 

357 detMap = guiderData.guiderNameMap 

358 stars["detid"] = stars["detector"].map(detMap) 

359 stars["trackid"] = stars["detid"] * 1000 + stars["stamp"] 

360 return stars 

361 

362 

363def computeOffsets(stars: pd.DataFrame) -> pd.DataFrame: 

364 """ 

365 Compute the offsets for each star in the catalog. 

366 

367 Parameters 

368 ---------- 

369 stars : `pd.DataFrame` 

370 DataFrame with star measurements. 

371 

372 Returns 

373 ------- 

374 stars : `pd.DataFrame` 

375 DataFrame with added offset columns: 

376 - dx, dy : offsets in CCD coordinates (pixels) 

377 - dxfp, dyfp : offsets in focal plane coordinates (mm) 

378 - dalt, daz : offsets in Alt/Az coordinates (arcsec) 

379 - dtheta : offset in rotator angle (arcsec) 

380 - magoffset : magnitude offset (mmag) 

381 

382 """ 

383 # make reference positions 

384 stars["xroi_ref"] = stars.groupby("detector")["xroi"].transform("median") 

385 stars["yroi_ref"] = stars.groupby("detector")["yroi"].transform("median") 

386 stars["xccd_ref"] = stars.groupby("detector")["xccd"].transform("median") 

387 stars["yccd_ref"] = stars.groupby("detector")["yccd"].transform("median") 

388 stars["xfp_ref"] = stars.groupby("detector")["xfp"].transform("median") 

389 stars["yfp_ref"] = stars.groupby("detector")["yfp"].transform("median") 

390 stars["alt_ref"] = stars.groupby("detector")["alt"].transform("median") 

391 stars["az_ref"] = stars.groupby("detector")["az"].transform("median") 

392 stars["theta_ref"] = stars.groupby("detector")["theta"].transform("median") 

393 

394 # Compute all your offsets 

395 stars["dx"] = stars["xccd"] - stars["xccd_ref"] 

396 stars["dy"] = stars["yccd"] - stars["yccd_ref"] 

397 stars["dxfp"] = stars["xfp"] - stars["xfp_ref"] 

398 stars["dyfp"] = stars["yfp"] - stars["yfp_ref"] 

399 stars["dalt"] = (stars["alt"] - stars["alt_ref"]) * 3600 

400 stars["daz"] = (stars["az"] - stars["az_ref"]) * 3600 

401 stars["dtheta"] = (stars["theta"] - stars["theta_ref"]) * 3600 

402 

403 # Correct for cos(alt) in daz 

404 stars["daz"] = np.cos(stars["alt_ref"] * np.pi / 180) * stars["daz"] 

405 

406 # compute mag offset 

407 stars["flux_ref"] = stars.groupby("detector")["flux"].transform("median") 

408 stars["flux_ref"] = pd.to_numeric(stars["flux_ref"], errors="coerce") 

409 stars["flux"] = pd.to_numeric(stars["flux"], errors="coerce") 

410 stars["magoffset"] = -2.5 * np.log10((stars["flux"] + 1e-12) / (stars["flux_ref"] + 1e-12)) 

411 # convert to mmag and replace inf with nan 

412 stars["magoffset"] = 1000 * stars["magoffset"].replace([np.inf, -np.inf], np.nan) 

413 

414 # not detections have nan offsets 

415 offsetCols = ["dx", "dy", "dxfp", "dyfp", "dalt", "daz", "dtheta", "magoffset"] 

416 stars.loc[stars["flux"].isna(), offsetCols] = np.nan 

417 

418 return stars 

419 

420 

421def computeRotatorAngle(stars: pd.DataFrame) -> pd.DataFrame: 

422 """ 

423 Compute the rotator angle (theta) and its propagated 1-sigma uncertainty 

424 for each row in `stars`. 

425 

426 Parameters 

427 ---------- 

428 stars : `pd.DataFrame` 

429 DataFrame with star measurements. Must contain 'xfp', 'yfp', 'xerr', 

430 and 'yerr' columns. 

431 

432 Returns 

433 ------- 

434 stars : `pd.DataFrame` 

435 DataFrame with added 'theta' and 'theta_err' columns. 

436 """ 

437 mm = 0.001 # convert microns to mm 

438 xfp = stars["xfp"].to_numpy() 

439 yfp = stars["yfp"].to_numpy() 

440 sigmaX = stars["xerr"].to_numpy() * 10 * mm 

441 sigmaY = stars["yerr"].to_numpy() * 10 * mm 

442 

443 # Angle in degrees 

444 theta = np.degrees(np.arctan2(yfp, xfp)) 

445 

446 # Denominator r^2 = x^2 + y^2 

447 denom = xfp**2 + yfp**2 

448 

449 # Suppress warnings for rows where denom == 0; set sigmaTheta to NaN there. 

450 with np.errstate(divide="ignore", invalid="ignore"): 

451 sigmaThetaRad = np.sqrt((yfp**2 * sigmaX**2 + xfp**2 * sigmaY**2) / denom**2) 

452 sigmaTheta = np.degrees(sigmaThetaRad) 

453 sigmaTheta = np.where(denom == 0, np.nan, sigmaTheta) 

454 

455 stars["theta"] = theta 

456 stars["theta_err"] = sigmaTheta * 3600 # convert to arcsec 

457 return stars 

458 

459 

460def convertEllipticity(stars: pd.DataFrame, camRotAngleDeg: float) -> pd.DataFrame: 

461 """ 

462 Rotate ellipticity components (e1, e2) to Alt/Az. 

463 

464 Parameters 

465 ---------- 

466 stars : `pd.DataFrame` 

467 DataFrame with star measurements. 

468 camRotAngleDeg : `float` 

469 Camera rotator angle in degrees. 

470 

471 Returns 

472 ------- 

473 stars : `pd.DataFrame` 

474 DataFrame with added columns 'e1_altaz' and 'e2_altaz'. 

475 """ 

476 e1_rot, e2_rot = rotateEllipticity(stars["e1"], stars["e2"], camRotAngleDeg) 

477 stars["e1_altaz"] = e1_rot 

478 stars["e2_altaz"] = e2_rot 

479 return stars 

480 

481 

482def rotateEllipticity(e1: float, e2: float, theta_deg: float) -> tuple[float, float]: 

483 """ 

484 Rotate ellipticity components (e1, e2) by theta_deg degrees. 

485 

486 Parameters 

487 ---------- 

488 e1 : `float` 

489 First ellipticity component. 

490 e2 : `float` 

491 Second ellipticity component. 

492 theta_deg : `float` 

493 Rotation angle in degrees. 

494 

495 Returns 

496 ------- 

497 e1_rot, e2_rot : `tuple[float, float]` 

498 Rotated ellipticity components. 

499 """ 

500 theta = np.deg2rad(theta_deg) 

501 cos2t = np.cos(2 * theta) 

502 sin2t = np.sin(2 * theta) 

503 e1_rot = e1 * cos2t + e2 * sin2t 

504 e2_rot = -e1 * sin2t + e2 * cos2t 

505 return e1_rot, e2_rot