Coverage for python / lsst / pipe / tasks / brightStarSubtraction / brightStarCutout.py: 28%

156 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:04 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["BrightStarCutoutConnections", "BrightStarCutoutConfig", "BrightStarCutoutTask"] 

23 

24import astropy.units as u 

25import numpy as np 

26from astropy.coordinates import SkyCoord 

27from astropy.table import Table 

28 

29from lsst.afw.cameraGeom import FIELD_ANGLE, FOCAL_PLANE, PIXELS 

30from lsst.afw.detection import footprintsToNumpy 

31from lsst.afw.geom import makeModifiedWcs 

32from lsst.afw.geom.transformFactory import makeTransform 

33from lsst.afw.image import ExposureF, MaskedImageF 

34from lsst.afw.math import BackgroundList, FixedKernel, WarpingControl, warpImage 

35from lsst.afw.table import SourceCatalog 

36from lsst.daf.base import PropertyList 

37from lsst.geom import ( 

38 AffineTransform, 

39 Angle, 

40 Box2I, 

41 Extent2D, 

42 Extent2I, 

43 Point2D, 

44 Point2I, 

45 SpherePoint, 

46 arcseconds, 

47 floor, 

48 radians, 

49) 

50from lsst.meas.algorithms import ( 

51 BrightStarStamp, 

52 BrightStarStamps, 

53 KernelPsf, 

54 LoadReferenceObjectsConfig, 

55 ReferenceObjectLoader, 

56 WarpedPsf, 

57) 

58from lsst.pex.config import ChoiceField, ConfigField, Field, ListField 

59from lsst.pipe.base import PipelineTask, PipelineTaskConfig, PipelineTaskConnections, Struct 

60from lsst.pipe.base.connectionTypes import Input, Output, PrerequisiteInput 

61from lsst.utils.timer import timeMethod 

62 

63NEIGHBOR_MASK_PLANE = "NEIGHBOR" 

64 

65 

66class BrightStarCutoutConnections( 

67 PipelineTaskConnections, 

68 dimensions=("instrument", "visit", "detector"), 

69): 

70 """Connections for BrightStarCutoutTask.""" 

71 

72 ref_cat = PrerequisiteInput( 

73 name="the_monster_20250219", 

74 storageClass="SimpleCatalog", 

75 doc="Reference catalog that contains bright star positions.", 

76 dimensions=("skypix",), 

77 multiple=True, 

78 deferLoad=True, 

79 ) 

80 input_exposure = Input( 

81 name="preliminary_visit_image", 

82 storageClass="ExposureF", 

83 doc="Background-subtracted input exposure from which to extract bright star stamp cutouts.", 

84 dimensions=("visit", "detector"), 

85 ) 

86 input_background = Input( 

87 name="preliminary_visit_image_background", 

88 storageClass="Background", 

89 doc="Background model for the input exposure, to be added back on during processing.", 

90 dimensions=("visit", "detector"), 

91 ) 

92 input_source_catalog = Input( 

93 name="single_visit_star_footprints", 

94 storageClass="SourceCatalog", 

95 doc="Source catalog containing footprints on the input exposure, used to mask neighboring sources.", 

96 dimensions=("visit", "detector"), 

97 ) 

98 bright_star_stamps = Output( 

99 name="bright_star_stamps", 

100 storageClass="BrightStarStamps", 

101 doc="Set of preprocessed postage stamp cutouts, each centered on a single bright star.", 

102 dimensions=("visit", "detector"), 

103 ) 

104 

105 

106class BrightStarCutoutConfig( 

107 PipelineTaskConfig, 

108 pipelineConnections=BrightStarCutoutConnections, 

109): 

110 """Configuration parameters for BrightStarCutoutTask.""" 

111 

112 # Star selection 

113 mag_range = ListField[float]( 

114 doc="Magnitude range in Gaia G. Cutouts will be made for all stars in this range.", 

115 default=[10, 18], 

116 ) 

117 exclude_arcsec_radius = Field[float]( 

118 doc="No postage stamp will be generated for stars with a neighboring star in the range " 

119 "``exclude_mag_range`` mag within ``exclude_arcsec_radius`` arcseconds.", 

120 default=5, 

121 ) 

