Coverage for python / lsst / summit / utils / guiders / reading.py: 26%

408 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:35 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21from __future__ import annotations 

22 

23import logging 

24from typing import TYPE_CHECKING, Any 

25 

26__all__ = [ 

27 "GuiderReader", 

28 "convertRawStampsToView", 

29 "GuiderData", 

30] 

31 

32from dataclasses import dataclass 

33from functools import cached_property 

34 

35import matplotlib.pyplot as plt 

36import numpy as np 

37from astropy.time import Time 

38from numpy.typing import NDArray 

39from pydantic import BaseModel, ConfigDict, computed_field 

40 

41from lsst.afw import cameraGeom 

42from lsst.afw.image import ExposureF, ImageF, MaskedImageF, MaskX, VisitInfo 

43from lsst.daf.butler import Butler, DatasetNotFoundError 

44from lsst.meas.algorithms.stamps import Stamp, Stamps 

45from lsst.obs.lsst import LsstCam 

46from lsst.utils.plotting.figures import make_figure 

47 

48from .plotting import GuiderPlotter, labelDetector 

49from .transformation import ( 

50 getCamRotAngle, 

51 makeCcdToDvcsTransform, 

52 makeInitGuiderWcs, 

53 makeRoiBbox, 

54 roiImageToDvcs, 

55) 

56 

57if TYPE_CHECKING: 

58 from lsst.afw.cameraGeom import Camera, Detector 

59 from lsst.geom import SkyWcs 

60 

61 

62def make_subplot(nrows=1, ncols=1, **fig_kwargs): 

63 """Return (fig, axs) using LSST's make_figure.""" 

64 fig = make_figure(**fig_kwargs) 

65 axs = fig.subplots(nrows=nrows, ncols=ncols, squeeze=True) 

66 return fig, axs 

67 

68 

69class GuiderData(BaseModel): 

70 """ 

71 LSST guider data container. 

72 

73 Holds raw guider Stamps, WCS objects, and exposure metadata. Provides 

74 cached properties for timestamps, header information, and convenience 

75 (dict-like) access. 

76 

77 Parameters 

78 ---------- 

79 seqNum : `int` 

80 Sequence number. 

81 dayObs : `int` 

82 Day of observation (YYYYMMDD). 

83 guiderNameMap : `dict[str, int]` 

84 Detector name to detector number map. 

85 rawStampsMap : `dict[str, Stamps]` 

86 Detector name to raw Stamps map. 

87 wcsMap : `dict[str, SkyWcs]` 

88 Detector name to SkyWcs map. 

89 camRotAngle : `float` 

90 Camera rotation angle in degrees. 

91 isMedianSubtracted : `bool` 

92 If True, the raw stamps have been median-subtracted. 

93 columnMaskK : `float` 

94 Threshold factor used for column mask detection. 

95 view : `str` 

96 Output view: 'dvcs', 'ccd', or 'roi'. 

97 

98 Cached properties 

99 ----------------- 

100 guiderNames : sorted detector names. 

101 detNameMax : detector with most stamps. 

102 metadata : metadata dict for detNameMax. 

103 header : exposure-level header summary. 

104 expid : DAYOBS * 100000 + SEQNUM. 

105 timestamps : masked astropy Time array (TAI scale). 

106 stampsMap : Stamps for all detectors in requested view. 

107 roiAmpNames : detector → amplifier name map. 

108 guiderFrequency : stamp cadence in Hz. 

109 

110 Public methods 

111 -------------- 

112 printHeaderInfo() : print header summary. 

113 getWcs(det) : return SkyWcs for detector. 

114 getStampArrayCoadd(det) : median stack of stamps. 

115 getGuiderAmpName(det) : ROI amplifier name. 

116 getGuiderDetNum(det) : detector number. 

117 plotMosaic(...) : plot all detectors as a mosaic. 

118 plotStamp(...) : plot one stamp from a single detector. 

119 makeGif(...) : create a GIF animation of guider frames. 

120 

121 Iteration and indexing 

122 ---------------------- 

123 for det in guiderData # iterates detector names 

124 guiderData.items() # yields (det, Stamps) 

125 guiderData['R44_SG0'] # full Stamps object 

126 guiderData['R44_SG0', 3] # stamp ndarray 

127 guiderData[3] # stamp from detNameMax 

128 """ 

129 

130 model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True) 

131 

132 seqNum: int 

133 dayObs: int 

134 guiderNameMap: dict[str, int] 

135 rawStampsMap: dict[str, Stamps] 

136 wcsMap: dict[str, Any] 

137 camRotAngle: float 

138 isMedianSubtracted: bool = False 

139 columnMaskK: float = 50.0 

140 view: str = "dvcs" 

141 

142 def __repr__(self) -> str: 

143 """ 

144 Return a compact one-line representation. 

145 

146 Returns 

147 ------- 

148 reprStr : `str` 

149 String summary with exposure id, number of stamps, view, and guider 

150 names. 

151 """ 

152 return ( 

153 f"GuiderData(seqNum={self.seqNum}, " 

154 f"dayObs={self.dayObs}, " 

155 f"nStamps={len(self)}, view='{self.view}', " 

156 f"guiders={self.guiderNames})" 

157 ) 

158 

159 def __len__(self) -> int: 

160 """ 

161 Return number of stamps for the detector with the maximum count. 

162 

163 Returns 

164 ------- 

165 nStamps : `int` 

166 Number of stamps (time samples) in the exposure for the primary 

167 detector. 

168 """ 

169 return len(self.rawStampsMap[self.detNameMax]) 

170 

171 @computed_field # type: ignore[prop-decorator] 

172 @cached_property 

173 def expid(self) -> int: 

174 """ 

175 Exposure identifier. 

176 

177 Returns 

178 ------- 

179 expid : `int` 

180 The exposure id. 

181 """ 

182 return self.dayObs * 100000 + self.seqNum 

183 

