Coverage for python / lsst / summit / utils / guiders / metrics.py: 17%

220 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 09:15 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23import logging 

24from dataclasses import dataclass 

25 

26import numpy as np 

27import pandas as pd 

28from astropy.stats import mad_std 

29 

30from lsst.summit.utils.utils import RobustFitResult, RobustFitter 

31 

32__all__ = ["GuiderMetricsBuilder"] 

33 

34 

class GuiderMetricsBuilder:
    """
    Measure and organize guider performance metrics for a given exposure.

    This class wraps the computation of both exposure-level counts (number of
    guiders, stars, measurements, fraction of valid stamps) and per-quantity
    trend metrics (ALT drift, AZ drift, rotator, photometry, PSF). Trend
    metrics include slope, intercept, trend RMSE, global scatter, outlier
    fraction, slope significance, and sample size.

    Parameters
    ----------
    starCatalog : `pandas.DataFrame`
        Catalog of guider star measurements, containing at least the columns
        required for the counts and trend metrics: ``expid``, ``elapsed_time``,
        and the measurement columns for each metric (e.g., ``dalt``, ``daz``,
        ``dtheta``, ``magoffset``, ``fwhm``).
    nMissingStamps : `int`
        Number of missing stamps for the exposure; it is passed through to the
        exposure counts and reported as ``n_missing_stamps``.
    """

    # Fixed count columns written by ``computeExposureCounts``, in output
    # order. The per-guider flag columns are data dependent and therefore
    # cannot be listed here.
    _COUNT_COLUMNS = (
        "n_guiders",
        "n_stars",
        "n_missing_stamps",
        "n_measurements",
        "fraction_possible_measurements",
        "exptime",
    )

    # Statistic suffixes, in the order ``GuiderDriftResult.toDataFrame`` emits
    # them.
    _TREND_STATS = (
        "slope",
        "intercept",
        "trend_rmse",
        "global_std",
        "outlier_frac",
        "slope_significance",
        "nsize",
    )

    # Output prefix -> attribute holding the trend result. This single ordered
    # table drives both ``toDataFrame`` and ``metricsColumns`` so that the two
    # cannot drift apart.
    _TREND_ATTRIBUTES = {
        "az_drift": "azDriftData",
        "alt_drift": "altDriftData",
        "rotator": "rotatorData",
        "mag": "magData",
        "psf": "psfData",
    }

    def __init__(self, starCatalog: pd.DataFrame, nMissingStamps: int) -> None:
        self.starCatalog = starCatalog
        self.log = logging.getLogger(__name__)
        self.nMissingStamps = nMissingStamps

        # Store the basic variable names for metrics
        self.baseVarsCols = {
            "altDrift": "dalt",
            "azDrift": "daz",
            "mag": "magoffset",
            "rotator": "dtheta",
            "psf": "fwhm",
        }
        self.baseVars = list(self.baseVarsCols.keys())

        # keep track of whether the metrics were built
        self.isBuilt = False

    def buildMetrics(self, expid: int) -> pd.DataFrame:
        """
        Compute all metrics for the specified exposure ID.

        Parameters
        ----------
        expid : `int`
            Exposure ID to compute metrics for.

        Returns
        -------
        metricsDf : `pandas.DataFrame`
            Single-row DataFrame with all computed metrics for the specified
            exposure ID. Columns include exposure counts (e.g., ``n_guiders``,
            ``n_stars``, per-guider flags) and each metric prefix
            (``az_drift``, ``alt_drift``, ``rotator``, ``mag``, ``psf``)
            expanded with its statistic names. If the catalog has no rows for
            ``expid``, an empty DataFrame with the ``metricsColumns`` columns
            is returned and the builder is marked as not built.
        """
        self.expid = expid
        stars = self.starCatalog

        # early exit if no data
        mask = stars["expid"].eq(expid)
        if not mask.any():
            self.isBuilt = False
            self.log.warning("No data found for expid=%s. Returning empty metrics DataFrame.", expid)
            return pd.DataFrame(columns=self.metricsColumns)

        # build metrics
        self.countsDf = computeExposureCounts(stars, self.nMissingStamps, expid)
        self.altDriftData: GuiderDriftResult = computeTrendMetrics(stars, "elapsed_time", "dalt", expid)
        self.azDriftData: GuiderDriftResult = computeTrendMetrics(stars, "elapsed_time", "daz", expid)
        self.rotatorData: GuiderDriftResult = computeTrendMetrics(stars, "elapsed_time", "dtheta", expid)
        self.magData: GuiderDriftResult = computeTrendMetrics(stars, "elapsed_time", "magoffset", expid)
        self.psfData: GuiderDriftResult = computeTrendMetrics(stars, "elapsed_time", "fwhm", expid)

        # Set the built state to true
        self.isBuilt = True

        # assemble the counts and trend results into a DataFrame
        return self.toDataFrame()

    def detrendStars(self) -> pd.DataFrame:
        """
        Return a detrended copy of the star catalog.

        The stored ``starCatalog`` is left untouched: the detrending runs on a
        copy, which gains one column per detrended quantity, named with a
        '_corr' suffix (e.g., 'dalt_corr', 'daz_corr', 'dxfp_corr').

        The slopes of the most recently built exposure are applied to every
        row of the catalog.

        Returns
        -------
        stars : `pandas.DataFrame`
            Copy of the star catalog with the detrended measurement columns
            added.

        Raises
        ------
        RuntimeError
            If ``buildMetrics`` has not produced metrics yet.
        """
        if not self.isBuilt:
            raise RuntimeError("Metrics have not been built. Call buildMetrics(expid) first.")
        # Create a copy to avoid modifying the original catalog
        stars = self.starCatalog.copy()
        # Get the metrics DataFrame
        metrics = self.toDataFrame()

        prefixCol = [toSnakeCase(name) for name in self.baseVarsCols]

        starsDetrended = detrendBaseVariables(stars, metrics, prefixCol)
        starsDetrended = detrendFocalPlaneVariables(starsDetrended, metrics)
        return starsDetrended

    def toDataFrame(self) -> pd.DataFrame:
        """
        Assemble all computed counts and trend metrics into a single row.

        Returns
        -------
        metricsDf : `pandas.DataFrame`
            DataFrame with one row for the currently set ``expid``. Columns
            include exposure counts (e.g., ``n_guiders``, ``n_stars``,
            per-guider flags) and each metric prefix (``az_drift``,
            ``alt_drift``, ``rotator``, ``mag``, ``psf``) expanded with its
            statistic names.

        Raises
        ------
        RuntimeError
            If ``buildMetrics`` has not produced metrics yet.
        """
        if not self.isBuilt:
            raise RuntimeError("Metrics have not been built. Call buildMetrics(expid) first.")

        frames = [self.countsDf]
        for prefix, attribute in self._TREND_ATTRIBUTES.items():
            frames.append(getattr(self, attribute).toDataFrame(prefix, index=self.expid))
        return pd.concat(frames, axis=1)

    @property
    def metricsColumns(self) -> list[str]:
        """
        List of expected output column names for the metrics DataFrame.

        Combines the fixed count columns and each metric prefix with all
        statistic suffixes.

        Returns
        -------
        columns : `list` of `str`
            Column names in the order they appear in the DataFrame returned
            by ``toDataFrame()`` or ``buildMetrics()``. The per-guider flag
            columns are not included because their names depend on the data.
        """
        columns = list(self._COUNT_COLUMNS)
        columns.extend(
            f"{prefix}_{stat}" for prefix in self._TREND_ATTRIBUTES for stat in self._TREND_STATS
        )
        return columns

    def printSummary(self) -> None:
        """
        Print a human-readable summary of all metrics.

        Each metric's slope is scaled by the exposure time. As a side effect
        the ``units`` field of every stored trend result is set.

        Raises
        ------
        RuntimeError
            If ``buildMetrics`` has not produced metrics yet.
        """
        # Guard if buildMetrics found no data
        if not self.isBuilt:
            raise RuntimeError("Metrics have not been built. Call buildMetrics(expid) first.")

        # set units (ensure consistency with y units!)
        self.altDriftData.units = "arcsec"
        self.azDriftData.units = "arcsec"
        self.rotatorData.units = "arcsec"
        self.magData.units = "mag"
        self.psfData.units = "arcsec"

        exptime = self.countsDf["exptime"].values[0]

        # Print summaries
        header1 = makeHeader("Guider Metrics Summary")
        print("\n".join(header1))
        print("Exposure ID:", self.expid)
        print(f"Exposure time: {exptime:.2f} sec")
        printExposureCounts(self.countsDf)

        self.azDriftData.pprint("Az")
        self.altDriftData.pprint("Alt")
        self.rotatorData.pprint("Rotator")
        self.magData.pprint("Mag")
        self.psfData.pprint("PSF FWHM")

