210 inputs = butlerQC.get(inputRefs)
211 ref_obj_loader = ReferenceObjectLoader(
212 dataIds=[ref.datasetRef.dataId
for ref
in inputRefs.ref_cat],
213 refCats=inputs.pop(
"ref_cat"),
214 name=self.config.connections.ref_cat,
215 config=self.config.load_reference_objects_config,
217 output = self.
run(**inputs, ref_obj_loader=ref_obj_loader)
220 butlerQC.put(output, outputRefs)
262 ref_obj_loader: ReferenceObjectLoader,
263 input_exposure: ExposureF,
265 """Get a table of bright stars from the reference catalog.
267 Trim the reference catalog to only those objects within the exposure
269 Then, select bright stars based on the specified magnitude range,
270 isolation criteria, and optionally focal plane radius criteria.
271 Finally, add columns with pixel coordinates and focal plane coordinates
272 for each bright star.
276 ref_obj_loader : `~lsst.meas.algorithms.ReferenceObjectLoader`
277 Loader to find objects within a reference catalog.
278 input_exposure : `~lsst.afw.image.ExposureF`
279 The exposure for which bright stars are being selected.
283 bright_stars : `~astropy.table.Table`
284 Table of bright stars within the exposure.
286 bbox = input_exposure.getBBox()
287 wcs = input_exposure.getWcs()
288 detector = input_exposure.detector
291 within_region = ref_obj_loader.loadPixelBox(bbox, wcs, self.config.ref_cat_filter_name)
292 ref_cat_full = within_region.refCat
293 flux_field: str = within_region.fluxField
294 exclude_arcsec_radius = self.config.exclude_arcsec_radius * u.arcsec
297 flux_range_candidate = sorted(((self.config.mag_range * u.ABmag).to(u.nJy)).to_value())
298 flux_range_neighbor = sorted(((self.config.exclude_mag_range * u.ABmag).to(u.nJy)).to_value())
302 flux_min = np.min((flux_range_candidate[0], flux_range_neighbor[0]))
303 flux_max = np.max((flux_range_candidate[1], flux_range_neighbor[1]))
304 stars_subset = (ref_cat_full[flux_field] >= flux_min) & (ref_cat_full[flux_field] <= flux_max)
305 ref_cat_subset_columns = (
"id",
"coord_ra",
"coord_dec", flux_field)
306 ref_cat_subset = Table(ref_cat_full.extract(*ref_cat_subset_columns, where=stars_subset))
307 flux_subset = ref_cat_subset[flux_field]
310 is_candidate = (flux_subset >= flux_range_candidate[0]) & (flux_subset <= flux_range_candidate[1])
311 is_neighbor = (flux_subset >= flux_range_neighbor[0]) & (flux_subset <= flux_range_neighbor[1])
314 coords = SkyCoord(ref_cat_subset[
"coord_ra"], ref_cat_subset[
"coord_dec"], unit=
"rad")
315 coords_candidate = coords[is_candidate]
316 coords_neighbor = coords[is_neighbor]
319 is_candidate_isolated = np.ones(len(coords_candidate), dtype=bool)
320 if len(coords_neighbor) > 0:
321 _, indices_candidate, angular_separation, _ = coords_candidate.search_around_sky(
322 coords_neighbor, exclude_arcsec_radius
324 indices_candidate = indices_candidate[angular_separation > 0 * u.arcsec]
325 is_candidate_isolated[indices_candidate] =
False
328 bright_stars = ref_cat_subset[is_candidate][is_candidate_isolated]
330 flux_nanojansky = bright_stars[flux_field][:] * u.nJy
331 bright_stars[
"mag"] = flux_nanojansky.to(u.ABmag).to_value()
333 zip_ra_dec = zip(bright_stars[
"coord_ra"] * radians, bright_stars[
"coord_dec"] * radians)
334 sphere_points = [
SpherePoint(ra, dec)
for ra, dec
in zip_ra_dec]
335 pixel_coords = wcs.skyToPixel(sphere_points)
336 bright_stars[
"pixel_x"] = [pixel_coord.x
for pixel_coord
in pixel_coords]
337 bright_stars[
"pixel_y"] = [pixel_coord.y
for pixel_coord
in pixel_coords]
339 mm_coords = detector.transform(pixel_coords, PIXELS, FOCAL_PLANE)
340 mm_coords_x = np.array([mm_coord.x
for mm_coord
in mm_coords])
341 mm_coords_y = np.array([mm_coord.y
for mm_coord
in mm_coords])
342 radius_mm = np.sqrt(mm_coords_x**2 + mm_coords_y**2)
343 theta_radians = np.arctan2(mm_coords_y, mm_coords_x)
344 bright_stars[
"radius_mm"] = radius_mm
345 bright_stars[
"theta_radians"] = theta_radians
349 within_bbox = bright_stars[
"pixel_x"] >= bbox.getMinX()
350 within_bbox &= bright_stars[
"pixel_x"] <= bbox.getMaxX()
351 within_bbox &= bright_stars[
"pixel_y"] >= bbox.getMinY()
352 within_bbox &= bright_stars[
"pixel_y"] <= bbox.getMaxY()
353 within_radii = bright_stars[
"radius_mm"] >= self.config.min_focal_plane_radius
354 within_radii &= bright_stars[
"radius_mm"] <= self.config.max_focal_plane_radius
355 bright_stars = bright_stars[within_bbox & within_radii]
358 "Identified %i of %i reference catalog stars that are in the field of view, are in the "
359 "range %s mag, and that have no neighboring stars in the range %s mag within %s arcsec.",
362 self.config.mag_range,
363 self.config.exclude_mag_range,
364 self.config.exclude_arcsec_radius,
371 input_exposure: ExposureF,
372 input_background: BackgroundList |
None,
373 footprints: SourceCatalog | np.ndarray,
375 ) -> BrightStarStamps |
None:
376 """Extract and warp bright star stamps.
378 For each bright star, extract a stamp from the input exposure centered
379 on the star's pixel coordinates.
380 Then, shift and warp the stamp to recenter on the star and align each
381 to the same orientation.
382 Finally, check the fraction of the stamp area that is masked
383 (e.g. due to neighboring sources or bad pixels), and only retain stamps
384 with sufficient unmasked area.
388 input_exposure : `~lsst.afw.image.ExposureF`
389 The science image to extract bright star stamps.
390 input_background : `~lsst.afw.math.BackgroundList` | None
391 The background model associated with the input exposure.
392 If provided, this will be added back on to the input image.
393 footprints : `~lsst.afw.table.SourceCatalog` | `numpy.ndarray`
394 The source catalog containing footprints on the input exposure, or
395 a 2D numpy array with the same dimensions as the input exposure
396 where each pixel value corresponds to the source footprint ID.
397 bright_stars : `~astropy.table.Table`
398 Table of bright stars for which to extract stamps.
402 bright_star_stamps : `~lsst.meas.algorithms.BrightStarStamps` | None
403 A set of postage stamp cutouts, each centered on a bright star.
404 If no bright star stamps are retained post-masking, returns `None`.
406 warp_control =
WarpingControl(self.config.warping_kernel_name, self.config.mask_warping_kernel_name)
407 bbox = input_exposure.getBBox()
410 input_MI = input_exposure.getMaskedImage()
411 if input_background
is not None:
412 input_MI += input_background.getImage()
413 input_MI = input_exposure.photoCalib.calibrateImage(input_MI)
416 input_MI.mask.addMaskPlane(NEIGHBOR_MASK_PLANE)
417 if isinstance(footprints, SourceCatalog):
418 footprints = footprintsToNumpy(footprints, bbox, asBool=
False)
421 pixel_scale = input_exposure.wcs.getPixelScale(bbox.getCenter()).asArcseconds() * arcseconds
422 pixels_to_boresight_pseudopixels = input_exposure.detector.getTransform(PIXELS, FIELD_ANGLE).then(
423 makeTransform(AffineTransform.makeScaling(1 / pixel_scale.asRadians()))
427 stamp_radius = floor(
Extent2D(*self.config.stamp_size) / 2)
429 stamp_radius_padded = floor((
Extent2D(*self.config.stamp_size) * 1.42) / 2)
433 for bright_star
in bright_stars:
434 pix_coord =
Point2D(bright_star[
"pixel_x"], bright_star[
"pixel_y"])
437 neighbor_bit_mask = input_MI.mask.getPlaneBitMask(NEIGHBOR_MASK_PLANE)
438 input_MI.mask.clearMaskPlane(input_MI.mask.getMaskPlaneDict()[NEIGHBOR_MASK_PLANE])
439 bright_star_id = footprints[int(pix_coord.y), int(pix_coord.x)]
440 neighbor_mask = (footprints != 0) & (footprints != bright_star_id)
441 input_MI.mask.array[neighbor_mask] |= neighbor_bit_mask
444 boresight_pseudopixel_coord = pixels_to_boresight_pseudopixels.applyForward(pix_coord)
446 rotation = makeTransform(AffineTransform.makeRotation(-bright_star[
"theta_radians"] * radians))
447 pixels_to_stamp_frame = pixels_to_boresight_pseudopixels.then(shift).then(rotation)
450 stamp_MI = MaskedImageF(stamp_bbox_padded)
451 warpImage(stamp_MI, input_MI, pixels_to_stamp_frame, warp_control)
452 stamp_MI = stamp_MI[stamp_bbox]
455 bad_bit_mask = stamp_MI.mask.getPlaneBitMask(self.config.bad_mask_planes)
456 good = (stamp_MI.mask.array & bad_bit_mask) == 0
457 good_frac = np.sum(good) / stamp_MI.mask.array.size
458 if good_frac < self.config.min_area_fraction:
461 warped_psf =
WarpedPsf(input_exposure.getPsf(), pixels_to_stamp_frame, warp_control)
463 stamp_wcs = makeModifiedWcs(pixels_to_stamp_frame, input_exposure.wcs,
False)
465 stamp = BrightStarStamp(
469 visit=input_exposure.visitInfo.getId(),
470 detector=input_exposure.detector.getId(),
471 ref_id=bright_star[
"id"],
472 ref_mag=bright_star[
"mag"],
474 focal_plane_radius=bright_star[
"radius_mm"],
475 focal_plane_angle=
Angle(bright_star[
"theta_radians"], radians),
480 pedestal_scale_cov=
None,
486 global_reduced_chi_squared=
None,
487 global_degrees_of_freedom=
None,
488 psf_reduced_chi_squared=
None,
489 psf_degrees_of_freedom=
None,
490 psf_masked_flux_fraction=
None,
494 num_stars = len(bright_stars)
495 num_excluded = num_stars - len(stamps)
496 percent_excluded = 100.0 * num_excluded / num_stars
if num_stars > 0
else 0.0
498 "Extracted %i bright star stamp%s. "
499 "Excluded %i star%s (%.1f%%) due to a masked area fraction < %s.",
501 "" if len(stamps) == 1
else "s",
503 "" if num_excluded == 1
else "s",
505 self.config.min_area_fraction,
511 focal_plane_radii = [stamp.focal_plane_radius
for stamp
in stamps]
513 metadata[
"FOCAL_PLANE_RADIUS_MIN"] = np.min(focal_plane_radii)
514 metadata[
"FOCAL_PLANE_RADIUS_MAX"] = np.max(focal_plane_radii)
515 return BrightStarStamps(stamps, metadata=metadata)