122 exclude_mag_range = ListField[float]( 

123 doc="No postage stamp will be generated for stars with a neighboring star in the range " 

124 "``exclude_mag_range`` mag within ``exclude_arcsec_radius`` arcseconds.", 

125 default=[0, 20], 

126 ) 

127 min_area_fraction = Field[float]( 

128 doc="Minimum fraction of the stamp area, post-masking, that must remain for a cutout to be retained.", 

129 default=0.1, 

130 ) 

131 bad_mask_planes = ListField[str]( 

132 doc="Mask planes that identify excluded pixels for the calculation of ``min_area_fraction``.", 

133 default=[ 

134 "BAD", 

135 "CR", 

136 "CROSSTALK", 

137 "EDGE", 

138 "NO_DATA", 

139 "SAT", 

140 "SUSPECT", 

141 "UNMASKEDNAN", 

142 NEIGHBOR_MASK_PLANE, 

143 ], 

144 ) 

145 min_focal_plane_radius = Field[float]( 

146 doc="Minimum distance to the center of the focal plane, in mm. " 

147 "Stars with a focal plane radius smaller than this will be omitted.", 

148 default=0.0, 

149 ) 

150 max_focal_plane_radius = Field[float]( 

151 doc="Maximum distance to the center of the focal plane, in mm. " 

152 "Stars with a focal plane radius larger than this will be omitted.", 

153 default=np.inf, 

154 ) 

155 

156 # Stamp geometry 

157 stamp_size = ListField[int]( 

158 doc="Size of the stamps to be extracted, in pixels.", 

159 default=[251, 251], 

160 ) 

161 warping_kernel_name = ChoiceField[str]( 

162 doc="Warping kernel for image data warping.", 

163 default="lanczos5", 

164 allowed={ 

165 "bilinear": "bilinear interpolation", 

166 "lanczos3": "Lanczos kernel of order 3", 

167 "lanczos4": "Lanczos kernel of order 4", 

168 "lanczos5": "Lanczos kernel of order 5", 

169 }, 

170 ) 

171 mask_warping_kernel_name = ChoiceField[str]( 

172 doc="Warping kernel for mask warping. Typically a more conservative kernel (e.g. with less ringing) " 

173 "is desirable for warping masks than for warping image data.", 

174 default="bilinear", 

175 allowed={ 

176 "bilinear": "bilinear interpolation", 

177 "lanczos3": "Lanczos kernel of order 3", 

178 "lanczos4": "Lanczos kernel of order 4", 

179 "lanczos5": "Lanczos kernel of order 5", 

180 }, 

181 ) 

182 

183 # Misc 

184 load_reference_objects_config = ConfigField[LoadReferenceObjectsConfig]( 

185 doc="Reference object loader for astrometric calibration.", 

186 ) 

187 ref_cat_filter_name = Field[str]( 

188 doc="Name of the filter in the reference catalog to use for star selection. ", 

189 default="phot_g_mean", 

190 ) 

191 

192 

193class BrightStarCutoutTask(PipelineTask): 

194 """Extract bright star cutouts, and warp to the same pixel grid. 

195 

196 The BrightStarCutoutTask is used to extract, process, and store small image 

197 cutouts (or "postage stamps") around bright stars. 

198 This task essentially consists of two principal steps. 

199 First, it identifies bright stars within an exposure using a reference 

200 catalog and extracts a stamp around each. 

201 Second, it shifts and warps each stamp to remove optical distortions and 

202 sample all stars on the same pixel grid. 

203 """ 

204 

205 ConfigClass = BrightStarCutoutConfig 

206 _DefaultName = "brightStarCutout" 

207 config: BrightStarCutoutConfig 

208 

209 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

210 inputs = butlerQC.get(inputRefs) 

211 ref_obj_loader = ReferenceObjectLoader( 

212 dataIds=[ref.datasetRef.dataId for ref in inputRefs.ref_cat], 

213 refCats=inputs.pop("ref_cat"), 

214 name=self.config.connections.ref_cat, 

215 config=self.config.load_reference_objects_config, 

216 ) 