230 

231 

def computeTrendMetrics(
    stars: pd.DataFrame,
    timeCol: str,
    yCol: str,
    expid: int,
) -> GuiderDriftResult:
    """
    Compute robust linear trend metrics for a given measurement column versus
    time within a single exposure.

    The function fits a robust linear model to the specified `yCol` as a
    function of `timeCol` for rows matching the given `expid`. Rows with a
    missing value in `timeCol`, `yCol` or ``exptime`` are dropped first. It
    returns a `GuiderDriftResult` wrapping the fit together with the robust
    global scatter of `yCol`, the sample size and the exposure time.

    Parameters
    ----------
    stars : `pandas.DataFrame`
        Table of star measurements containing at least the columns `timeCol`,
        `yCol`, ``expid`` and ``exptime``.
    timeCol : `str`
        Name of the time column (e.g., ``"elapsed_time"``).
    yCol : `str`
        Name of the dependent variable column to fit (e.g., ``"dalt"``,
        ``"magoffset"``).
    expid : `int`
        Exposure identifier used to filter the rows.

    Returns
    -------
    metrics : `GuiderDriftResult`
        Dataclass containing the computed trend metrics. If no rows remain
        after filtering, or if `yCol` or `timeCol` hold fewer than two
        distinct values, all fit fields are NaN, the outlier mask is empty
        and `nsize` is zero.

    Raises
    ------
    KeyError
        If ``expid``, ``exptime``, `timeCol` or `yCol` are not present in
        `stars`.
    """
    s = stars.loc[stars["expid"].eq(expid), [timeCol, yCol, "exptime"]].dropna()

    # A slope is undetermined without at least two distinct values on each
    # axis, so skip the fit and return an all-NaN placeholder instead.
    if s.empty or s[yCol].nunique() < 2 or s[timeCol].nunique() < 2:
        empty_mask = np.zeros((0,), dtype=bool)
        return GuiderDriftResult(
            fit=RobustFitResult(
                slope=np.nan,
                intercept=np.nan,
                scatter=np.nan,
                outlierMask=empty_mask,
                slopePValue=np.nan,
                slopeStdErr=np.nan,
                slopeTValue=np.nan,
                interceptPValue=np.nan,
                interceptStdErr=np.nan,
                interceptTValue=np.nan,
            ),
            globalStd=np.nan,
            nsize=0,
            units="",
            exptime=np.nan,
        )

    x = s[timeCol].to_numpy()
    y = s[yCol].to_numpy()
    exptime = s["exptime"].max()
    # Scatter of the raw values, estimated from the median absolute deviation.
    global_std = float(mad_std(y))

    fitter = RobustFitter()
    fit_res = fitter.fit(x, y)

    return GuiderDriftResult(
        fit=fit_res,
        globalStd=global_std,
        nsize=int(y.size),
        units="",
        exptime=exptime,
    )