184 @computed_field # type: ignore[prop-decorator] 

185 @cached_property 

186 def guiderNames(self) -> list[str]: 

187 """ 

188 Names of the guider detectors. 

189 

190 Returns 

191 ------- 

192 guiderNames : `list[str]` 

193 Sorted list of guider detector names. 

194 """ 

195 return sorted(list(self.guiderNameMap.keys())) 

196 

197 @computed_field # type: ignore[prop-decorator] 

198 @cached_property 

199 def guiderIds(self) -> list[int]: 

200 """ 

201 IDs of the guider detectors. 

202 

203 Returns 

204 ------- 

205 guiderIds : `list[int]` 

206 Sorted list of guider detector numeric IDs. 

207 """ 

208 return sorted(list(self.guiderNameMap.values())) 

209 

210 @computed_field # type: ignore[prop-decorator] 

211 @cached_property 

212 def detNameMax(self) -> str: 

213 """ 

214 Detector name with the maximum number of stamps. 

215 

216 Returns 

217 ------- 

218 detName : `str` 

219 Detector name possessing the most stamps. 

220 """ 

221 return max(self.rawStampsMap, key=lambda k: len(self.rawStampsMap[k])) 

222 

223 @computed_field # type: ignore[prop-decorator] 

224 @cached_property 

225 def metadata(self) -> dict: 

226 """ 

227 Metadata for the detector with the most stamps. 

228 

229 Returns 

230 ------- 

231 metadata : `dict` 

232 Copy of the detector-level metadata dictionary. 

233 """ 

234 return self.rawStampsMap[self.detNameMax].metadata.toDict() 

235 

236 @computed_field # type: ignore[prop-decorator] 

237 @cached_property 

238 def header(self) -> dict[str, str | float | None]: 

239 """ 

240 Dictionary of header metadata for this GuiderData. 

241 Fields: n_stamps, freq, expid, filter, cam_rot_angle, start_time, 

242 roi_cols, roi_rows, shuttime, az_start, el_start, az_end, el_end, 

243 seeing. 

244 

245 Returns 

246 ------- 

247 header : `dict[str, str | float | None]` 

248 The headers. 

249 """ 

250 md = self.metadata 

251 info: dict[str, str | float | None] = {} 

252 info["n_stamps"] = len(self) 

253 info["freq"] = self.guiderFrequency 

254 # Ensure DAYOBS and SEQNUM are ints before math, keep ≤79 chars 

255 info["expid"] = self.expid 

256 info["filter"] = md.get("FILTBAND", "Unknown") 

257 info["cam_rot_angle"] = self.camRotAngle 

258 info["start_time"] = md.get("GDSSTART", None) 

259 info["roi_cols"] = int(md.get("ROICOLS", 0)) 

260 info["roi_rows"] = int(md.get("ROIROWS", 0)) 

261 info["shuttime"] = metadata_to_float(md, "SHUTTIME", np.nan) 

262 info["guider_duration"] = self.guiderDurationSec 

263 info["az_start"] = metadata_to_float(md, "AZSTART", np.nan) 

264 info["el_start"] = metadata_to_float(md, "ELSTART", np.nan) 

265 info["az_end"] = metadata_to_float(md, "AZEND", np.nan) 

266 info["el_end"] = metadata_to_float(md, "ELEND", np.nan) 

267 return info 

268 

269 def printHeaderInfo(self) -> None: 

270 """Print a concise summary of key header fields.""" 

271 print(f"Data Id: {self.expid}, filter-band: {self.header['filter']}") 

272 print(f"ROI Shape (row, col): {self.header['roi_rows']}, {self.header['roi_cols']}") 

273 print(f"With nStamps {len(self)} at {self.guiderFrequency} Hz") 

274 print( 

275 f"Acq. Start Time: {self.header['start_time']} \n" 

276 f"with readout duration: {self.header['guider_duration']:.2f} sec" 

277 ) 

278 

279 @computed_field # type: ignore[prop-decorator] 

280 @cached_property 

281 def weather(self) -> WeatherInfo: 

282 """ 

283 Weather information from metadata. 

284 

285 Returns 

286 ------- 

287 weather : `WeatherInfo` 

288 Parsed weather information. 

289 """ 

290 return WeatherInfo.fromMetadata(self.metadata) 

291 

292 @computed_field # type: ignore[prop-decorator] 

293 @cached_property 

294 def timestampMap(self) -> dict[str, Time]: 

295 """ 

296 Aligned timestamp arrays for all detectors. 

297 

298 Returns 

299 ------- 

300 timestampMap : `dict[str, Time]` 

301 Mapping from detector name to masked Time array (TAI scale). 

302 """ 

303 nStamps = len(self.rawStampsMap[self.detNameMax]) 

304 timestampMap: dict[str, Time] = {} 

305 for detName, stamps in self.rawStampsMap.items(): 

306 timestampMap[detName] = standardizeGuiderTimestamps(stamps, nStamps) 

307 return timestampMap 

308 

309 @computed_field # type: ignore[prop-decorator] 

310 @cached_property 

311 def guiderFrequency(self) -> float: 

312 """ 

313 Guider stamp cadence. 

314 

315 Returns 

316 ------- 

317 frequency : `float` 

318 Median stamp frequency in Hz. 

319 """ 

320 timestamps = self.timestampMap[self.detNameMax] 

321 # extract only the unmasked MJD values 

322 jd = timestamps[~timestamps.mask].jd 

323 period_sec = np.median(np.diff(jd)) * 86400.0 

324 return float(1.0 / period_sec) 

325 

326 @computed_field # type: ignore[prop-decorator] 

327 @cached_property 

328 def guiderDurationSec(self) -> float: 

329 """ 

330 Total guider duration. 

331 

332 Returns 

333 ------- 

334 durationSec : `float` 

335 Total elapsed guider duration in seconds (including final period). 

336 """ 

337 timestamps = self.timestampMap[self.detNameMax] 