217 output = self.run(**inputs, ref_obj_loader=ref_obj_loader) 

218 # Only ingest Stamp if it exists; prevents ingesting an empty FITS file 

219 if output: 

220 butlerQC.put(output, outputRefs) 

221 

222 @timeMethod 

223 def run( 

224 self, 

225 input_exposure: ExposureF, 

226 input_background: BackgroundList, 

227 input_source_catalog: SourceCatalog, 

228 ref_obj_loader: ReferenceObjectLoader, 

229 ): 

230 """Identify bright stars within an exposure using a reference catalog, 

231 extract stamps around each and warp/shift stamps onto a common frame. 

232 

233 Parameters 

234 ---------- 

235 input_exposure : `~lsst.afw.image.ExposureF` 

236 The background-subtracted image to extract bright star stamps. 

237 input_background : `~lsst.afw.math.BackgroundList` 

238 The background model associated with the input exposure. 

239 input_source_catalog : `~lsst.afw.table.SourceCatalog` 

240 The source catalog containing footprints on the input exposure. 

241 ref_obj_loader : `~lsst.meas.algorithms.ReferenceObjectLoader` 

242 Loader to find objects within a reference catalog. 

243 

244 Returns 

245 ------- 

246 bright_star_stamps : `~lsst.meas.algorithms.BrightStarStamps` 

247 A set of postage stamp cutouts, each centered on a bright star. 

248 """ 

249 bright_stars = self._get_bright_stars(ref_obj_loader, input_exposure) 

250 

251 bright_star_stamps = self._get_bright_star_stamps( 

252 input_exposure, 

253 input_background, 

254 input_source_catalog, 

255 bright_stars, 

256 ) 

257 

258 return Struct(bright_star_stamps=bright_star_stamps) 

259 

260 def _get_bright_stars( 

261 self, 

262 ref_obj_loader: ReferenceObjectLoader, 

263 input_exposure: ExposureF, 

264 ) -> Table: 

265 """Get a table of bright stars from the reference catalog. 

266 

267 Trim the reference catalog to only those objects within the exposure 

268 bounding box. 

269 Then, select bright stars based on the specified magnitude range, 

270 isolation criteria, and optionally focal plane radius criteria. 

271 Finally, add columns with pixel coordinates and focal plane coordinates 

272 for each bright star. 

273 

274 Parameters 

275 ---------- 

276 ref_obj_loader : `~lsst.meas.algorithms.ReferenceObjectLoader` 

277 Loader to find objects within a reference catalog. 

278 input_exposure : `~lsst.afw.image.ExposureF` 

279 The exposure for which bright stars are being selected. 

280 

281 Returns 

282 ------- 

283 bright_stars : `~astropy.table.Table` 

284 Table of bright stars within the exposure. 

285 """ 

286 bbox = input_exposure.getBBox() 

287 wcs = input_exposure.getWcs() 

288 detector = input_exposure.detector 

289 

290 # Load all ref cat stars within the padded exposure bounding box 

291 within_region = ref_obj_loader.loadPixelBox(bbox, wcs, self.config.ref_cat_filter_name) 

292 ref_cat_full = within_region.refCat 

293 flux_field: str = within_region.fluxField 

294 exclude_arcsec_radius = self.config.exclude_arcsec_radius * u.arcsec 

295 

296 # Convert mag ranges to flux in nJy for comparison with ref cat fluxes 

297 flux_range_candidate = sorted(((self.config.mag_range * u.ABmag).to(u.nJy)).to_value()) 

298 flux_range_neighbor = sorted(((self.config.exclude_mag_range * u.ABmag).to(u.nJy)).to_value()) 

299 

300 # Create a subset of ref cat stars that includes all stars that could 

301 # potentially be either a candidate or a neighbor based on flux 

302 flux_min = np.min((flux_range_candidate[0], flux_range_neighbor[0])) 

303 flux_max = np.max((flux_range_candidate[1], flux_range_neighbor[1])) 