309 

310 

311@dataclass(slots=True) 

312class GuiderDriftResult: 

313 """ 

314 Metrics for guider data derived from a robust linear trend fit. 

315 

316 This dataclass wraps a `RobustFitResult` with guider-specific 

317 fields. It stores the global scatter, number of valid points, 

318 and domain metadata such as units and exposure time. Properties 

319 provide easy access to slope, intercept, trend RMSE, outlier 

320 fraction, and slope significance. 

321 

322 Parameters 

323 ---------- 

324 fit : `RobustFitResult` 

325 The result of the robust fit containing slope and intercept. 

326 globalStd : `float` 

327 Robust global standard deviation of the dependent values. 

328 nsize : `int` 

329 Number of valid points used in the fit. 

330 units : `str`, optional 

331 Units of the dependent variable. Default is empty string. 

332 exptime : `float`, optional 

333 Exposure time in seconds. Default is 1.0. 

334 """ 

335 

336 fit: RobustFitResult # composition, not duplication 

337 globalStd: float # robust global std of y 

338 nsize: int 

339 units: str = "" 

340 exptime: float = 1.0 

341 

342 def __post_init__(self) -> None: 

343 assert self.fit is not None, "fit must be provided" 

344 

345 @property 

346 def slope(self) -> float: 

347 return self.fit.slope 