338 jd = timestamps[~timestamps.mask].jd 

339 duration_sec = (jd[-1] - jd[0]) * 86400.0 + 1 / self.guiderFrequency 

340 return float(duration_sec) 

341 

342 @computed_field # type: ignore[prop-decorator] 

343 @cached_property 

344 def stampsMap(self) -> dict[str, Stamps]: 

345 """ 

346 Stamps converted to requested view. 

347 

348 Returns 

349 ------- 

350 stampsMap : `dict[str, Stamps]` 

351 Mapping of detector name to converted Stamps object. 

352 """ 

353 result: dict[str, Stamps] = {} 

354 if self.view == "roi": 

355 return self.rawStampsMap 

356 else: 

357 for detName, rawStamps in self.rawStampsMap.items(): 

358 result[detName] = convertRawStampsToView( 

359 rawStamps, 

360 detName, 

361 len(self), 

362 view=self.view, 

363 ) 

364 return result 

365 

366 @computed_field # type: ignore[prop-decorator] 

367 @cached_property 

368 def roiAmpNames(self) -> dict[str, str]: 

369 """ 

370 Amplifier names used in the ROI. 

371 

372 Returns 

373 ------- 

374 ampNames : `dict[str, str]` 

375 Mapping from detector name to amplifier name active in ROI. 

376 """ 

377 ampNames: dict[str, str] = {} 

378 for detName in self.rawStampsMap.keys(): 

379 md = self.rawStampsMap[detName].metadata.toDict() 

380 segment = md["ROISEG"] 

381 ampName = "C" + segment[7:] 

382 ampNames[detName] = ampName 

383 return ampNames 

384 

385 def getStampArrayCoadd(self, detName: str) -> np.ndarray: 

386 """ 

387 Get the median-stacked stamp across time for the specified detector. 

388 

389 The stack is computed across the time axis after optional median-row 

390 bias removal (controlled by ``self.isMedianSubtracted``). 

391 

392 Parameters 

393 ---------- 

394 detName : `str` 

395 Guider detector name. 

396 

397 Returns 

398 ------- 

399 coadd : `np.ndarray` 

400 Median of all stamp arrays (time axis collapsed). 

401 """ 

402 if detName not in self.stampsMap: 

403 raise KeyError(f"{detName!r} not present in stampsMap") 

404 

405 stamps = self.stampsMap[detName] 

406 if len(stamps) == 0: 

407 raise ValueError(f"No stamps found for detector {detName!r}") 

408 

409 # Collect arrays, with optional bias subtraction. 

410 arrList = [self[detName, idx] for idx in range(len(stamps))] 

411 # Add a uniform [0, 1) dither per stamp before the median to break 

412 # integer quantization. Without this, the coadd pixel distribution 

413 # can be so narrow that STDEVCLIP returns 0. (See DM-54263.) 

414 stack = np.array(arrList, dtype=np.float32) 

415 rng = np.random.default_rng(seed=0) 

416 stack += rng.uniform(0, 1, size=stack.shape).astype(np.float32) 

417 return np.nanmedian(stack, axis=0) 

418 

419 def getGuiderAmpName(self, detName: str) -> str: 

420 """ 

421 Return amplifier name for a guider detector. 

422 

423 Parameters 

424 ---------- 

425 detName : `str` 

426 Guider detector name. 

427 

428 Returns 

429 ------- 

430 ampName : `str` 

431 Amplifier name string. 

432 """ 

433 if detName not in self.roiAmpNames: 

434 raise ValueError(f"Detector {detName} not found in roiAmpNames.") 

435 return self.roiAmpNames[detName] 

436 

437 def getGuiderDetNum(self, detName: str) -> int: 

438 """ 

439 Return detector number for a guider detector name. 

440 

441 Parameters 

442 ---------- 

443 detName : `str` 

444 Guider detector name. 

445 

446 Returns 

447 ------- 

448 detNum : `int` 

449 The numeric detector id. 

450 """ 

451 if detName not in self.guiderNameMap: 

452 raise ValueError(f"Detector {detName} not found in guiderNameMap.") 

453 return self.guiderNameMap[detName] 

454 

455 def getWcs(self, detName: str) -> SkyWcs: 

456 """ 

457 Return the wcs for a guider detector. 

458 

459 Parameters 

460 ---------- 

461 detName : `str` 

462 Guider detector name. 

463 

464 Returns 

465 ------- 

466 wcs : `SkyWcs` 

467 SkyWcs for the detector. 

468 """ 

469 try: 

470 return self.wcsMap[detName] 

471 except KeyError as e: 

472 raise KeyError(f"{detName!r} not found in wcsMap") from e 

473 

474 @computed_field # type: ignore[prop-decorator] 

475 @cached_property 

476 def obsTime(self) -> Time: 

477 """ 

478 Observation start time in TAI. 

479 

480 Returns 

481 ------- 

482 obsTime : `Time` 

483 Start time parsed from metadata (TAI scale). 

484 """ 

485 gdstart = self.header["start_time"] 

486 return Time(gdstart, format="isot", scale="tai") 

487 

488 @computed_field # type: ignore[prop-decorator] 

489 @cached_property 

490 def missingStampsMap(self) -> dict[str, int]: 

491 """ 

492 Number of missing stamps in the primary detector. 

493 

494 Returns 

495 ------- 

496 nMissing : dict[str, int] 

497 Count of missing stamps (NaN timestamps) in detNameMax. 

498 """ 

499 nMissing: dict[str, int] = {} 

500 for detName, timestamps in self.timestampMap.items(): 

501 nMissing[detName] = np.sum(timestamps.mask) 

502 return nMissing 

503 

504 @computed_field # type: ignore[prop-decorator] 

505 @cached_property 

506 def nMissingStamps(self) -> int: 

507 """ 

508 Total number of missing stamps across all detectors. 

509 

510 Returns 

511 ------- 

512 nMissing : int 

513 Total count of missing stamps across all detectors. 

514 """ 

