Coverage for python / lsst / summit / utils / guiders / tracking.py: 11%
165 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:28 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23__all__ = ["GuiderStarTracker"]
25import logging
26from typing import TYPE_CHECKING
28import numpy as np
29import pandas as pd
31from .transformation import convertRoiToCcd, convertToAltaz, convertToFocalPlane
33if TYPE_CHECKING:
34 from .reading import GuiderData
36from .detection import GuiderStarTrackerConfig, buildReferenceCatalog, makeBlankCatalog, trackStarAcrossStamp
39class GuiderStarTracker:
40 """
41 Class to track stars in the Guider data.
43 Parameters
44 ----------
45 guiderData : `GuiderData`
46 GuiderData instance containing guider data and metadata.
47 config : `GuiderStarTrackerConfig`, optional
48 Config object with setup and quality control parameters. If None, a new
49 default GuiderStarTrackerConfig is created per GuiderStarTracker
50 instance.
51 """
53 def __init__(
54 self,
55 guiderData: GuiderData,
56 config: GuiderStarTrackerConfig | None = None,
57 ) -> None:
58 """
59 Initialize the GuiderStarTracker with guider data and configuration.
61 Parameters
62 ----------
63 guiderData : `GuiderData`
64 GuiderData instance containing guider data and metadata.
65 config : `GuiderStarTrackerConfig`, optional
66 Config object with setup and quality control parameters.
67 """
68 self.log = logging.getLogger(__name__)
69 self.guiderData = guiderData
70 self.nStamps = len(guiderData)
71 self.expid = guiderData.expid
72 self.shape = guiderData[guiderData.detNameMax, 0].shape
74 # detection and QC parameters from config
75 if config is None:
76 config = GuiderStarTrackerConfig()
77 self.config = config
79 # initialize outputs
80 self.blankStars = makeBlankCatalog()
81 self.columns = list(self.blankStars.columns)
83 def trackGuiderStars(self, refCatalog: None | pd.DataFrame = None) -> pd.DataFrame:
84 """
85 Track stars across guider exposures using a reference catalog.
87 Parameters
88 ----------
89 refCatalog : `pd.DataFrame`, optional
90 Reference catalog with known star positions per detector.
92 Returns
93 -------
94 stars : `pd.DataFrame`
95 DataFrame with tracked stars and their properties,
96 including positions, fluxes, and residual offsets.
97 """
98 if refCatalog is None:
99 self.log.info("Using self-generated refcat")
100 _refCatalog = buildReferenceCatalog(self.guiderData, self.log, self.config)
101 refCatalog = applyQualityCuts(_refCatalog, self.shape, self.config)
103 if refCatalog.empty:
104 self.log.warning(f"Reference catalog is empty for {self.expid}. No stars to track.")
105 return self.blankStars
107 trackedStarTables = []
108 for guiderName in self.guiderData.guiderNames:
109 refDet = refCatalog.query(f"detector == '{guiderName}'")
110 if refDet.empty:
111 self.log.warning(f"No stars found in reference catalog for {guiderName} in {self.expid}.")
112 continue
114 table = self._trackStarForOneGuider(refDet, guiderName)
115 if not table.empty:
116 trackedStarTables.append(table)
118 if not trackedStarTables:
119 self.log.warning(f"No stars tracked for {self.expid} for any guider.")
120 return self.blankStars
122 # build the final catalog
123 trackedStarCatalog = pd.concat(trackedStarTables, ignore_index=True)
125 # Set unique IDs for all tracked stars
126 trackedStarCatalog = setUniqueId(self.guiderData, trackedStarCatalog)
128 # Make the final DataFrame with selected columns
129 return trackedStarCatalog[self.columns]
131 def _trackStarForOneGuider(self, refCatalog: pd.DataFrame, guiderName: str) -> pd.DataFrame:
132 """
133 Track stars for a single guider using the reference catalog.
135 Iteratively tries to track stars starting with the highest SNR,
136 and falls back to the next best candidates if quality cuts fail.
138 Parameters
139 ----------
140 refCatalog : `pd.DataFrame`
141 Reference catalog with known star positions per detector.
142 guiderName : `str`
143 Name of the guider to process.
145 Returns
146 -------
147 stars : `pd.DataFrame`
148 DataFrame with tracked stars for the specified guider.
149 """
150 gd = self.guiderData
151 shape = self.shape
152 cfg = self.config
154 # Sort by SNR (brightest first) for iterative fallback
155 refCatalogSorted = refCatalog.sort_values(by="snr", ascending=False).reset_index(drop=True)
157 stars = None
158 for idx, row in refCatalogSorted.iterrows():
159 refCenter = (row["xroi"], row["yroi"])
160 starSnr = row["snr"]
162 self.log.debug(f"Attempting to track star {idx + 1} (SNR={starSnr:.2f}) for {guiderName}")
164 # Measure the stars across all stamps for this guider
165 starStamps = trackStarAcrossStamp(refCenter, gd, guiderName, cfg)
167 if starStamps.empty:
168 self.log.debug(f"Star {idx + 1} produced no detections")
169 continue
171 # Apply quality cuts to the tracked stars
172 stars = applyQualityCuts(starStamps, shape, cfg)
174 # Filter by minimum number of detections
175 minStampDetections = int(len(gd) * cfg.minValidStampFraction)
176 mask = stars["stamp"].groupby(stars["detector"]).transform("count") >= minStampDetections
177 stars = stars[mask].copy()
179 if not stars.empty:
180 self.log.debug(
181 f"Successfully tracked star {idx + 1} (SNR={starSnr:.2f}) "
182 f"for {guiderName} with {len(stars)} detections"
183 )
184 break
185 else:
186 self.log.debug(f"Star {idx + 1} (SNR={starSnr:.2f}) failed quality cuts for {guiderName}")
187 stars = None
189 if stars is None or stars.empty:
190 self.log.warning(
191 f"No stars passed quality cuts for {guiderName} in {self.expid}. "
192 f"Tried {len(refCatalogSorted)} candidates."
193 )
194 return self.blankStars
196 # convert to CCD, focal plane and Alt/Az coordinates
197 stars = convertToCcdFocalPlaneAltAz(stars, gd, guiderName)
199 # Compute the rotator angle: theta = np.arctan2(yfp, xfp)
200 stars = computeRotatorAngle(stars)
202 # Convert e1, e2 to Alt/Az coordinates
203 stars = convertEllipticity(stars, gd.camRotAngle)
205 # Add timestamp and elapsed time
206 stars = addTimeStamp(stars, gd, guiderName)
208 # Compute Offsets
209 stars = computeOffsets(stars)
211 return stars
214def addTimeStamp(stars: pd.DataFrame, guiderData: GuiderData, guiderName: str) -> pd.DataFrame:
215 """
216 Add timestamp and elapsed time to the star DataFrame.
218 Parameters
219 ----------
220 stars : `pd.DataFrame`
221 DataFrame with star measurements.
222 guiderData : `GuiderData`
223 GuiderData instance containing guider data and metadata.
224 guiderName : `str`
225 Name of the guider.
227 Returns
228 -------
229 stars : `pd.DataFrame`
230 DataFrame with added 'timestamp' and 'elapsed_time' columns.
231 """
232 gd = guiderData
233 # the stamp are aligned with the index of the timestamps
234 stampIndex = stars["stamp"].to_numpy()
235 indices = np.array([ix for ix in stampIndex], dtype=int)
237 # get the timestamp for each stamp
238 timeStamp = gd.timestampMap[guiderName][indices]
240 # inital time is the time of the first stamp of the guider with most stamps
241 t0 = gd.timestampMap[gd.detNameMax][0]
243 stars["timestamp"] = timeStamp
244 stars["elapsed_time"] = np.where(timeStamp.mask, np.nan, (timeStamp - t0).sec)
245 return stars
248def convertToCcdFocalPlaneAltAz(stars: pd.DataFrame, guiderData: GuiderData, guiderName: str) -> pd.DataFrame:
249 """
250 Convert star positions to CCD, focal plane, and Alt/Az coordinates.
252 Parameters
253 ----------
254 stars : `pd.DataFrame`
255 DataFrame with star measurements in ROI coordinates.
256 guiderData : `GuiderData`
257 GuiderData instance containing guider data and metadata.
258 guiderName : `str`
259 Name of the guider.
261 Returns
262 -------
263 stars : `pd.DataFrame`
264 DataFrame with added columns for CCD, focal plane,
265 and Alt/Az coordinates.
266 """
267 gd = guiderData
268 obsTime = gd.obsTime
269 detNum = gd.getGuiderDetNum(guiderName)
270 wcs = gd.getWcs(guiderName)
271 pixelScale = wcs.getPixelScale().asArcseconds()
273 # Convert to CCD coordinates
274 stars["xccd"], stars["yccd"] = convertRoiToCcd(
275 stars["xroi"].to_numpy(), stars["yroi"].to_numpy(), gd, guiderName
276 )
278 # Convert to focal plane coordinates
279 stars["xfp"], stars["yfp"] = convertToFocalPlane(
280 stars["xccd"].to_numpy(), stars["yccd"].to_numpy(), detNum
281 )
283 # Convert to Alt/Az coordinates
284 stars["alt"], stars["az"] = convertToAltaz(
285 stars["xccd"].to_numpy(), stars["yccd"].to_numpy(), wcs, obsTime
286 )
288 # Convert fwhm to arcsec
289 stars["fwhm"] = stars["fwhm"] * pixelScale
291 return stars
294def applyQualityCuts(
295 stars: pd.DataFrame, shape: tuple[float, float], config: GuiderStarTrackerConfig
296) -> pd.DataFrame:
297 """
298 Apply cuts according to min SNR, maximum ellipticity and edge margin.
300 Parameters
301 ----------
302 stars : `pd.DataFrame`
303 DataFrame with star measurements.
304 shape : `tuple[float, float]`
305 Shape of the ROI (cols, rows).
306 config : `GuiderStarTrackerConfig`
307 Configuration object with quality cut parameters.
309 Returns
310 -------
311 stars : `pd.DataFrame`
312 DataFrame with stars that passed the quality cuts.
313 """
314 minSnr = config.minSnr
315 maxEllipticity = config.maxEllipticity
316 edgeMargin = config.edgeMargin
317 roiCols, roiRows = shape
319 # Filter by minimum SNR
320 mask1 = (stars["snr"] >= minSnr) & (stars["flux"] > 0) & (stars["flux_err"] > 0)
322 # Filter by minimum number of detections and maximum ellipticity
323 eabs = np.hypot(stars["e1"], stars["e2"])
324 mask3 = (stars["e1"].abs() <= maxEllipticity) & (stars["e1"].abs() <= maxEllipticity)
325 mask3 &= eabs <= maxEllipticity
327 # Add edge margin mask
328 mask4 = (
329 (stars["xroi"] >= edgeMargin)
330 & (stars["xroi"] <= roiRows - edgeMargin)
331 & (stars["yroi"] >= edgeMargin)
332 & (stars["yroi"] <= roiCols - edgeMargin)
333 )
334 # Combine all masks
335 mask = mask1 & mask3 & mask4
336 return stars[mask].copy()
339def setUniqueId(guiderData: GuiderData, stars: pd.DataFrame) -> pd.DataFrame:
340 """
341 Assign unique IDs to tracked stars.
343 Parameters
344 ----------
345 guiderData : `GuiderData`
346 GuiderData instance containing guider data and metadata.
347 stars : `pd.DataFrame`
348 DataFrame with star measurements.
350 Returns
351 -------
352 stars : `pd.DataFrame`
353 DataFrame with added 'detid' and 'trackid' columns.
355 """
356 # Create a numeric “global” starid:
357 detMap = guiderData.guiderNameMap
358 stars["detid"] = stars["detector"].map(detMap)
359 stars["trackid"] = stars["detid"] * 1000 + stars["stamp"]
360 return stars
363def computeOffsets(stars: pd.DataFrame) -> pd.DataFrame:
364 """
365 Compute the offsets for each star in the catalog.
367 Parameters
368 ----------
369 stars : `pd.DataFrame`
370 DataFrame with star measurements.
372 Returns
373 -------
374 stars : `pd.DataFrame`
375 DataFrame with added offset columns:
376 - dx, dy : offsets in CCD coordinates (pixels)
377 - dxfp, dyfp : offsets in focal plane coordinates (mm)
378 - dalt, daz : offsets in Alt/Az coordinates (arcsec)
379 - dtheta : offset in rotator angle (arcsec)
380 - magoffset : magnitude offset (mmag)
382 """
383 # make reference positions
384 stars["xroi_ref"] = stars.groupby("detector")["xroi"].transform("median")
385 stars["yroi_ref"] = stars.groupby("detector")["yroi"].transform("median")
386 stars["xccd_ref"] = stars.groupby("detector")["xccd"].transform("median")
387 stars["yccd_ref"] = stars.groupby("detector")["yccd"].transform("median")
388 stars["xfp_ref"] = stars.groupby("detector")["xfp"].transform("median")
389 stars["yfp_ref"] = stars.groupby("detector")["yfp"].transform("median")
390 stars["alt_ref"] = stars.groupby("detector")["alt"].transform("median")
391 stars["az_ref"] = stars.groupby("detector")["az"].transform("median")
392 stars["theta_ref"] = stars.groupby("detector")["theta"].transform("median")
394 # Compute all your offsets
395 stars["dx"] = stars["xccd"] - stars["xccd_ref"]
396 stars["dy"] = stars["yccd"] - stars["yccd_ref"]
397 stars["dxfp"] = stars["xfp"] - stars["xfp_ref"]
398 stars["dyfp"] = stars["yfp"] - stars["yfp_ref"]
399 stars["dalt"] = (stars["alt"] - stars["alt_ref"]) * 3600
400 stars["daz"] = (stars["az"] - stars["az_ref"]) * 3600
401 stars["dtheta"] = (stars["theta"] - stars["theta_ref"]) * 3600
403 # Correct for cos(alt) in daz
404 stars["daz"] = np.cos(stars["alt_ref"] * np.pi / 180) * stars["daz"]
406 # compute mag offset
407 stars["flux_ref"] = stars.groupby("detector")["flux"].transform("median")
408 stars["flux_ref"] = pd.to_numeric(stars["flux_ref"], errors="coerce")
409 stars["flux"] = pd.to_numeric(stars["flux"], errors="coerce")
410 stars["magoffset"] = -2.5 * np.log10((stars["flux"] + 1e-12) / (stars["flux_ref"] + 1e-12))
411 # convert to mmag and replace inf with nan
412 stars["magoffset"] = 1000 * stars["magoffset"].replace([np.inf, -np.inf], np.nan)
414 # not detections have nan offsets
415 offsetCols = ["dx", "dy", "dxfp", "dyfp", "dalt", "daz", "dtheta", "magoffset"]
416 stars.loc[stars["flux"].isna(), offsetCols] = np.nan
418 return stars
421def computeRotatorAngle(stars: pd.DataFrame) -> pd.DataFrame:
422 """
423 Compute the rotator angle (theta) and its propagated 1-sigma uncertainty
424 for each row in `stars`.
426 Parameters
427 ----------
428 stars : `pd.DataFrame`
429 DataFrame with star measurements. Must contain 'xfp', 'yfp', 'xerr',
430 and 'yerr' columns.
432 Returns
433 -------
434 stars : `pd.DataFrame`
435 DataFrame with added 'theta' and 'theta_err' columns.
436 """
437 mm = 0.001 # convert microns to mm
438 xfp = stars["xfp"].to_numpy()
439 yfp = stars["yfp"].to_numpy()
440 sigmaX = stars["xerr"].to_numpy() * 10 * mm
441 sigmaY = stars["yerr"].to_numpy() * 10 * mm
443 # Angle in degrees
444 theta = np.degrees(np.arctan2(yfp, xfp))
446 # Denominator r^2 = x^2 + y^2
447 denom = xfp**2 + yfp**2
449 # Suppress warnings for rows where denom == 0; set sigmaTheta to NaN there.
450 with np.errstate(divide="ignore", invalid="ignore"):
451 sigmaThetaRad = np.sqrt((yfp**2 * sigmaX**2 + xfp**2 * sigmaY**2) / denom**2)
452 sigmaTheta = np.degrees(sigmaThetaRad)
453 sigmaTheta = np.where(denom == 0, np.nan, sigmaTheta)
455 stars["theta"] = theta
456 stars["theta_err"] = sigmaTheta * 3600 # convert to arcsec
457 return stars
460def convertEllipticity(stars: pd.DataFrame, camRotAngleDeg: float) -> pd.DataFrame:
461 """
462 Rotate ellipticity components (e1, e2) to Alt/Az.
464 Parameters
465 ----------
466 stars : `pd.DataFrame`
467 DataFrame with star measurements.
468 camRotAngleDeg : `float`
469 Camera rotator angle in degrees.
471 Returns
472 -------
473 stars : `pd.DataFrame`
474 DataFrame with added columns 'e1_altaz' and 'e2_altaz'.
475 """
476 e1_rot, e2_rot = rotateEllipticity(stars["e1"], stars["e2"], camRotAngleDeg)
477 stars["e1_altaz"] = e1_rot
478 stars["e2_altaz"] = e2_rot
479 return stars
482def rotateEllipticity(e1: float, e2: float, theta_deg: float) -> tuple[float, float]:
483 """
484 Rotate ellipticity components (e1, e2) by theta_deg degrees.
486 Parameters
487 ----------
488 e1 : `float`
489 First ellipticity component.
490 e2 : `float`
491 Second ellipticity component.
492 theta_deg : `float`
493 Rotation angle in degrees.
495 Returns
496 -------
497 e1_rot, e2_rot : `tuple[float, float]`
498 Rotated ellipticity components.
499 """
500 theta = np.deg2rad(theta_deg)
501 cos2t = np.cos(2 * theta)
502 sin2t = np.sin(2 * theta)
503 e1_rot = e1 * cos2t + e2 * sin2t
504 e2_rot = -e1 * sin2t + e2 * cos2t
505 return e1_rot, e2_rot