348 

349 @property 

350 def intercept(self) -> float: 

351 return self.fit.intercept 

352 

353 @property 

354 def trendRmse(self) -> float: 

355 return self.fit.scatter 

356 

357 @property 

358 def outlierFrac(self) -> float: 

359 m = self.fit.outlierMask 

360 return float(np.count_nonzero(m)) / float(m.size) if m.size else np.nan 

361 

362 @property 

363 def slopeSignificance(self) -> float | None: 

364 t = self.fit.slopeTValue 

365 return abs(float(t)) if t is not None else None 

366 

367 def toDataFrame(self, prefix: str, index: int = 0) -> pd.DataFrame: 

368 """ 

369 Convert the stored metrics into a single-row DataFrame. 

370 

371 Parameters 

372 ---------- 

373 prefix : `str` 

374 Prefix to add to each metric's column name in the output. 

375 index : `int`, optional 

376 Index value for the returned DataFrame row. 

377 

378 Returns 

379 ------- 

380 metrics : `pandas.DataFrame` 

381 Single-row DataFrame containing the numeric/statistical fields of 

382 this result, prefixed with `prefix`. 

383 """ 

384 row = { 

385 f"{prefix}_slope": self.slope, 

386 f"{prefix}_intercept": self.intercept, 

387 f"{prefix}_trend_rmse": self.trendRmse, 

388 f"{prefix}_global_std": self.globalStd, 

389 f"{prefix}_outlier_frac": self.outlierFrac, 

390 f"{prefix}_slope_significance": self.slopeSignificance, 

391 f"{prefix}_nsize": self.nsize, 

392 } 

393 return pd.DataFrame([row], index=[index]) 

394 

395 def pprint(self, title: str) -> None: 

396 """ 

397 Print the stored metrics in a formatted, human-readable block. 

398 

399 Parameters 

400 ---------- 

401 title : `str` 

402 Title to display for the metric block. 

403 """ 

404 if not title: 

405 title = "Metric" 

406 units = self.units 

407 exptime = self.exptime 

408 

409 header = makeHeader(f"Metrics Summary: {title}", nchar=40) 

410 print("\n".join(header)) 

411 slope_per_exp = self.slope * exptime 

412 print(f" Slope : {slope_per_exp:.3f} {units} per exposure") 

413 sig = "—" if self.slopeSignificance is None else f"{self.slopeSignificance:.1f}" 

414 print(f" Slope signif. : {sig} sigma") 

415 print(f" Intercept : {self.intercept:.3f} {units}") 

416 print(f" Trend RMSE : {self.trendRmse:.3f} {units}") 

417 print(f" Global std : {self.globalStd:.3f} {units}") 

418 print(f" Outlier frac : {self.outlierFrac:.2%}") 

419 print(f" N (points) : {self.nsize:d}\n") 

420 

421 