515 return sum(self.missingStampsMap.values()) 

516 

517 @computed_field # type: ignore[prop-decorator] 

518 @cached_property 

519 def alt(self) -> float: 

520 """Return altitude (el_start) from the guider data header.""" 

521 raw = self.header.get("el_start") 

522 if raw is None: 

523 raise KeyError("Header key 'el_start' is missing or None") 

524 return float(raw) 

525 

526 @computed_field # type: ignore[prop-decorator] 

527 @cached_property 

528 def az(self) -> float: 

529 """Return azimuth (az_start) from the guider data header.""" 

530 raw = self.header.get("az_start") 

531 if raw is None: 

532 raise KeyError("Header key 'az_start' is missing or None") 

533 return float(raw) 

534 

535 # Iterable / dict-like helpers 

536 def __iter__(self): 

537 """Iterate over detector names in guiderNames order.""" 

538 return iter(self.guiderNames) 

539 

540 def items(self): 

541 """Yield (detName, stamps) pairs like dict.items().""" 

542 for det in self.guiderNames: 

543 yield det, self.stampsMap[det] 

544 

545 def keys(self): 

546 """Iterate over detector names (dict-like .keys()).""" 

547 return iter(self.guiderNames) 

548 

549 def values(self): 

550 """Iterate over Stamps objects in guiderNames order.""" 

551 for det in self.guiderNames: 

552 yield self.stampsMap[det] 

553 

554 def __getitem__(self, key): 

555 """ 

556 Direct stamp access helper. 

557 

558 Example usage: 

559 gd["R44_SG0"] -> full `Stamps` object 

560 gd["R44_SG0", 3] -> stamp #3 ndarray 

561 gd[3] -> stamp #3 from `detNameMax` 

562 

563 Parameters 

564 ---------- 

565 key : `str | int | slice | tuple` 

566 Access pattern: 

567 - 'DET' -> Stamps 

568 - ('DET', i) -> single stamp ndarray 

569 - i (int) -> stamp i from primary detector 

570 - ('DET', slice) -> list of ndarrays 

571 

572 Returns 

573 ------- 

574 result : `Stamps | np.ndarray | list[np.ndarray]` 

575 Retrieved object according to key specification. 

576 """ 

577 # Single detector name -> full Stamps 

578 if isinstance(key, str): 

579 return self.stampsMap[key] 

580 

581 # idx only -> assume detNameMax 

582 if isinstance(key, (int, slice)): 

583 key = (self.detNameMax, key) 

584 

585 if isinstance(key, tuple): 

586 if len(key) != 2: 

587 raise TypeError("Key must be (detName, idx) or (idx,)") 

588 detName, idx = key[0], key[1] 

589 

590 if detName not in self.stampsMap: 

591 raise KeyError(f"{detName!r} not found in stampsMap") 

592 

593 stamps = self.stampsMap[detName] 

594 # slice returns list of ndarrays 

595 if isinstance(idx, slice): 

596 arrays = [self._processStampArray(stamp, detName) for stamp in stamps[idx]] 

597 return arrays 

598 # int -> single ndarray 

599 return self._processStampArray(stamps[idx], detName) 

600 

601 raise TypeError("Invalid key type for GuiderData indexing.") 

602 

603 def _processStampArray(self, stamp: Stamp, detName: str) -> np.ndarray: 

604 """ 

605 Convert a Stamp to an ndarray with optional median-row subtraction. 

606 

607 Parameters 

608 ---------- 

609 stamp : `Stamp` 

610 Input stamp object. 

611 detName : `str` 

612 Detector name (for row-axis logic). 

613 

614 Returns 

615 ------- 

616 array : `np.ndarray` 

617 2D image array (bias-corrected if configured). 

618 """ 

619 arr = stamp.stamp_im.image.array 

620 return arr 

621 

622 @computed_field # type: ignore[prop-decorator] 

623 @cached_property 

624 def plotter(self) -> GuiderPlotter: 

625 return GuiderPlotter(self) 

626 

627 def plotStamp( 

628 self, detName: str, stampNum: int, plo: float = 90, phi: float = 99.5, figsize=(10, 8) 

629 ) -> plt.Figure: 

630 """ 

631 Plot a single guider stamp. 

632 

633 Parameters 

634 ---------- 

635 detName : `str` 

636 Detector name. 

637 stampNum : `int` 

638 Stamp index. 

639 plo : `float`, optional 

640 Lower percentile stretch. 

641 phi : `float`, optional 

642 Upper percentile stretch. 

643 figsize : `tuple`, optional 

644 Figure size. 

645 

646 Returns 

647 ------- 

648 fig : `plt.Figure` 

649 The resulting figure. 

650 """ 

651 fig, axs = make_subplot(nrows=1, ncols=1, figsize=figsize) 

652 img = self[detName, stampNum] if stampNum > 0 else self.getStampArrayCoadd(detName) 

653 _ = axs.imshow( 

654 img, 

655 origin="lower", 

656 vmin=np.nanpercentile(img, plo), 

657 vmax=np.nanpercentile(img, phi), 

658 cmap="Greys", 

659 ) 

660 _ = labelDetector(axs, detName) 

661 axs.set_xlabel("X (pixels)", fontsize=11) 

662 axs.set_ylabel("Y (pixels)", fontsize=11) 

663 axs.set_title(f"{self.expid}") 

664 return fig 

665 

666 

667class GuiderReader: 

668 """ 

669 Utility to fetch LSST guider data via Butler. 

670 

671 Example: 

672 from lsst.summit.utils.guiders.reading import GuiderReader 

673 from lsst.daf.butler import Butler 

674 butler = Butler("embargo", collections="LSSTCam/raw/guider") 

675 

676 seqNum, dayObs = 461, 20250425 

677 reader = GuiderReader(butler, view="dvcs") 

678 guiderData = reader.get(dayObs=dayObs, seqNum=seqNum) 

679 """ 