304 stars_subset = (ref_cat_full[flux_field] >= flux_min) & (ref_cat_full[flux_field] <= flux_max) 

305 ref_cat_subset_columns = ("id", "coord_ra", "coord_dec", flux_field) 

306 ref_cat_subset = Table(ref_cat_full.extract(*ref_cat_subset_columns, where=stars_subset)) 

307 flux_subset = ref_cat_subset[flux_field] 

308 

309 # Identify candidate bright stars and their neighbors based on flux 

310 is_candidate = (flux_subset >= flux_range_candidate[0]) & (flux_subset <= flux_range_candidate[1]) 

311 is_neighbor = (flux_subset >= flux_range_neighbor[0]) & (flux_subset <= flux_range_neighbor[1]) 

312 

313 # Trim star coordinates to candidate and neighbor subsets 

314 coords = SkyCoord(ref_cat_subset["coord_ra"], ref_cat_subset["coord_dec"], unit="rad") 

315 coords_candidate = coords[is_candidate] 

316 coords_neighbor = coords[is_neighbor] 

317 

318 # Identify candidate bright stars that have no contaminant neighbors 

319 is_candidate_isolated = np.ones(len(coords_candidate), dtype=bool) 

320 if len(coords_neighbor) > 0: 

321 _, indices_candidate, angular_separation, _ = coords_candidate.search_around_sky( 

322 coords_neighbor, exclude_arcsec_radius 

323 ) 

324 indices_candidate = indices_candidate[angular_separation > 0 * u.arcsec] # Exclude self-matches 

325 is_candidate_isolated[indices_candidate] = False 

326 

327 # Trim ref cat subset to isolated bright stars; add ancillary data 

328 bright_stars = ref_cat_subset[is_candidate][is_candidate_isolated] 

329 

330 flux_nanojansky = bright_stars[flux_field][:] * u.nJy 

331 bright_stars["mag"] = flux_nanojansky.to(u.ABmag).to_value() # AB magnitudes 

332 

333 zip_ra_dec = zip(bright_stars["coord_ra"] * radians, bright_stars["coord_dec"] * radians) 

334 sphere_points = [SpherePoint(ra, dec) for ra, dec in zip_ra_dec] 

335 pixel_coords = wcs.skyToPixel(sphere_points) 

336 bright_stars["pixel_x"] = [pixel_coord.x for pixel_coord in pixel_coords] 

337 bright_stars["pixel_y"] = [pixel_coord.y for pixel_coord in pixel_coords] 

338 

339 mm_coords = detector.transform(pixel_coords, PIXELS, FOCAL_PLANE) 

340 mm_coords_x = np.array([mm_coord.x for mm_coord in mm_coords]) 

341 mm_coords_y = np.array([mm_coord.y for mm_coord in mm_coords]) 

342 radius_mm = np.sqrt(mm_coords_x**2 + mm_coords_y**2) 

343 theta_radians = np.arctan2(mm_coords_y, mm_coords_x) 

344 bright_stars["radius_mm"] = radius_mm 

345 bright_stars["theta_radians"] = theta_radians 

346 

347 # Trim bright star catalog to those within the exposure bounding box, 

348 # and optionally within a range of focal plane radii 

349 within_bbox = bright_stars["pixel_x"] >= bbox.getMinX() 

350 within_bbox &= bright_stars["pixel_x"] <= bbox.getMaxX() 

351 within_bbox &= bright_stars["pixel_y"] >= bbox.getMinY() 

352 within_bbox &= bright_stars["pixel_y"] <= bbox.getMaxY() 

353 within_radii = bright_stars["radius_mm"] >= self.config.min_focal_plane_radius 

354 within_radii &= bright_stars["radius_mm"] <= self.config.max_focal_plane_radius 

355 bright_stars = bright_stars[within_bbox & within_radii] 

356 

357 self.log.info( 

358 "Identified %i of %i reference catalog stars that are in the field of view, are in the " 

359 "range %s mag, and that have no neighboring stars in the range %s mag within %s arcsec.", 

360 len(bright_stars), 

361 len(ref_cat_full), 

362 self.config.mag_range, 

363 self.config.exclude_mag_range, 

364 self.config.exclude_arcsec_radius, 

365 ) 