def detrendFocalPlaneVariables(
    stars: pd.DataFrame,
    metricsDf: pd.DataFrame,
) -> pd.DataFrame:
    """
    Detrend the focal plane offsets in the star catalog using the slopes
    from the Alt/Az drift metrics.

    A 2x2 linear mapping from (``dalt``, ``daz``) to (``dxfp``, ``dyfp``) is
    estimated by least squares from all rows of `stars` whose four values are
    finite. The Alt/Az slopes are projected through that mapping and the
    resulting linear trend (projected slope * ``elapsed_time``) is subtracted
    from ``dxfp`` and ``dyfp``. The median of each detrended column is then
    removed.

    This function modifies the input `stars` DataFrame in place by adding the
    columns ``dxfp_corr`` and ``dyfp_corr``; the ``dxfp`` and ``dyfp`` columns
    themselves are left unchanged.

    Parameters
    ----------
    stars : `pandas.DataFrame`
        DataFrame containing star measurements with at least the columns
        ``elapsed_time``, ``dxfp``, ``dyfp``, ``dalt`` and ``daz``.
    metricsDf : `pandas.DataFrame`
        DataFrame containing computed metrics with slope columns named
        ``alt_drift_slope`` and ``az_drift_slope``. The first row is used.

    Returns
    -------
    stars : `pandas.DataFrame`
        The input DataFrame with the ``dxfp_corr`` and ``dyfp_corr`` columns
        added.

    Raises
    ------
    KeyError
        If a required column is missing from `metricsDf` or `stars`.
    ValueError
        If fewer than three rows have finite values in all four fitted
        columns.
    """
    # Validate metrics columns
    slopeCols = ("alt_drift_slope", "az_drift_slope")
    for col in slopeCols:
        if col not in metricsDf.columns:
            raise KeyError(f"Slope column '{col}' not found in metricsDf.")

    # Extract slopes as the row vector [alt slope, az slope]
    altAzSlopes = np.array([float(metricsDf[col].values[0]) for col in slopeCols], dtype=float)

    # Validate required columns in stars
    required_cols = ["dxfp", "dyfp", "elapsed_time", "dalt", "daz"]
    for col in required_cols:
        if col not in stars.columns:
            raise KeyError(f"Required column '{col}' not found in stars DataFrame.")

    # Build design matrices using only finite rows
    A = stars[["dalt", "daz"]].to_numpy(dtype=float)
    B = stars[["dxfp", "dyfp"]].to_numpy(dtype=float)
    finite_mask = np.isfinite(A).all(axis=1) & np.isfinite(B).all(axis=1)
    A = A[finite_mask]
    B = B[finite_mask]

    if A.shape[0] < 3:
        # Not enough information to estimate a stable 2x2 mapping
        raise ValueError("Insufficient finite samples to estimate focal-plane projection (need >= 3 rows).")

    # Solve for the best-fit 2x2 linear mapping M such that A @ M ≈ B
    # Uses least squares across both X and Y simultaneously
    # np.linalg.lstsq returns the matrix that minimizes ||A M - B||_F
    M, *_ = np.linalg.lstsq(A, B, rcond=None)

    # Project Alt/Az slopes into focal plane slopes. M maps row vectors
    # ([dalt, daz] @ M = [dxfp, dyfp]), so the slope vector must multiply M
    # from the left; M @ v would apply the transposed mapping.
    s_xfp, s_yfp = (float(value) for value in altAzSlopes @ M)

    # Apply detrending:
    # new_value = original_value - projected_slope * elapsed_time
    t = stars["elapsed_time"].to_numpy(dtype=float)
    dxfp = stars["dxfp"].to_numpy(dtype=float) - s_xfp * t
    stars["dxfp_corr"] = dxfp - np.nanmedian(dxfp)

    dyfp = stars["dyfp"].to_numpy(dtype=float) - s_yfp * t
    stars["dyfp_corr"] = dyfp - np.nanmedian(dyfp)

    return stars

496 

497 