680 

681 def __init__(self, butler: Butler, view: str = "dvcs"): 

682 self.butler = butler 

683 self.view = view 

684 self.log = logging.getLogger(__name__) 

685 # Define camera objects 

686 self.camera = LsstCam.getCamera() 

687 

688 # Build guiderNameMap 

689 self.guiderNameMap: dict[str, int] = {} 

690 for detector in self.camera: 

691 if detector.getType() == cameraGeom.DetectorType.GUIDER: 

692 detName = detector.getName() 

693 self.guiderNameMap[detName] = detector.getId() 

694 

695 self.guiderDetNames = list(self.guiderNameMap.keys()) 

696 self.nGuiders = len(self.guiderNameMap) 

697 

698 def get( 

699 self, 

700 dayObs: int, 

701 seqNum: int, 

702 doSubtractMedian: bool = True, 

703 columnMaskK: float = 50.0, 

704 scienceDetNum: int = 94, 

705 ) -> GuiderData: 

706 """ 

707 Retrieve guider data for a given dayObs / seqNum. 

708 

709 Parameters 

710 ---------- 

711 dayObs : `int` 

712 Day of observation in YYYYMMDD format. 

713 seqNum : `int` 

714 Sequence number. 

715 doSubtractMedian : `bool`, optional 

716 If True, subtract median row bias from each stamp. 

717 columnMaskK : `float`, optional 

718 Threshold factor for column mask detection. 

719 scienceDetNum : `int`, optional 

720 Science detector number for WCS reference. 

721 

722 Returns 

723 ------- 

724 guiderData : `GuiderData` 

725 Assembled guider data object. 

726 """ 

727 # Check if the guider name is swapped (dayObs < 20250509) 

728 # modifies self.guiderNameMap in place if necessary 

729 self.applyGuiderNameSwapIfNeeded(dayObs) 

730 

731 rawStampsDict = self.getGuiderRawStamps(dayObs, seqNum) 

732 

733 # Determine the maximum number of stamps among all guiders 

734 nStampsList = [len(stamps) for stamps in rawStampsDict.values()] 

735 nStamps = max(nStampsList) 

736 

737 if nStamps <= 1: 

738 raise RuntimeError( 

739 f"Only {nStamps} stamps found for dayObs {dayObs}, seqNum {seqNum}. " 

740 "At least 2 stamps are required to create GuiderData." 

741 ) 

742 

743 visitInfo = getVisitInfo(self.butler, dayObs, seqNum, scienceDetNum) 

744 wcsMapDict = makeInitGuiderWcs(self.camera, visitInfo) 

745 camRotAngle = getCamRotAngle(visitInfo) 

746 

747 processedStampsDict = self.processStamps( 

748 rawStampsDict, 

749 doSubtractMedian, 

750 columnMaskK, 

751 ) 

752 guiderData = GuiderData( 

753 seqNum=seqNum, 

754 dayObs=dayObs, 

755 view=self.view, 

756 rawStampsMap=processedStampsDict, 

757 guiderNameMap=self.guiderNameMap, 

758 wcsMap=wcsMapDict, 

759 camRotAngle=camRotAngle, 

760 isMedianSubtracted=doSubtractMedian, 

761 columnMaskK=columnMaskK, 

762 ) 

763 return guiderData 

764 

765 def getGuiderRawStamps(self, dayObs: int, seqNum: int) -> dict[str, Stamps]: 

766 """ 

767 Fetch raw guider Stamps for all detectors. 

768 

769 Parameters 

770 ---------- 

771 dayObs : `int` 

772 Observation day (YYYYMMDD). 

773 seqNum : `int` 

774 Sequence number. 

775 

776 Returns 

777 ------- 

778 rawStamps : `dict[str, Stamps]` 

779 Mapping from detector name to raw Stamps object. 

780 """ 

781 rawStamps: dict[str, Stamps] = {} 

782 for detName, detNum in self.guiderNameMap.items(): 

783 try: 

784 rawStamps[detName] = self.butler.get( 

785 "guider_raw", 

786 day_obs=dayObs, 

787 seq_num=seqNum, 

788 detector=detNum, 

789 instrument="LSSTCam", 

790 ) 

791 except DatasetNotFoundError as e: 

792 raise DatasetNotFoundError(f"No data for {detName} on {dayObs=} {seqNum=}") from e 

793 return rawStamps 

794 

795 def processStamps( 

796 self, 

797 rawStampsDict: dict[str, Stamps], 

798 doSubtractMedian: bool, 

799 columnMaskK: float, 

800 ) -> dict[str, Stamps]: 

801 """ 

802 Apply median row bias subtraction and per-stamp column masking. 

803 

804 

805 Parameters 

806 ---------- 

807 rawStampsDict : `dict[str, Stamps]` 

808 ROI view of stamps from butler `guider_raw`. 

809 doSubtractMedian : `bool`, optional 

810 If True, subtract median row bias. 

811 columnMaskK : `float`, optional 

812 Threshold factor for column mask detection. 

813 

814 Returns 

815 ------- 

816 processedStamps : `dict[str, Stamps]` 

817 New Stamps objects with processed (copied) data. 

818 Per-stamp column masks are included in each stamp's MaskedImageF. 

819 """ 

820 processedStamps: dict[str, Stamps] = {} 

821 

822 for detName, stamps in rawStampsDict.items(): 

823 if len(stamps) == 0: 

824 processedStamps[detName] = stamps 

825 continue 

826 

827 stampList: list[Stamp] = [] 

828 for i in range(len(stamps)): 

829 # Work on a copy - never modify original 

830 data = stamps[i].stamp_im.image.array.copy() 

831 

832 # Compute median row bias 

833 medianRows = np.nanmedian(data, axis=0) 

834 medianValue = np.nanmedian(data) 

835 # Replace NaN medians (fully masked columns) with global median 

