Coverage for python / lsst / meas / astrom / refit_pointing.py: 18%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:43 +0000
1# This file is part of meas_astrom.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ("RefitPointingConfig", "RefitPointingTask", "NoVisitWcs")
26import math
28import numpy as np
30from lsst.geom import (
31 Angle,
32 Box2D,
33 Point2D,
34 SpherePoint,
35 SphereTransform,
36 arcseconds,
37 degrees,
38 radians,
39)
40from lsst.pex.config import Config, Field
41from lsst.pipe.base import AlgorithmError, Struct, Task
42from lsst.obs.base.visit_geometry import VisitGeometry
43from lsst.obs.base.utils import createInitialSkyWcsFromBoresight
44from lsst.sphgeom import ConvexPolygon
47class NoVisitWcs(AlgorithmError):
48 """Exception raised when there are no WCSs for any detectors in a visit."""
50 @property
51 def metadata(self):
52 return {}
55class RefitPointingConfig(Config):
56 grid_spacing = Field[float](
57 doc=(
58 "Spacing (in pixels) between grid points used to evaluate the WCS when fitting the pointing. "
59 "This can be a very sparse grid (there are only three degrees of freedom). "
60 "If the spacing does not divide the detector bounding box evenly, it is decreased slightly."
61 ),
62 dtype=float,
63 default=512.0,
64 )
65 rejection_threshold = Field(
66 doc=(
67 "If the distance between the target WCS position and the position predicted by the camera "
68 "geometry after refitting the pointing using just one detector exceeds this value (in "
69 "arcseconds) at any point on the pointing-fit grid, that detector is rejected from the pointing "
70 "fit. The quantity this threshold is applied to is saved in the wcs_detector_pointing_residual "
71 "column."
72 ),
73 dtype=float,
74 default=10.0,
75 )
76 nulling_threshold = Field(
77 doc=(
78 "If the distance between the target WCS position and the position predicted by the camera "
79 "geometry after refitting the pointing using all detectors exceeds this value (in arcseconds) "
80 "at any point on the pointing-fit grid, that detector's WCS is set to None in the catalog. "
81 "The quantity this threshold is applied to is saved in the wcs_visit_pointing_residual column."
82 ),
83 dtype=float,
84 default=60.0,
85 )
86 schema_prefix = Field(
87 doc="Prefix for all schema fields.",
88 dtype=str,
89 default="",
90 )
91 fallback_region_padding = Field(
92 doc=(
93 "Padding to add (in pixels) to the regions of detectors for which only a "
94 "pointing + camera geometry WCS is available."
95 ),
96 dtype=int,
97 default=50,
98 )
101class RefitPointingTask(Task):
102 """A task that uses the available WCSs of the detectors in a visit to
103 re-fit the pointing for that visit and compute new visit regions for the butler.
104 """
106 _DefaultName = "refitPointing"
107 ConfigClass = RefitPointingConfig
109 def __init__(self, config=None, *, schema, **kwargs):
110 super().__init__(config, **kwargs)
111 self._detector_pointing_residual_key = schema.addField(
112 self.config.schema_prefix + "wcs_detector_pointing_residual",
113 type="Angle",
114 doc=(
115 "Maximum difference (on the pointing-fit grid) between the target WCS position and "
116 "the position predicted by camera geometry, after re-pointing using the target WCS "
117 "for this detector only."
118 ),
119 )
120 self._visit_pointing_residual_key = schema.addField(
121 self.config.schema_prefix + "wcs_visit_pointing_residual",
122 type="Angle",
123 doc=(
124 "Maximum difference (on the pointing-fit grid) between the target WCS position and "
125 "the position predicted by camera geometry, after re-pointing using the target WCS "
126 "of all non-rejected detectors in the visit."
127 ),
128 )
129 self._rejected_key = schema.addField(
130 self.config.schema_prefix + "wcs_detector_pointing_rejected",
131 type="Flag",
132 doc=(
133 "Flag set if this detector was rejected from the pointing fit due to its "
134 "wcs_detector_pointing_residual value."
135 ),
136 )
137 self._rejection_threshold = self.config.rejection_threshold * arcseconds
138 self._nulling_threshold = self.config.nulling_threshold * arcseconds
140 def run(self, *, catalog, camera):
141 """Re-fit the pointing from the WCSs in a visit.
143 Parameters
144 ----------
145 catalog : `lsst.afw.table.ExposureCatalog`
146 A catalog of per-detector records for the visit. Columns with WCS
147 diagnostics are updatd in-place, and WCSs may be set to `None` if
148 they do not satisfy the `~RefitPointingConfig.nulling_threshold`.
149 camera : `lsst.afw.cameraGeom.Camera`
150 Camera geometry.
152 Returns
153 -------
154 results : `lsst.pipe.base.Struct`
155 A struct with the following attributes:
157 - boresight (`lsst.geom.SpherePoint`): new boresight location
158 - orientation (`lsst.geom.Angle`): new orientation angle
159 - catalog (`lsst.afw.table.ExposureCatalog`): the same catalog that
160 was passed in, after modification in-place.
161 - regions (`lsst.obs.base.VisitGeometry`): updated regions for the
162 visit and all detectors.
164 Raises
165 ------
166 NoVisitWcs
167 Raised if ``catalog`` is empty or if there are no WCSs for any
168 detectors.
169 """
170 if not catalog:
171 raise NoVisitWcs("No detector rows in visit catalog.")
172 boresight, orientation = self._fit_pointing(catalog, camera)
173 if (visit_info := catalog[0].getVisitInfo()) is not None:
174 old_boresight = visit_info.getBoresightRaDec()
175 offset = old_boresight.separation(boresight)
176 self.log.info(
177 "Re-fit pointing is %s, orientation=%0.2f deg (%0.2g deg from the original boresight).",
178 boresight,
179 orientation.asDegrees(),
180 offset.asDegrees(),
181 )
182 else:
183 self.log.info("Re-fit pointing is %s, orientation=%0.2f deg.", boresight, orientation.asDegrees())
184 self._null_bad(catalog)
185 regions = self._make_visit_geometry(boresight, orientation, catalog, camera)
186 return Struct(
187 boresight=boresight,
188 orientation=orientation,
189 catalog=catalog,
190 regions=regions,
191 )
193 def _fit_pointing(self, catalog, camera):
194 """Fit the pointing for a visit from the detectors in that visit that
195 have a fitted WCS.
197 Parameters
198 ----------
199 catalog : `lsst.afw.table.ExposureCatalog`
200 A catalog of per-detector records for the visit.
201 camera : `lsst.afw.cameraGeom.Camera`
202 Camera geometry.
204 Returns
205 -------
206 boresight : `lsst.geom.SpherePoint`
207 New boresight location.
208 orientation : `lsst.geom.Angle`
209 New orientation angle.
210 """
211 start_boresight: SpherePoint | None = None
212 start_orientation = 0.0 * degrees
213 start_y_axis_point: SpherePoint | None = None
214 detectors_kept: list[int] = []
215 start_xyz: dict[int, np.ndarray] = {}
216 target_xyz: dict[int, np.ndarray] = {}
217 for record in catalog:
218 detector_id = record.getId()
219 # We call the WCSs that were actually fit to the stars the "true"
220 # WCSs.
221 target_wcs = record.getWcs()
222 if target_wcs is None:
223 continue
224 try:
225 detector = camera[detector_id]
226 except LookupError:
227 self.log.warning("Detector %d has no camera geometry; skipping it.", detector_id)
228 continue
229 if start_boresight is None:
230 # We just need some semi-arbitrary point on the sky that lets
231 # extract the camera geometry part of a raw WCS. Might be
232 # helpful to have it in the right hemisphere, but otherwise it
233 # shouldn't matter.
234 start_boresight = target_wcs.pixelToSky(Point2D(0.0, 0.0))
235 # Make a raw-like WCS at the arbitrary boresight and orientation.
236 start_wcs = createInitialSkyWcsFromBoresight(
237 start_boresight, start_orientation, detector=detector
238 )
239 # Make a grid of positions for the detector and map them to the sky
240 # via both the true WCS and the arbitrary raw-like one, but in
241 # xyz unit-vector form.
242 pixel_x, pixel_y = self._make_grid(detector, self.config.grid_spacing)
243 start_ra, start_dec = start_wcs.pixelToSkyArray(pixel_x, pixel_y)
244 start_xyz[detector_id] = np.stack(
245 SpherePoint.toUnitXYZ(longitude=start_ra, latitude=start_dec, units=radians),
246 axis=1,
247 )
248 target_ra, target_dec = target_wcs.pixelToSkyArray(pixel_x, pixel_y)
249 target_xyz[detector_id] = np.stack(
250 SpherePoint.toUnitXYZ(longitude=target_ra, latitude=target_dec, units=radians),
251 axis=1,
252 )
253 # Fit the pointing using just the grid for this detector to see if
254 # the residuals are any good; they won't be if the target WCS is
255 # bonkers and makes the detector non-rectangular on the sky.
256 detector_transform = SphereTransform.fit_unit_vectors(
257 start_xyz[detector_id],
258 target_xyz[detector_id],
259 )
260 detector_pointing_residual = self._compute_pointing_residual(
261 detector_transform, start_xyz[detector_id], target_xyz[detector_id]
262 )
263 record.set(self._detector_pointing_residual_key, detector_pointing_residual)
264 if detector_pointing_residual > self._rejection_threshold:
265 record.set(self._rejected_key, True)
266 if not detectors_kept:
267 # This was the first detector we saw; need to reset.
268 start_boresight = None
269 self.log.warning(
270 'Dropping detector %d with detector pointing residual %0.2g" from pointing fit.',
271 detector_id,
272 detector_pointing_residual.asArcseconds(),
273 )
274 continue
275 detectors_kept.append(detector_id)
276 if not detectors_kept:
277 # Since we can't apply the nulling-threshold test, set all WCSs to
278 # None.
279 for record in catalog:
280 record.setWcs(None)
281 raise NoVisitWcs("No valid target WCSs were left after rejection.")
282 # Fit the spherical rotation that maps the points in the arbitrary
283 # start WCS to the target WCS, using all kept detectors.
284 transform = SphereTransform.fit_unit_vectors(
285 np.concatenate([start_xyz[i] for i in detectors_kept]),
286 np.concatenate([target_xyz[i] for i in detectors_kept]),
287 )
288 # Compute and record the residuals for each detector with this
289 # transform.
290 for record in catalog:
291 detector_id = record.getId()
292 if detector_id not in start_xyz:
293 # This detector already doesn't have a WCS.
294 continue
295 visit_pointing_residual = self._compute_pointing_residual(
296 transform, start_xyz[detector_id], target_xyz[detector_id]
297 )
298 record.set(self._visit_pointing_residual_key, visit_pointing_residual)
299 # If we apply that same rotation to our arbitrary start boresight, we
300 # get the boresight predicted by the target WCSs.
301 boresight = transform(start_boresight)
302 # If we apply that rotation to a point on the FIELD_ANGLE y-axis, we
303 # can similarly recover the orientation angle predicted by the target
304 # WCSs.
305 start_y_axis_point = start_boresight.offset(90 * degrees, 1.0 * degrees)
306 transformed_y_axis_point = transform(start_y_axis_point)
307 orientation = Angle(90, degrees) - boresight.bearingTo(transformed_y_axis_point)
308 if camera.getFocalPlaneParity():
309 raise NotImplementedError("Cameras with focal plane parity flips are not yet supported.")
310 return boresight, orientation
312 def _compute_pointing_residual(self, transform, from_xyz, to_xyz):
313 # Apply the transform to the start positions and subtract the
314 # target positions (all in 3-vector space) to get the residual
315 # 3-vectors.
316 residual_vecs = np.dot(transform.matrix, from_xyz.transpose()).transpose()
317 residual_vecs -= to_xyz
318 # Compute the squared chord length of the residual vectors, find
319 # the maximum of that over the grid (since everything else we do
320 # is monotonic), then translate that into an angle.
321 return 2.0 * np.arcsin(0.5 * np.sum(residual_vecs**2, axis=1).max() ** 0.5) * radians
323 def _null_bad(self, catalog):
324 for record in catalog:
325 visit_pointing_residual = record.get(self._visit_pointing_residual_key)
326 if visit_pointing_residual > self._nulling_threshold:
327 self.log.warning(
328 'Setting WCS to None for detector %d with visit pointing residual %0.2g".',
329 record.getId(),
330 visit_pointing_residual.asArcseconds(),
331 )
332 record.setWcs(None)
334 def _make_visit_geometry(self, boresight, orientation, catalog, camera):
335 """Create new sky regions for the visit and its detectors.
337 Parameters
338 ----------
339 boresight : `lsst.geom.SpherePoint`
340 New boresight location.
341 orientation : `lsst.geom.Angle`
342 New orientation angle.
343 catalog : `lsst.afw.table.ExposureCatalog`
344 A catalog of per-detector records for the visit with WCSs
345 A repointed raw-like WCS will be used for any detectors not in the
346 catalog or for which the catalog record does not have a WCS.
347 camera : `lsst.afw.cameraGeom.Camera`
348 Camera geometry.
350 Returns
351 -------
352 regions : `lsst.obs.base.visit_geometry.VisitGeometry`
353 Updated regions for the visit and all detectors in the camera.
354 """
355 detector_regions: dict[int, ConvexPolygon] = {}
356 all_vertices = []
357 for detector in camera:
358 wcs = None
359 if (record := catalog.find(detector.getId())) is not None:
360 wcs = record.getWcs()
361 pixel_bbox = Box2D(detector.getBBox())
362 if wcs is None or record.get(self._rejected_key):
363 wcs = createInitialSkyWcsFromBoresight(boresight, orientation, detector)
364 pixel_bbox.grow(self.config.fallback_region_padding)
365 corners = wcs.pixelToSky(pixel_bbox.getCorners())
366 vertices = [sp.getVector() for sp in corners]
367 detector_regions[detector.getId()] = ConvexPolygon(vertices)
368 all_vertices.extend(vertices)
369 visit_region = ConvexPolygon.convexHull(all_vertices)
370 return VisitGeometry(
371 boresight_ra=boresight.getRa().asDegrees(),
372 boresight_dec=boresight.getDec().asDegrees(),
373 orientation=orientation.asDegrees(),
374 visit_region=visit_region,
375 detector_regions=detector_regions,
376 )
378 def _make_grid(self, detector, spacing) -> tuple[np.ndarray, np.ndarray]:
379 pixel_bbox = Box2D(detector.getBBox())
380 n_x = math.ceil(pixel_bbox.width / spacing)
381 n_y = math.ceil(pixel_bbox.height / spacing)
382 # We add one to the dimensions since there's a point at the min and max
383 # in each dimension.
384 xs = np.linspace(pixel_bbox.x.min, pixel_bbox.x.max, n_x + 1)
385 ys = np.linspace(pixel_bbox.y.min, pixel_bbox.y.max, n_y + 1)
386 x, y = np.meshgrid(xs, ys)
387 return x.ravel(), y.ravel()