def detrendBaseVariables(
    stars: pd.DataFrame,
    metricsDf: pd.DataFrame,
    prefixCol: list[str],
) -> pd.DataFrame:
    """
    Remove the fitted linear trend from measurement columns of the star
    catalog.

    For every prefix the slope is read from the first row of `metricsDf`, the
    trend (slope * ``elapsed_time``) is subtracted from the matching
    measurement column, and the median of the result is removed. The outcome
    is written to a new column named after the measurement column with a
    ``_corr`` suffix. The input `stars` DataFrame is modified in place.

    Parameters
    ----------
    stars : `pandas.DataFrame`
        DataFrame containing star measurements with at least the column
        ``elapsed_time`` and the measurement columns to detrend.
    metricsDf : `pandas.DataFrame`
        DataFrame containing computed metrics with slope columns named as
        ``{prefix}_slope`` for each prefix in `prefixCol`.
    prefixCol : `list` of `str`
        Prefixes of the quantities to detrend. Supported prefixes and their
        measurement columns are ``alt_drift`` (``dalt``), ``az_drift``
        (``daz``), ``rotator`` (``dtheta``), ``mag`` (``magoffset``) and
        ``psf`` (``fwhm``).

    Returns
    -------
    stars : `pandas.DataFrame`
        The modified input DataFrame with the ``*_corr`` columns added.

    Raises
    ------
    KeyError
        If a slope column is missing from `metricsDf` or a measurement column
        is missing from `stars`.
    ValueError
        If a prefix is not one of the supported prefixes.
    """
    # Metric prefix -> measurement column holding the raw values.
    measurementFor = {
        "alt_drift": "dalt",
        "az_drift": "daz",
        "rotator": "dtheta",
        "mag": "magoffset",
        "psf": "fwhm",
    }

    for prefix in prefixCol:
        slopeName = f"{prefix}_slope"
        if slopeName not in metricsDf.columns:
            raise KeyError(f"Slope column '{slopeName}' not found in metricsDf.")
        trendSlope = metricsDf[slopeName].values[0]

        # Look up the measurement column that belongs to this prefix
        column = measurementFor.get(prefix)
        if column is None:
            raise ValueError(f"Unknown prefix '{prefix}'. Cannot determine measurement column.")

        if column not in stars.columns:
            raise KeyError(f"Measurement column '{column}' not found in stars.")

        # Subtract the trend, then centre the residuals on their median
        residual = stars[column] - trendSlope * stars["elapsed_time"]
        stars[column + "_corr"] = residual - np.nanmedian(residual)

    return stars

559 

560 

def toSnakeCase(name: str) -> str:
    """
    Convert a camelCase or PascalCase string to snake_case.

    Every uppercase character is replaced by an underscore followed by its
    lowercase form; all leading underscores of the result are then removed.

    Parameters
    ----------
    name : `str`
        Input string in camelCase or PascalCase.

    Returns
    -------
    snake_case : `str`
        Converted string in snake_case.
    """
    pieces = []
    for char in name:
        if char.isupper():
            pieces.append("_")
            pieces.append(char.lower())
        else:
            pieces.append(char)
    return "".join(pieces).lstrip("_")

576 

577 