836 medianRows = np.where(np.isnan(medianRows), medianValue, medianRows) 

837 

838 # Compute column mask on bias-subtracted data 

839 colMask = getColumnMask(data - medianRows[np.newaxis, :], k=columnMaskK) 

840 

841 # Create mask array for MaskedImageF (BAD=1 for masked cols) 

842 nRows, nCols = data.shape 

843 maskArray = np.zeros((nRows, nCols), dtype=np.uint32) 

844 maskArray[colMask] = 1 # Set BAD bit for masked columns 

845 

846 # Apply median row bias subtraction 

847 if doSubtractMedian: 

848 data = data - medianRows[np.newaxis, :] 

849 

850 # Fill masked columns with global median 

851 if colMask.any(): 

852 medianValue = np.nanmedian(data[~colMask]) if (~colMask).any() else np.nan 

853 data[colMask] = medianValue 

854 

855 # Create MaskedImageF with image and mask 

856 image = ImageF(array=data.astype(np.float32)) 

857 mask = MaskX(nCols, nRows) 

858 mask.array[:] = maskArray 

859 outImg = MaskedImageF(image, mask) 

860 

861 stampList.append( 

862 Stamp( 

863 stamp_im=outImg, 

864 archive_element=stamps[i].archive_element, 

865 metadata=stamps[i].metadata, 

866 ) 

867 ) 

868 

869 processedStamps[detName] = Stamps(stampList, stamps.metadata, use_mask=True, use_archive=True) 

870 

871 return processedStamps 

872 

873 def getAxisRowMap(self) -> dict[str, int]: 

874 """ 

875 Axis mapping (row axis) for guider detectors. 

876 

877 Returns 

878 ------- 

879 axisMap : `dict[str, int]` 

880 Dictionary with detector names as keys and axis mapping as values. 

881 The axis mapping is 1 for X and 0 for Y. 

882 """ 

883 camera = LsstCam.getCamera() 

884 axisMap: dict[str, int] = {} 

885 for detName, detNum in self.guiderNameMap.items(): 

886 # Get the detector object from the camera 

887 detector = camera[detNum] 

888 # Determine the row axis based on the orientation 

889 nq = detector.getOrientation().getNQuarter() 

890 axisMap[detName] = 0 if nq % 2 == 0 else 1 

891 return axisMap 

892 

893 def applyGuiderNameSwapIfNeeded(self, dayObs: int) -> None: 

894 """ 

895 Apply guider name swap (SG0/SG1) for early data if required. 

896 

897 Parameters 

898 ---------- 

899 dayObs : `int` 

900 Observation day (YYYYMMDD) used to decide whether to swap. 

901 """ 

902 if getattr(self, "_guiderNameMapSwapped", False): 

903 return # Already swapped; do nothing 

904 if dayObs < 20250509: 

905 newMap = {} 

906 for detName, detNum in self.guiderNameMap.items(): 

907 if detName.endswith("SG0"): 

908 swapped = detName.replace("SG0", "SG1") 

909 elif detName.endswith("SG1"): 

910 swapped = detName.replace("SG1", "SG0") 

911 else: 

912 swapped = detName 

913 swappedDetNum = self.camera[swapped].getId() 

914 newMap[swapped] = swappedDetNum 

915 self.guiderNameMap = newMap 

916 self._guiderNameMapSwapped = True 

917 

918 

919def getVisitInfo(butler: Butler, dayObs: int, seqNum: int, scienceDetNum: int) -> VisitInfo: 

920 """ 

921 Retrieve VisitInfo for a given dayObs / seqNum. 

922 

923 Parameters 

924 ---------- 

925 butler : `Butler` 

926 Active Butler instance. 

927 dayObs : `int` 

928 Observation day (YYYYMMDD). 

929 seqNum : `int` 

930 Sequence number. 

931 scienceDetNum : `int` 

932 Science detector number. 

933 

934 Returns 

935 ------- 

936 visitInfo : `VisitInfo` 

937 VisitInfo object retrieved from butler. 

938 """ 

939 dataId = { 

940 "instrument": "LSSTCam", 

941 "day_obs": dayObs, 

942 "seq_num": seqNum, 

943 "detector": scienceDetNum, 

944 } 

945 visitInfo = butler.get("raw.visitInfo", dataId) 

946 return visitInfo 

947 

948 

949# Helper functions for stamp conversion 

950def _makeRoiTransforms(metadata: dict, detector: Detector, camera: Camera) -> tuple[tuple, str]: 

951 """ 

952 Construct ROI transforms and derive amplifier name. 

953 

954 Parameters 

955 ---------- 

956 metadata : `dict` 

957 Exposure-level metadata. 

958 detector : `Detector` 

959 The detector. 

960 camera : `Camera` 

961 The camera object. 

962 

963 Returns 

964 ------- 

965 result : `tuple[tuple, str]` 

966 ((ccdViewBbox, fwd, back), ampName) 

967 """ 

968 ampName = "C" + metadata["ROISEG"][7:] 

969 ccdViewBbox = makeRoiBbox(metadata, camera) 

970 fwd, back = makeCcdToDvcsTransform(ccdViewBbox, detector.getOrientation().getNQuarter()) 

971 return (ccdViewBbox, fwd, back), ampName 

972 

973 

974def _convertMaskedImage( 

975 maskedImage: MaskedImageF, 

976 stampMetadata: dict[str, Any], 

977 metadata: dict[str, Any], 

978 transforms: tuple, 

979 detector: Detector, 

980 ampName: str, 

981 camera: Camera, 

982 view: str, 

983) -> Stamp: 