366 

367 return bright_stars 

368 

369 def _get_bright_star_stamps( 

370 self, 

371 input_exposure: ExposureF, 

372 input_background: BackgroundList | None, 

373 footprints: SourceCatalog | np.ndarray, 

374 bright_stars: Table, 

375 ) -> BrightStarStamps | None: 

376 """Extract and warp bright star stamps. 

377 

378 For each bright star, extract a stamp from the input exposure centered 

379 on the star's pixel coordinates. 

380 Then, shift and warp the stamp to recenter on the star and align each 

381 to the same orientation. 

382 Finally, check the fraction of the stamp area that is masked 

383 (e.g. due to neighboring sources or bad pixels), and only retain stamps 

384 with sufficient unmasked area. 

385 

386 Parameters 

387 ---------- 

388 input_exposure : `~lsst.afw.image.ExposureF` 

389 The science image to extract bright star stamps. 

390 input_background : `~lsst.afw.math.BackgroundList` | None 

391 The background model associated with the input exposure. 

392 If provided, this will be added back on to the input image. 

393 footprints : `~lsst.afw.table.SourceCatalog` | `numpy.ndarray` 

394 The source catalog containing footprints on the input exposure, or 

395 a 2D numpy array with the same dimensions as the input exposure 

396 where each pixel value corresponds to the source footprint ID. 

397 bright_stars : `~astropy.table.Table` 

398 Table of bright stars for which to extract stamps. 

399 

400 Returns 

401 ------- 

402 bright_star_stamps : `~lsst.meas.algorithms.BrightStarStamps` | None 

403 A set of postage stamp cutouts, each centered on a bright star. 

404 If no bright star stamps are retained post-masking, returns `None`. 

405 """ 

406 warp_control = WarpingControl(self.config.warping_kernel_name, self.config.mask_warping_kernel_name) 

407 bbox = input_exposure.getBBox() 

408 

409 # Prepare masked image for warping 

410 input_MI = input_exposure.getMaskedImage() 

411 if input_background is not None: 

412 input_MI += input_background.getImage() 

413 input_MI = input_exposure.photoCalib.calibrateImage(input_MI) # to nJy 

414 

415 # Generate unique footprint IDs for NEIGHBOR masking 

416 input_MI.mask.addMaskPlane(NEIGHBOR_MASK_PLANE) 

417 if isinstance(footprints, SourceCatalog): 

418 footprints = footprintsToNumpy(footprints, bbox, asBool=False) 

419 

420 # Establish pixel-to-boresight-pseudopixel transform 

421 pixel_scale = input_exposure.wcs.getPixelScale(bbox.getCenter()).asArcseconds() * arcseconds 

422 pixels_to_boresight_pseudopixels = input_exposure.detector.getTransform(PIXELS, FIELD_ANGLE).then( 

423 makeTransform(AffineTransform.makeScaling(1 / pixel_scale.asRadians())) 

424 ) 

425 

426 # Stamp bounding boxes 

427 stamp_radius = floor(Extent2D(*self.config.stamp_size) / 2) 

428 stamp_bbox = Box2I(Point2I(0, 0), Extent2I(1, 1)).dilatedBy(stamp_radius) 

429 stamp_radius_padded = floor((Extent2D(*self.config.stamp_size) * 1.42) / 2) 

430 stamp_bbox_padded = Box2I(Point2I(0, 0), Extent2I(1, 1)).dilatedBy(stamp_radius_padded) 

431 

432 stamps = [] 

433 for bright_star in bright_stars: 

434 pix_coord = Point2D(bright_star["pixel_x"], bright_star["pixel_y"]) 

435 

436 # Set NEIGHBOR mask plane for all sources except the current one 

437 neighbor_bit_mask = input_MI.mask.getPlaneBitMask(NEIGHBOR_MASK_PLANE) 

438 input_MI.mask.clearMaskPlane(input_MI.mask.getMaskPlaneDict()[NEIGHBOR_MASK_PLANE]) 