def computeExposureCounts(stars: pd.DataFrame, nMissingStamps: int, expid: int) -> pd.DataFrame:
    """
    Compute guider/star/measurement counts for the given expid.

    Parameters
    ----------
    stars : `pandas.DataFrame`
        Star measurement rows. The columns ``expid``, ``exptime``,
        ``detector``, ``detid``, ``stamp`` and ``xccd`` are read.
    nMissingStamps : `int`
        Number of missing stamps for the exposure.
    expid : `int`
        Exposure ID to filter on.

    Returns
    -------
    countsDf : `pandas.DataFrame`
        Single-row DataFrame, indexed by `expid`, with the counts for the
        exposure followed by one boolean column per detector found in the
        exposure.
    """
    exposure = stars.loc[stars["expid"].eq(expid)]
    if exposure.empty:
        exptime = np.nan
    else:
        exptime = exposure["exptime"].max()

    # Guiders and stars per guider
    detectors = exposure["detector"]
    nGuiders = detectors.nunique()
    nStars = exposure["detid"].nunique()
    starsPerGuider = exposure.groupby("detector")["detid"].nunique().to_dict()
    detectorNames = sorted(detectors.unique())

    # A measurement is valid when it has a non-negative stamp and a position
    hasStamp = exposure["stamp"] >= 0
    hasPosition = exposure["xccd"].notna()
    nMeasurements = int((hasStamp & hasPosition).sum())

    # Fraction of valid stamps; NaN when there is nothing to divide by
    nSlots = nGuiders * exposure["stamp"].nunique()
    if nSlots > 0:
        fraction = nMeasurements / nSlots
    else:
        fraction = np.nan

    record = {
        "n_guiders": nGuiders,
        "n_stars": nStars,
        "n_missing_stamps": int(nMissingStamps),
        "n_measurements": nMeasurements,
        "fraction_possible_measurements": fraction,
        "exptime": exptime,
    }
    # One flag per detector: True when it contributed at least one star
    for det in detectorNames:
        record[f"{det}"] = starsPerGuider.get(det, 0) > 0
    return pd.DataFrame([record], index=[expid])

626 

627 

def printExposureCounts(countsDf: pd.DataFrame, precision: int = 3) -> None:
    """
    Print exposure-level counts from a single-row counts DataFrame.

    Only the first row of `countsDf` is used. Missing count columns are
    printed as zero. The "Guiders used" line is printed only when the row has
    keys that start with ``R`` and contain ``_SG``.

    Parameters
    ----------
    countsDf : `pandas.DataFrame`
        DataFrame with a single row containing exposure counts (e.g.,
        n_guiders, n_stars, per-guider flags).
    precision : `int`, optional
        Number of decimal places for fractional values.
    """
    record = countsDf.iloc[0].to_dict()

    starsLine = f"Tracked stars: {int(record.get('n_stars', 0))}"
    missingLine = f"Missing stamps: {int(record.get('n_missing_stamps', 0))}"
    measurementsLine = f"Measurements: {int(record.get('n_measurements', 0))}"
    report = makeHeader("Exposure Counts")
    report.extend([starsLine, missingLine, measurementsLine])

    fraction = record.get("fraction_possible_measurements")
    if isinstance(fraction, (int, float, np.floating)):
        report.append(f"Possible meas. frac: {float(fraction):.{precision}f}")

    # Per-guider boolean flags if present
    flagNames = sorted(key for key in record if key.startswith("R") and "_SG" in key)
    if flagNames:
        used = [name for name in flagNames if bool(record.get(name))]
        report.append(f"Guiders used: {', '.join(used) if used else '—'}")

    print("\n".join(report))
    print()

661 

662 

def makeHeader(title: str, nchar: int = 40) -> list[str]:
    """
    Create a formatted header block with horizontal lines.

    The header consists of a top line, a centered title line, and a bottom
    line. The line width is the maximum of `nchar` and the title length plus
    10 characters, so the title always has at least 5 spaces on each side.

    Parameters
    ----------
    title : `str`
        Text to display in the header.
    nchar : `int`, optional
        Minimum width of the horizontal lines. Increased automatically if the
        title requires more space.

    Returns
    -------
    header_lines : `list` of `str`
        List of three strings: the top line, the title line, and the bottom
        line. All three have the same length.
    """
    width = max(nchar, len(title) + 10)
    line = "─" * width
    # Pad to the full line width so the title sits in the middle of the
    # rules; a fixed 5-space pad is only centred when width == len(title) + 10.
    header = title.center(width)
    return [line, header, line]

689 

690 

def makeLine(nchar: int = 40) -> str:
    """
    Create a horizontal line of box-drawing characters.

    Parameters
    ----------
    nchar : `int`, optional
        Number of characters in the line. Default is 40.

    Returns
    -------
    line : `str`
        String consisting of `nchar` repetitions of the '─' character.
    """
    segments = ["─"] * nchar
    return "".join(segments)