984 """ 

985 Convert one masked image ROI and build a Stamp. 

986 

987 Parameters 

988 ---------- 

989 maskedImage : `MaskedImageF` 

990 Input masked image for the ROI. 

991 stampMetadata : `dict` 

992 Stamp-level metadata (modified in-place). 

993 metadata : `dict` 

994 Exposure-level metadata. 

995 transforms : `tuple` 

996 Tuple containing (ccdViewBbox, forwardTransform, inverseTransform). 

997 detector : `Detector` 

998 Camera detector. 

999 ampName : `str` 

1000 Amplifier name. 

1001 camera : `Camera` 

1002 Camera object. 

1003 view : `str` 

1004 Output view ('dvcs', 'ccd', or 'roi'). 

1005 

1006 Returns 

1007 ------- 

1008 stamp : `Stamp` 

1009 Converted stamp object. 

1010 """ 

1011 ccdViewBbox, fwd, back = transforms 

1012 rawArray = maskedImage.getImage().getArray() 

1013 dvcsArray = roiImageToDvcs(rawArray, metadata, detector, ampName, camera, view=view) 

1014 outImg = MaskedImageF(dvcsArray) 

1015 archiveElement = [ccdViewBbox, fwd, back] 

1016 return Stamp(outImg, archiveElement, metadata=stampMetadata) 

1017 

1018 

1019def _blankStamp( 

1020 stampIdx: int, 

1021 metadata: dict[str, Any], 

1022 transforms: tuple, 

1023) -> Stamp: 

1024 """ 

1025 Create a blank (zero) stamp for a missing index. 

1026 

1027 Parameters 

1028 ---------- 

1029 stampIdx : `int` 

1030 Missing stamp index. 

1031 metadata : `dict` 

1032 Original metadata container. 

1033 transforms : `tuple` 

1034 (ccdViewBbox, forwardTransform, inverseTransform). 

1035 

1036 Returns 

1037 ------- 

1038 stamp : `Stamp` 

1039 Placeholder blank stamp with NaN timestamp. 

1040 """ 

1041 ccdViewBbox, fwd, back = transforms 

1042 nRows, nCols = int(metadata["ROIROWS"]), int(metadata["ROICOLS"]) 

1043 blankArray = np.zeros((nRows, nCols), dtype=np.float32) 

1044 blankImg = ExposureF(MaskedImageF(ImageF(array=blankArray))) 

1045 missingMetadata = metadata.copy() 

1046 missingMetadata["DAQSTAMP"] = stampIdx 

1047 missingMetadata["STMPTMJD"] = np.nan 

1048 archiveElement = [ccdViewBbox, fwd, back] 

1049 return Stamp(blankImg, archiveElement, metadata=missingMetadata) 

1050 

1051 

1052def convertRawStampsToView( 

1053 rawStamps: Stamps, 

1054 detName: str, 

1055 nStamps: int, 

1056 view: str = "dvcs", 

1057) -> Stamps: 

1058 """ 

1059 Convert guider stamps from raw ROI to a requested view ('dvcs', 'ccd', or 

1060 'roi'). Handles missing stamps and preserves metadata and order. 

1061 

1062 Parameters 

1063 ---------- 

1064 rawStamps : `Stamps` 

1065 Input raw ROI stamps. 

1066 detName : `str` 

1067 Detector name. 

1068 nStamps : `int` 

1069 Target total number of stamps (after filling gaps). 

1070 view : `str`, optional 

1071 Output view: 'dvcs', 'ccd', or 'roi'. 

1072 

1073 Returns 

1074 ------- 

1075 stampsOut : `Stamps` 

1076 Converted stamps with gaps filled by blank stamps. 

1077 """ 

1078 camera = LsstCam.getCamera() 

1079 detector = camera[detName] 

1080 metadata = rawStamps.metadata 

1081 metadataDict = metadata.toDict() 

1082 

1083 # Ensure CCDSLOT matches swapped name if applicable 

1084 metadata["CCDSLOT"] = detName[4:7] 

1085 

1086 # Align timestamps and find valid/missing indices 

1087 timestamps = standardizeGuiderTimestamps(rawStamps, nStamps) 

1088 validIndices = np.where(~timestamps.mask)[0].tolist() 

1089 missingIndices = np.where(timestamps.mask)[0].tolist() 

1090 

1091 # Pre‑compute transforms once 

1092 transforms, ampName = _makeRoiTransforms(metadataDict, detector, camera) 

1093 

1094 stampsDict: dict[int, Stamp] = {} 

1095 mIdx = 0 # index into masked images list 

1096 

1097 # Column masking is now done in GuiderReader.processStamps() before this 

1098 # function is called, so we no longer need to compute or apply it here. 

1099 

1100 for idx in validIndices: 

1101 maskedImage = rawStamps.getMaskedImages()[mIdx] 

1102 stampMeta = rawStamps[mIdx].metadata 

1103 stampMeta["DAQSTAMP"] = stampMeta.get("DAQSTAMP", idx) 

1104 stampsDict[idx] = _convertMaskedImage( 

1105 maskedImage, stampMeta, metadataDict, transforms, detector, ampName, camera, view 

1106 ) 

1107 mIdx += 1 

1108 

1109 # Fill gaps with blanks 

1110 for idx in missingIndices: 

1111 stampsDict[idx] = _blankStamp(idx, metadataDict, transforms) 

1112 

1113 # Assemble in order 

1114 stampList = [stampsDict[i] for i in range(nStamps)] 

1115 return Stamps(stampList, metadata, use_archive=True) 

1116 

1117 

1118def standardizeGuiderTimestamps(rawStamps: Stamps, nStamps: int) -> Time: 

1119 """ 

1120 Return a masked `Time` array of length `nStamps` for one guider. 

1121 

1122 Missing stamps are filled with `NaN` and masked so every detector shares a 

1123 uniform, sortable timestamp vector. Result is in MJD, `scale="tai"`. 

1124 

1125 Parameters 

1126 ---------- 

1127 rawStamps : `Stamps` 

1128 Input stamps (may have missing indices). 

1129 nStamps : `int` 

1130 Desired total length (maximum among guiders). 

1131 

1132 Returns 

1133 ------- 

1134 fullTimestamps : `Time` 

1135 Masked Time array (scale='tai') with inferred cadence and gaps masked. 

1136 """ 