439 bright_star_id = footprints[int(pix_coord.y), int(pix_coord.x)] 

440 neighbor_mask = (footprints != 0) & (footprints != bright_star_id) 

441 input_MI.mask.array[neighbor_mask] |= neighbor_bit_mask 

442 

443 # Define linear shifting and rotation to recenter and align stamps 

444 boresight_pseudopixel_coord = pixels_to_boresight_pseudopixels.applyForward(pix_coord) 

445 shift = makeTransform(AffineTransform(Point2D(0, 0) - boresight_pseudopixel_coord)) 

446 rotation = makeTransform(AffineTransform.makeRotation(-bright_star["theta_radians"] * radians)) 

447 pixels_to_stamp_frame = pixels_to_boresight_pseudopixels.then(shift).then(rotation) 

448 

449 # Warp the image and mask to the stamp frame 

450 stamp_MI = MaskedImageF(stamp_bbox_padded) 

451 warpImage(stamp_MI, input_MI, pixels_to_stamp_frame, warp_control) 

452 stamp_MI = stamp_MI[stamp_bbox] 

453 

454 # Check mask coverage, update metadata 

455 bad_bit_mask = stamp_MI.mask.getPlaneBitMask(self.config.bad_mask_planes) 

456 good = (stamp_MI.mask.array & bad_bit_mask) == 0 

457 good_frac = np.sum(good) / stamp_MI.mask.array.size 

458 if good_frac < self.config.min_area_fraction: 

459 continue 

460 

461 warped_psf = WarpedPsf(input_exposure.getPsf(), pixels_to_stamp_frame, warp_control) 

462 stamp_psf = KernelPsf(FixedKernel(warped_psf.computeKernelImage(Point2D(0, 0)))) 

463 stamp_wcs = makeModifiedWcs(pixels_to_stamp_frame, input_exposure.wcs, False) 

464 

465 stamp = BrightStarStamp( 

466 stamp_im=stamp_MI, 

467 psf=stamp_psf, 

468 wcs=stamp_wcs, 

469 visit=input_exposure.visitInfo.getId(), 

470 detector=input_exposure.detector.getId(), 

471 ref_id=bright_star["id"], 

472 ref_mag=bright_star["mag"], 

473 position=pix_coord, 

474 focal_plane_radius=bright_star["radius_mm"], 

475 focal_plane_angle=Angle(bright_star["theta_radians"], radians), 

476 scale=None, 

477 scale_err=None, 

478 pedestal=None, 

479 pedestal_err=None, 

480 pedestal_scale_cov=None, 

481 gradient_x=None, 

482 gradient_y=None, 

483 curvature_x=None, 

484 curvature_y=None, 

485 curvature_xy=None, 

486 global_reduced_chi_squared=None, 

487 global_degrees_of_freedom=None, 

488 psf_reduced_chi_squared=None, 

489 psf_degrees_of_freedom=None, 

490 psf_masked_flux_fraction=None, 

491 ) 

492 stamps.append(stamp) 

493 

494 num_stars = len(bright_stars) 

495 num_excluded = num_stars - len(stamps) 

496 percent_excluded = 100.0 * num_excluded / num_stars if num_stars > 0 else 0.0 

497 self.log.info( 

498 "Extracted %i bright star stamp%s. " 

499 "Excluded %i star%s (%.1f%%) due to a masked area fraction < %s.", 

500 len(stamps), 

501 "" if len(stamps) == 1 else "s", 

502 num_excluded, 

503 "" if num_excluded == 1 else "s", 

504 percent_excluded, 

505 self.config.min_area_fraction, 

506 ) 

507 

508 if not stamps: 

509 return None 

510 

511 focal_plane_radii = [stamp.focal_plane_radius for stamp in stamps] 

512 metadata = PropertyList() 

513 metadata["FOCAL_PLANE_RADIUS_MIN"] = np.min(focal_plane_radii) 

514 metadata["FOCAL_PLANE_RADIUS_MAX"] = np.max(focal_plane_radii) 

515 return BrightStarStamps(stamps, metadata=metadata)