1137 timestampsList = [stamp.metadata.get("STMPTMJD", np.nan) for stamp in rawStamps] 

1138 mjdArray = np.ma.masked_invalid(timestampsList) 

1139 timestamps = Time(mjdArray, format="mjd", scale="tai") 

1140 

1141 # infer frequency from valid timestamps 

1142 jdDiff = np.diff(timestamps.jd) 

1143 freqDays = np.nanmedian(jdDiff) 

1144 startJd = timestamps[0].jd 

1145 

1146 if np.isnan(freqDays) or freqDays <= 0: 

1147 raise ValueError( 

1148 f"Invalid frequency {freqDays} derived from timestamps. " 

1149 "Ensure that the timestamps are valid and evenly spaced." 

1150 ) 

1151 

1152 timestampsIdeal = Time(startJd + np.arange(nStamps) * freqDays, format="jd", scale="tai") 

1153 

1154 tolerance = 0.5 * freqDays 

1155 actualJd = timestamps.jd 

1156 

1157 # Build aligned timestamp list with np.nan where missing 

1158 mjdList = [] 

1159 for idealJd in timestampsIdeal.jd: 

1160 # check if any actual timestamp is close to this ideal timestamp 

1161 if np.any(np.abs(actualJd - idealJd) < tolerance): 

1162 # match found — find the closest 

1163 idx = np.argmin(np.abs(actualJd - idealJd)) 

1164 mjdList.append(actualJd[idx]) 

1165 else: 

1166 # no match — this is a missing timestamp 

1167 mjdList.append(np.nan) 

1168 

1169 mjdArray = np.ma.masked_invalid(mjdList) 

1170 fullTimestamps = Time(mjdArray, format="mjd", scale="tai") 

1171 fullTimestamps.freq = freqDays 

1172 return fullTimestamps 

1173 

1174 

1175@dataclass 

1176class WeatherInfo: 

1177 """ 

1178 Container for weather information. 

1179 

1180 Parameters 

1181 ---------- 

1182 temperature : `float` 

1183 Temperature in degrees Celsius. 

1184 humidity : `float` 

1185 Relative humidity in percent. 

1186 pressure : `float` 

1187 Atmospheric pressure in hPa. 

1188 windSpeed : `float` 

1189 Wind speed in m/s. 

1190 windDir : `float` 

1191 Wind direction in degrees from North. 

1192 """ 

1193 

1194 temperature: float 

1195 humidity: float 

1196 pressure: float 

1197 windSpeed: float 

1198 windDir: float 

1199 seeing: float 

1200 

1201 @staticmethod 

1202 def fromMetadata(metadata: dict) -> WeatherInfo: 

1203 """ 

1204 Create a WeatherInfo object from metadata dictionary. 

1205 

1206 Parameters 

1207 ---------- 

1208 metadata : `dict` 

1209 Metadata dictionary containing weather keys. 

1210 

1211 Returns 

1212 ------- 

1213 weather : `WeatherInfo` 

1214 Parsed weather information. 

1215 """ 

1216 return WeatherInfo( 

1217 temperature=metadata_to_float(metadata, "AIRTEMP", np.nan), 

1218 humidity=metadata_to_float(metadata, "HUMIDITY", np.nan), 

1219 pressure=metadata_to_float(metadata, "PRESSURE", np.nan), 

1220 windSpeed=metadata_to_float(metadata, "WINDSPD", np.nan), 

1221 windDir=metadata_to_float(metadata, "WINDDIR", np.nan), 

1222 seeing=metadata_to_float(metadata, "SEEING", np.nan), 

1223 ) 

1224 

1225 

1226def metadata_to_float(metadata: dict, key: str, default: float = np.nan) -> float: 

1227 """ 

1228 Safely convert a metadata value to float. 

1229 

1230 Parameters 

1231 ---------- 

1232 metadata : `dict` 

1233 Metadata dictionary. 

1234 key : `str` 

1235 Key to look up in the metadata. 

1236 default : `float`, optional 

1237 Default value if key is missing or conversion fails. 

1238 

1239 Returns 

1240 ------- 

1241 value : `float` 

1242 Converted float value or default. 

1243 """ 

1244 value = metadata[key] 

1245 if value is None: 

1246 return default 

1247 else: 

1248 return float(value) 

1249 

1250 

1251def mad(x: np.ndarray) -> float: 

1252 """ 

1253 Median absolute deviation metric for std. 

1254 

1255 Parameters 

1256 ---------- 

1257 x : `ndarray` 

1258 Input array. 

1259 

1260 Returns 

1261 ------- 

1262 value : `float` 

1263 Robust standard deviation 

1264 """ 

1265 med = np.nanmedian(x) 

1266 return 1.4826 * np.nanmedian(np.abs(x - med)) 

1267 

1268 

1269def getColumnMask(img: NDArray[np.floating], k: float = 6.0) -> NDArray[np.bool_]: 

1270 """Return a boolean mask for bad columns based on robust median statistics. 

1271 

1272 Parameters 

1273 ---------- 

1274 img : `ndarray` 

1275 2D image array. 

1276 k : `float`, optional 

1277 Threshold factor for MAD. 

1278 

1279 Returns 

1280 ------- 

1281 mask : `ndarray` 

1282 Boolean mask of same shape as img, ``True`` for bad columns. 

1283 """ 

1284 columnMedian = np.nanmedian(img, axis=0) 

1285 center = np.nanmedian(columnMedian) 

1286 

1287 # MAD of column medians 

1288 diffs = np.abs(columnMedian - center) 

1289 s = np.nanmedian(diffs) 

1290 

1291 if s == 0 or not np.isfinite(s): 

1292 return np.zeros(img.shape, dtype=bool) 

1293 

1294 badCols = diffs > k * s 

1295 return np.broadcast_to(badCols, img.shape).copy()