Coverage for python / lsst / images / tests / _checks.py: 11%
303 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:36 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14__all__ = (
15 "arrays_to_legacy_points",
16 "assert_close",
17 "assert_images_equal",
18 "assert_masked_images_equal",
19 "assert_masks_equal",
20 "assert_projections_equal",
21 "assert_psfs_equal",
22 "check_astropy_wcs_interface",
23 "check_projection",
24 "check_transform",
25 "compare_aperture_corrections_to_legacy",
26 "compare_cell_coadd_to_legacy",
27 "compare_field_to_legacy",
28 "compare_image_to_legacy",
29 "compare_mask_to_legacy",
30 "compare_masked_image_to_legacy",
31 "compare_observation_summary_stats_to_legacy",
32 "compare_projection_to_legacy_wcs",
33 "compare_psf_to_legacy",
34 "compare_visit_image_to_legacy",
35 "legacy_coords_to_astropy",
36 "legacy_points_to_xy_array",
37)
39import dataclasses
40import math
41import unittest
42from collections.abc import Mapping
43from typing import TYPE_CHECKING, Any, Literal, cast
45import astropy.units as u
46import astropy.wcs.wcsapi
47import numpy as np
48from astropy.coordinates import SkyCoord
50from .._geom import XY, YX, BoundsError, Box
51from .._image import Image
52from .._mask import Mask, MaskPlane
53from .._masked_image import MaskedImage
54from .._observation_summary_stats import ObservationSummaryStats
55from .._transforms import DetectorFrame, Frame, Projection, SkyFrame, TractFrame, Transform
56from .._visit_image import VisitImage
57from ..aperture_corrections import ApertureCorrectionMap
58from ..cells import CellCoadd, CellIJ, CoaddProvenance
59from ..fields import BaseField
60from ..psfs import PointSpreadFunction
62if TYPE_CHECKING:
63 try:
64 from lsst.cell_coadds import MultipleCellCoadd
65 except ImportError:
66 type MultipleCellCoadd = Any # type: ignore[no-redef]
69def assert_close(
70 tc: unittest.TestCase,
71 a: np.ndarray | u.Quantity | float,
72 b: np.ndarray | u.Quantity | float,
73 **kwargs: Any,
74) -> None:
75 """Test that two arrays, floats, or quantities are almost equal.
77 Parameters
78 ----------
79 tc
80 Test case object with assert methods to use.
81 a
82 Array, float, or quantity to compare.
83 b
84 Array, float, or quantity to compare.
85 **kwargs
86 Forwarded to `astropy.units.allclose`.
87 """
88 tc.assertTrue(u.allclose(a, b, **kwargs), msg=f"{a} != {b}")
91def assert_images_equal(
92 tc: unittest.TestCase,
93 a: Image,
94 b: Image,
95 *,
96 rtol: float = 0.0,
97 atol: float = 0.0,
98 expect_view: bool | Literal["array"] | None = None,
99) -> None:
100 """Assert that two images are equal or nearly equal."""
101 tc.assertEqual(a.bbox, b.bbox)
102 tc.assertEqual(a.unit, b.unit)
103 assert_projections_equal(tc, a.projection, b.projection)
104 if expect_view is not None:
105 tc.assertEqual(np.may_share_memory(a.array, b.array), bool(expect_view))
106 if expect_view == "array":
107 tc.assertEqual(a.metadata, b.metadata)
108 else:
109 tc.assertEqual(a.metadata is b.metadata, expect_view)
110 if not expect_view:
111 assert_close(tc, a.array, b.array, atol=atol, rtol=rtol)
112 tc.assertEqual(a.metadata, b.metadata)
115def assert_masks_equal(tc: unittest.TestCase, a: Mask, b: Mask) -> None:
116 """Assert that two masks are equal or nearly equal."""
117 tc.assertEqual(a.bbox, b.bbox)
118 tc.assertEqual(a.schema, b.schema)
119 tc.assertEqual(a.metadata, b.metadata)
120 assert_projections_equal(tc, a.projection, b.projection)
121 np.testing.assert_array_equal(a.array, b.array)
124def assert_masked_images_equal(
125 tc: unittest.TestCase,
126 a: MaskedImage,
127 b: MaskedImage,
128 *,
129 rtol: float = 0.0,
130 atol: float = 0.0,
131 expect_view: bool | None = None,
132) -> None:
133 """Assert that two masked images are equal or nearly equal."""
134 tc.assertEqual(a.metadata, b.metadata)
135 assert_projections_equal(tc, a.projection, b.projection)
136 assert_images_equal(tc, a.image, b.image, rtol=rtol, atol=atol, expect_view=expect_view)
137 assert_masks_equal(tc, a.mask, b.mask)
138 assert_images_equal(tc, a.variance, b.variance, rtol=rtol, atol=atol, expect_view=expect_view)
141def assert_psfs_equal(
142 tc: unittest.TestCase,
143 psf1: PointSpreadFunction,
144 psf2: PointSpreadFunction,
145 points: YX[np.ndarray] | XY[np.ndarray] | None = None,
146) -> int:
147 """Compare two PSF objets.
149 Parameters
150 ----------
151 tc
152 Test case object with assert methods to use.
153 psf1
154 Point-spread function to test.
155 psf2
156 The other point-spread function to test.
157 points
158 Points to evaluate the PSFs at. If not provided, the intersection of
159 the PSF bounding boxes are used to generate points on a grid.
161 Returns
162 -------
163 `int`
164 The number of points actually tested.
165 """
166 if points is None:
167 points = psf1.bounds.bbox.intersection(psf1.bounds.bbox).meshgrid(3).map(np.ravel)
169 tc.assertEqual(psf1.kernel_bbox, psf2.kernel_bbox)
171 n_points_tested: int = 0
172 for x, y in zip(points.x, points.y):
173 if not psf1.bounds.contains(x=x, y=y):
174 with tc.assertRaises(BoundsError):
175 psf2.compute_kernel_image(x=x, y=y)
176 continue
177 tc.assertEqual(psf1.compute_kernel_image(x=x, y=y), psf2.compute_kernel_image(x=x, y=y))
178 tc.assertEqual(psf1.compute_stellar_bbox(x=x, y=y), psf2.compute_stellar_bbox(x=x, y=y))
179 tc.assertEqual(psf1.compute_stellar_image(x=x, y=y), psf2.compute_stellar_image(x=x, y=y))
180 n_points_tested += 1
181 return n_points_tested
184def compare_image_to_legacy(
185 tc: unittest.TestCase, image: Image, legacy_image: Any, expect_view: bool | None = None
186) -> None:
187 """Compare an `.Image` object to a legacy `lsst.afw.image.Image` object."""
188 tc.assertEqual(image.bbox, Box.from_legacy(legacy_image.getBBox()))
189 if expect_view is not None:
190 tc.assertEqual(np.may_share_memory(image.array, legacy_image.array), expect_view)
191 if not expect_view:
192 np.testing.assert_array_equal(image.array, legacy_image.array)
195def compare_mask_to_legacy(
196 tc: unittest.TestCase, mask: Mask, legacy_mask: Any, plane_map: Mapping[str, MaskPlane] | None = None
197) -> None:
198 """Compare a `.Mask` object to a legacy `lsst.afw.image.Mask` object."""
199 tc.assertEqual(mask.bbox, Box.from_legacy(legacy_mask.getBBox()))
200 if plane_map is None:
201 plane_map = {plane.name: plane for plane in mask.schema if plane is not None}
202 for old_name, new_plane in plane_map.items():
203 np.testing.assert_array_equal(
204 (legacy_mask.array & legacy_mask.getPlaneBitMask(old_name)).astype(bool),
205 mask.get(new_plane.name),
206 )
209def compare_masked_image_to_legacy(
210 tc: unittest.TestCase,
211 masked_image: MaskedImage,
212 legacy_masked_image: Any,
213 *,
214 plane_map: Mapping[str, MaskPlane] | None = None,
215 expect_view: bool | None = None,
216 alternates: Mapping[str, Any] | None = None,
217) -> None:
218 """Compare a `.MaskedImage` object to a legacy `lsst.afw.image.MaskedImage`
219 object.
221 Parameters
222 ----------
223 tc
224 Test case to use for asserts.
225 masked_image
226 New image to test.
227 legacy_masked_image
228 Legacy image to test against.
229 plane_map
230 Mapping between new and legacy mask planes.
231 expect_view
232 Whether to test that the image and variance arrays do or do not share
233 memory.
234 alternates
235 A mapping of other versions of one or more (new) components to also
236 check against the legacy versions of those components.
237 """
238 compare_image_to_legacy(tc, masked_image.image, legacy_masked_image.getImage(), expect_view=expect_view)
239 compare_mask_to_legacy(tc, masked_image.mask, legacy_masked_image.getMask(), plane_map=plane_map)
240 compare_image_to_legacy(
241 tc, masked_image.variance, legacy_masked_image.getVariance(), expect_view=expect_view
242 )
243 if alternates:
244 if image := alternates.get("image"):
245 compare_image_to_legacy(tc, image, legacy_masked_image.getImage(), expect_view=expect_view)
246 if mask := alternates.get("mask"):
247 compare_mask_to_legacy(tc, mask, legacy_masked_image.getMask(), plane_map=plane_map)
248 if variance := alternates.get("variance"):
249 compare_image_to_legacy(tc, variance, legacy_masked_image.getVariance(), expect_view=expect_view)
252def compare_visit_image_to_legacy(
253 tc: unittest.TestCase,
254 visit_image: VisitImage,
255 legacy_exposure: Any,
256 *,
257 plane_map: Mapping[str, MaskPlane] | None = None,
258 expect_view: bool | None = None,
259 instrument: str,
260 visit: int,
261 detector: int,
262 alternates: Mapping[str, Any] | None = None,
263) -> None:
264 """Compare a `.VisitImage` object to a legacy `lsst.afw.image.Exposure`
265 object.
267 Parameters
268 ----------
269 tc
270 Test case to use for asserts.
271 visit_image
272 New image to test.
273 legacy_exposure
274 Legacy image to test against.
275 plane_map
276 Mapping between new and legacy mask planes.
277 expect_view
278 Whether to test that the image and variance arrays do or do not share
279 memory.
280 instrument
281 Expected instrument name.
282 visit
283 Expected visit ID.
284 detector
285 Expected detector ID.
286 alternates
287 A mapping of other versions of one or more (new) components to also
288 check against the legacy versions of those components.
289 """
290 compare_masked_image_to_legacy(
291 tc,
292 visit_image,
293 legacy_exposure.getMaskedImage(),
294 plane_map=plane_map,
295 expect_view=expect_view,
296 alternates=alternates,
297 )
298 detector_bbox = Box.from_legacy(legacy_exposure.getDetector().getBBox())
299 compare_projection_to_legacy_wcs(
300 tc,
301 visit_image.projection,
302 legacy_exposure.getWcs(),
303 DetectorFrame(instrument=instrument, visit=visit, detector=detector, bbox=detector_bbox),
304 visit_image.bbox,
305 )
306 tc.assertIs(visit_image.projection, visit_image.mask.projection)
307 tc.assertIs(visit_image.projection, visit_image.variance.projection)
308 compare_psf_to_legacy(tc, visit_image.psf, legacy_exposure.getPsf())
309 compare_observation_summary_stats_to_legacy(
310 tc, visit_image.summary_stats, legacy_exposure.info.getSummaryStats()
311 )
312 # Make a tiny box for Field comparisons that need to make arrays; that can
313 # get expensive otherwisre.
314 tiny_bbox = detector_bbox.local[2:4, 3:6]
315 compare_aperture_corrections_to_legacy(
316 tc, visit_image.aperture_corrections, legacy_exposure.info.getApCorrMap(), tiny_bbox
317 )
318 if alternates:
319 if projection := alternates.get("projection"):
320 compare_projection_to_legacy_wcs(
321 tc,
322 projection,
323 legacy_exposure.getWcs(),
324 DetectorFrame(instrument=instrument, visit=visit, detector=detector, bbox=detector_bbox),
325 visit_image.bbox,
326 )
327 if psf := alternates.get("psf"):
328 compare_psf_to_legacy(tc, psf, legacy_exposure.getPsf())
329 if summary_stats := alternates.get("summary_stats"):
330 compare_observation_summary_stats_to_legacy(
331 tc, summary_stats, legacy_exposure.info.getSummaryStats()
332 )
333 if obs_info := alternates.get("obs_info"):
334 visitInfo = legacy_exposure.visitInfo
335 tc.assertEqual(obs_info.instrument, visitInfo.getInstrumentLabel())
336 if aperture_corrections := alternates.get("aperture_corrections"):
337 compare_aperture_corrections_to_legacy(
338 tc, aperture_corrections, legacy_exposure.info.getApCorrMap(), tiny_bbox
339 )
342def compare_cell_coadd_to_legacy(
343 tc: unittest.TestCase,
344 cell_coadd: CellCoadd,
345 legacy_cell_coadd: MultipleCellCoadd,
346 *,
347 tract_bbox: Box,
348 plane_map: Mapping[str, MaskPlane] | None = None,
349 alternates: Mapping[str, Any] | None = None,
350 psf_points: XY[np.ndarray] | YX[np.ndarray] | None = None,
351) -> None:
352 """Compare a `.cells.CellCoadd` object to a legacy
353 `lsst.cell_coadds.MultipleCellCoadd` object.
355 Parameters
356 ----------
357 tc
358 Test case to use for asserts.
359 cell_coadd
360 New coadd to test.
361 legacy_cell_coadd
362 Legacy coadd to test against.
363 tract_bbox
364 Bounding box of the full tract.
365 psf_points
366 Points to use to compare the PSFs.
367 plane_map
368 Mapping between new and legacy mask planes.
369 alternates
370 A mapping of other versions of one or more (new) components to also
371 check against the legacy versions of those components.
372 """
373 legacy_stitched = legacy_cell_coadd.stitch(cell_coadd.bbox.to_legacy())
374 compare_image_to_legacy(tc, cell_coadd.image, legacy_stitched.image, expect_view=False)
375 compare_mask_to_legacy(tc, cell_coadd.mask, legacy_stitched.mask, plane_map=plane_map)
376 compare_image_to_legacy(tc, cell_coadd.variance, legacy_stitched.variance, expect_view=False)
377 if legacy_stitched.mask_fractions is not None:
378 compare_image_to_legacy(
379 tc, cell_coadd.mask_fractions["rejected"], legacy_stitched.mask_fractions, expect_view=False
380 )
381 for n in range(legacy_stitched.n_noise_realizations):
382 compare_image_to_legacy(
383 tc, cell_coadd.noise_realizations[n], legacy_stitched.noise_realizations[n], expect_view=False
384 )
385 tc.assertEqual(cell_coadd.skymap, legacy_stitched.identifiers.skymap)
386 tc.assertEqual(cell_coadd.tract, legacy_stitched.identifiers.tract)
387 tc.assertEqual(cell_coadd.patch.index.x, legacy_stitched.identifiers.patch.x)
388 tc.assertEqual(cell_coadd.patch.index.y, legacy_stitched.identifiers.patch.y)
389 tc.assertEqual(cell_coadd.band, legacy_stitched.identifiers.band)
390 tc.assertTrue(tract_bbox.contains(cell_coadd.patch.outer_bbox))
391 tc.assertTrue(cell_coadd.patch.outer_bbox.contains(cell_coadd.patch.inner_bbox))
392 tc.assertTrue(cell_coadd.patch.outer_bbox.contains(cell_coadd.bbox))
393 tc.assertEqual(cell_coadd.unit, u.Unit(legacy_cell_coadd.common.units.value))
394 tc.assertTrue(cell_coadd.bounds.bbox.contains(cell_coadd.bbox))
395 tc.assertTrue(cell_coadd.grid.bbox.contains(cell_coadd.bbox))
396 compare_projection_to_legacy_wcs(
397 tc,
398 cell_coadd.projection,
399 legacy_cell_coadd.wcs,
400 TractFrame(
401 skymap=legacy_cell_coadd.identifiers.skymap,
402 tract=legacy_cell_coadd.identifiers.tract,
403 bbox=tract_bbox,
404 ),
405 cell_coadd.bbox,
406 is_fits=True,
407 )
408 tc.assertIs(cell_coadd.projection, cell_coadd.mask.projection)
409 tc.assertIs(cell_coadd.projection, cell_coadd.variance.projection)
410 compare_psf_to_legacy(
411 tc, cell_coadd.psf, legacy_stitched.psf, expect_legacy_raise_on_out_of_bounds=True, points=psf_points
412 )
413 compare_cell_coadd_provenance_to_legacy(tc, cell_coadd.provenance, legacy_cell_coadd)
414 if alternates:
415 if projection := alternates.get("projection"):
416 compare_projection_to_legacy_wcs(
417 tc,
418 projection,
419 legacy_stitched.wcs,
420 TractFrame(
421 skymap=legacy_cell_coadd.identifiers.skymap,
422 tract=legacy_cell_coadd.identifiers.tract,
423 bbox=tract_bbox,
424 ),
425 cell_coadd.bbox,
426 is_fits=True,
427 )
428 if psf := alternates.get("psf"):
429 compare_psf_to_legacy(tc, psf, legacy_stitched.psf, points=psf_points)
430 if provenance := alternates.get("provenance"):
431 compare_cell_coadd_provenance_to_legacy(tc, provenance, legacy_cell_coadd)
434def compare_cell_coadd_provenance_to_legacy(
435 tc: unittest.TestCase, provenance: CoaddProvenance, legacy_cell_coadd: MultipleCellCoadd
436) -> None:
437 """Compare a `.cells.CoaddProvenance` object to a legacy
438 `lsst.cell_coadds.MultipleCellCoadd` object.
440 Parameters
441 ----------
442 tc
443 Test case to use for asserts.
444 provenance
445 New provenance object to test.
446 legacy_cell_coadd
447 Legacy coadd to test against.
448 """
449 from lsst.cell_coadds import ObservationIdentifiers
451 for legacy_cell in legacy_cell_coadd.cells.values():
452 cell_index = CellIJ.from_legacy(legacy_cell.identifiers.cell)
453 prov = provenance[cell_index]
454 legacy_table = astropy.table.Table(
455 rows=[
456 [
457 ids.instrument,
458 ids.visit,
459 ids.detector,
460 ids.day_obs,
461 ids.physical_filter,
462 legacy_input.overlaps_center,
463 legacy_input.overlap_fraction,
464 legacy_input.weight,
465 legacy_input.psf_shape.getIxx(),
466 legacy_input.psf_shape.getIyy(),
467 legacy_input.psf_shape.getIxy(),
468 legacy_input.psf_shape_flag,
469 ]
470 for ids, legacy_input in legacy_cell.inputs.items()
471 ],
472 dtype=[
473 np.object_,
474 np.uint64,
475 np.uint16,
476 np.uint32,
477 np.object_,
478 np.bool_,
479 np.float64,
480 np.float64,
481 np.float64,
482 np.float64,
483 np.float64,
484 np.bool_,
485 ],
486 names=[
487 "instrument",
488 "visit",
489 "detector",
490 "day_obs",
491 "physical_filter",
492 "overlaps_center",
493 "overlap_fraction",
494 "weight",
495 "psf_shape_xx",
496 "psf_shape_yy",
497 "psf_shape_xy",
498 "psf_shape_flag",
499 ],
500 )
501 # For a single cell all 'inputs' are also 'contributions'.
502 tc.assertEqual(len(legacy_cell.inputs), len(prov.inputs))
503 tc.assertEqual(len(legacy_cell.inputs), len(prov.contributions))
504 prov.inputs.sort(["instrument", "visit", "detector"])
505 prov.contributions.sort(["instrument", "visit", "detector"])
506 legacy_table.sort(["instrument", "visit", "detector"])
507 np.testing.assert_array_equal(prov.inputs["instrument"], prov.contributions["instrument"])
508 np.testing.assert_array_equal(prov.inputs["visit"], prov.contributions["visit"])
509 np.testing.assert_array_equal(prov.inputs["detector"], prov.contributions["detector"])
510 np.testing.assert_array_equal(prov.inputs["instrument"], legacy_table["instrument"])
511 np.testing.assert_array_equal(prov.inputs["visit"], legacy_table["visit"])
512 np.testing.assert_array_equal(prov.inputs["detector"], legacy_table["detector"])
513 np.testing.assert_array_equal(prov.inputs["physical_filter"], legacy_table["physical_filter"])
514 np.testing.assert_array_equal(prov.inputs["day_obs"], legacy_table["day_obs"])
515 np.testing.assert_array_equal(prov.contributions["overlaps_center"], legacy_table["overlaps_center"])
516 np.testing.assert_array_equal(
517 prov.contributions["overlap_fraction"], legacy_table["overlap_fraction"]
518 )
519 np.testing.assert_array_equal(prov.contributions["weight"], legacy_table["weight"])
520 np.testing.assert_array_equal(prov.contributions["psf_shape_xx"], legacy_table["psf_shape_xx"])
521 np.testing.assert_array_equal(prov.contributions["psf_shape_yy"], legacy_table["psf_shape_yy"])
522 np.testing.assert_array_equal(prov.contributions["psf_shape_xy"], legacy_table["psf_shape_xy"])
523 np.testing.assert_array_equal(prov.contributions["psf_shape_flag"], legacy_table["psf_shape_flag"])
524 for row in prov.inputs:
525 polygon_key = ObservationIdentifiers(**{k: row[k] for k in row.keys() if k != "polygon"})
526 legacy_polygon = legacy_cell_coadd.common.visit_polygons[polygon_key]
527 tc.assertEqual(legacy_polygon, row["polygon"].to_legacy())
530def compare_psf_to_legacy(
531 tc: unittest.TestCase,
532 psf: PointSpreadFunction,
533 legacy_psf: Any,
534 points: YX[np.ndarray] | XY[np.ndarray] | None = None,
535 expect_legacy_raise_on_out_of_bounds: bool = False,
536) -> int:
537 """Compare a PSF model object to its legacy interface.
539 Parameters
540 ----------
541 tc
542 Test case object with assert methods to use.
543 psf
544 Point-spread function to test.
545 legacy_psf
546 Legacy `lsst.afw.detection.Psf` instance to compare with.
547 points
548 Points to evaluate the PSFs at. If not provided, the intersection of
549 the PSF bounding boxes are used to generate points on a grid.
550 expect_legacy_raise_on_out_of_bounds
551 If `True`, expect ``legacy_psf`` to raise
552 `lsst.afw.detection.InvalidPsfError` when evaluated at a position
553 considered out-of-bounds by ``psf``.
555 Returns
556 -------
557 `int`
558 The number of points actually tested.
559 """
560 from lsst.afw.detection import InvalidPsfError
562 if points is None:
563 points = psf.bounds.bbox.meshgrid(n=3).map(np.ravel)
564 legacy_points = arrays_to_legacy_points(points.x, points.y)
565 n_points_tested: int = 0
566 for p in legacy_points:
567 if not psf.bounds.contains(x=p.x, y=p.y):
568 if expect_legacy_raise_on_out_of_bounds:
569 with tc.assertRaises(InvalidPsfError):
570 legacy_psf.computeKernelImage(p)
571 continue
572 tc.assertEqual(psf.kernel_bbox, Box.from_legacy(legacy_psf.computeKernelBBox(p)))
573 tc.assertEqual(
574 psf.compute_kernel_image(x=p.x, y=p.y), Image.from_legacy(legacy_psf.computeKernelImage(p))
575 )
576 tc.assertEqual(
577 psf.compute_stellar_bbox(x=p.x, y=p.y), Box.from_legacy(legacy_psf.computeImageBBox(p))
578 )
579 tc.assertEqual(psf.compute_stellar_image(x=p.x, y=p.y), Image.from_legacy(legacy_psf.computeImage(p)))
580 n_points_tested += 1
581 return n_points_tested
584def compare_field_to_legacy(
585 tc: unittest.TestCase,
586 field: BaseField,
587 legacy_field: Any,
588 subimage_bbox: Box,
589) -> None:
590 """Test a Field object by comparing it to an equivalent
591 `lsst.afw.math.BoundedField`.
593 Parameters
594 ----------
595 tc
596 Test case object with assert methods to use.
597 field
598 Field to test.
599 legacy_field : ``lsst.afw.math.BoundedField``
600 Equivalent legacy bounded field.
601 subimage_bbox
602 Bounding box for full-image tests.
603 """
604 tc.assertEqual(field.bounds.bbox, Box.from_legacy(legacy_field.getBBox()))
605 # Pixel coordinates to test the numpy array interface with.
606 pixel_xy = field.bounds.bbox.meshgrid(n=5).map(np.ravel)
607 assert_close(tc, field(x=pixel_xy.x, y=pixel_xy.y), legacy_field.evaluate(pixel_xy.x, pixel_xy.y))
608 legacy_image_1 = Image(0, bbox=subimage_bbox, dtype=np.float64).to_legacy()
609 legacy_field.addToImage(legacy_image_1, overlapOnly=True)
610 assert_images_equal(tc, field.render(subimage_bbox), Image.from_legacy(legacy_image_1), rtol=1e-13)
613def compare_aperture_corrections_to_legacy(
614 tc: unittest.TestCase,
615 aperture_corrections: ApertureCorrectionMap,
616 legacy_ap_corr_map: Any,
617 subimage_bbox: Box,
618) -> None:
619 """Test an aperture correction `dict` by comparing it to an equivalent
620 `lsst.afw.image.ApCorrMap`.
622 Parameters
623 ----------
624 tc
625 Test case object with assert methods to use.
626 aperture_corrections
627 Dictionary to test.
628 legacy_ap_corr_map : ``lsst.afw.image.ApCorrMap``
629 Equivalent legacy aperture correction map.
630 subimage_bbox
631 Bounding box for full-image tests.
632 """
633 tc.assertEqual(aperture_corrections.keys(), set(legacy_ap_corr_map.keys()))
634 for name, field in aperture_corrections.items():
635 compare_field_to_legacy(tc, field, legacy_ap_corr_map[name], subimage_bbox)
638def compare_observation_summary_stats_to_legacy(
639 tc: unittest.TestCase,
640 summary_stats: ObservationSummaryStats,
641 legacy_summary_stats: Any,
642) -> None:
643 """Test an ObservationSummaryStats object by comparing it to an equivalent
644 `lsst.afw.image.ExposureSummaryStats`.
646 Parameters
647 ----------
648 tc
649 Test case object with assert methods to use.
650 summary_stats
651 Struct to test.
652 legacy : ``lsst.afw.image.ExposureSummaryStats``
653 Equivalent legacy struct.
654 """
655 for field in dataclasses.fields(legacy_summary_stats):
656 a = getattr(legacy_summary_stats, field.name)
657 b = getattr(summary_stats, field.name)
658 if isinstance(b, tuple):
659 for ai, bi in zip(a, b):
660 tc.assertTrue(ai == bi or (math.isnan(ai) and math.isnan(bi)), f"{field.name}: {a} != {b}")
661 else:
662 tc.assertTrue(a == b or (math.isnan(a) and math.isnan(b)), f"{field.name}: {a} != {b}")
665def compare_projection_to_legacy_wcs[F: Frame](
666 tc: unittest.TestCase,
667 projection: Projection[F],
668 legacy_wcs: Any,
669 pixel_frame: F,
670 subimage_bbox: Box,
671 is_fits: bool = False,
672) -> None:
673 """Test a Projection object by comparing it to an equivalent
674 `lsst.afw.geom.SkyWcs`.
676 Parameters
677 ----------
678 tc
679 Test case object with assert methods to use.
680 projection
681 Projection to test.
682 legacy_wcs : ``lsst.afw.geom.SkyWcs``
683 Equivalent legacy WCS.
684 pixel_frame
685 Expected pixel frame for the projection.
686 subimage_bbox
687 Bounding box of points to generate for tests.
688 is_fits
689 Whether this projection is expected to be exactly representable as a
690 FIT WCS. If `False` it is assumed to have a FITS approximation
691 attached instead.
692 """
693 # Pixel coordinates to test on over the subimage region of interest:
694 pixel_xy = subimage_bbox.meshgrid(step=50).map(np.ravel)
695 # Array indices of those pixel values (subtract off bbox starts):
696 subimage_array_xy = XY(x=pixel_xy.x - subimage_bbox.x.start, y=pixel_xy.y - subimage_bbox.y.start)
697 sky_coords = legacy_coords_to_astropy(
698 legacy_wcs.pixelToSky(arrays_to_legacy_points(pixel_xy.x, pixel_xy.y))
699 )
700 # Test transforming with the Projection itself, which also tests its
701 # nested Transform and an Astropy High-Level WCS view with no origin
702 # change.
703 check_projection(tc, projection, pixel_xy, sky_coords, pixel_frame)
704 # Also test the Astropy High-Level WCS view with an origin change to
705 # array indices.
706 check_astropy_wcs_interface(
707 tc, projection.as_astropy(subimage_bbox), subimage_array_xy, sky_coords, pixel_atol=1e-5
708 )
709 if is_fits:
710 fits_wcs = projection.as_fits_wcs(subimage_bbox, allow_approximation=True)
711 assert fits_wcs is not None
712 check_astropy_wcs_interface(tc, fits_wcs, subimage_array_xy, sky_coords, pixel_atol=1e-5)
713 # Use that FITS approximation to check that we can make a
714 # Projection from a FITS WCS, too.
715 fits_projection = Projection.from_fits_wcs(fits_wcs, pixel_frame)
716 check_projection(
717 tc,
718 fits_projection,
719 subimage_array_xy,
720 sky_coords,
721 pixel_frame,
722 pixel_atol=1e-5,
723 )
724 # We want Projections we create from a FITS WCS to be backed by an
725 # AST FrameSet so we can convert them into legacy
726 # `lsst.afw.geom.SkyWcs` objects if desired.
727 tc.assertIn("Begin FrameSet", fits_projection.show())
728 else:
729 tc.assertIsNone(projection.as_fits_wcs(subimage_bbox, allow_approximation=False))
730 # The legacy SkyWcs should instead have a FITS approximation
731 # attached; run the same tests on that.
732 assert projection.fits_approximation is not None
733 compare_projection_to_legacy_wcs(
734 tc,
735 projection.fits_approximation,
736 legacy_wcs.getFitsApproximation(),
737 pixel_frame,
738 subimage_bbox,
739 is_fits=True,
740 )
743def check_transform[I: Frame, O: Frame](
744 tc: unittest.TestCase,
745 transform: Transform[I, O],
746 input_xy: XY[np.ndarray],
747 output_xy: XY[np.ndarray],
748 in_frame: Frame,
749 out_frame: Frame,
750 *,
751 check_inverted: bool = True,
752 in_atol: u.Quantity | None = None,
753 out_atol: u.Quantity | None = None,
754) -> None:
755 """Test Transform against known arrays of input and output points.
757 Parameters
758 ----------
759 tc
760 Test case object with assert methods to use.
761 transform
762 Transform to test.
763 input_xy
764 Arrays of input points.
765 output_xy
766 Arrays of output points.
767 in_frame
768 Expected input frame.
769 out_frame
770 Expected output frame.
771 check_inverted
772 If `True`, recurse (once) to test the inverse transform.
773 in_atol
774 Expected absolute precision of input points.
775 out_atol
776 Expected absolute precision of output points.
777 """
778 tc.assertEqual(transform.in_frame, in_frame)
779 tc.assertEqual(transform.out_frame, out_frame)
780 in_atol_v = in_atol.to_value(in_frame.unit) if in_atol is not None else None
781 out_atol_v = out_atol.to_value(out_frame.unit) if out_atol is not None else None
782 # Test array interfaces.
783 test_output_xy = transform.apply_forward(x=input_xy.x, y=input_xy.y)
784 assert_close(tc, test_output_xy.x, output_xy.x, atol=out_atol_v)
785 assert_close(tc, test_output_xy.y, output_xy.y, atol=out_atol_v)
786 test_input_xy = transform.apply_inverse(x=output_xy.x, y=output_xy.y)
787 assert_close(tc, test_input_xy.x, input_xy.x, atol=in_atol_v)
788 assert_close(tc, test_input_xy.y, input_xy.y, atol=in_atol_v)
789 # Test scalar interfaces with numpy scalars.
790 for input_x, input_y, output_x, output_y in zip(input_xy.x, input_xy.y, output_xy.x, output_xy.y):
791 assert_close(tc, transform.apply_forward(x=input_x, y=input_y).x, output_x, atol=out_atol_v)
792 assert_close(tc, transform.apply_forward(x=input_x, y=input_y).y, output_y, atol=out_atol_v)
793 assert_close(tc, transform.apply_inverse(x=output_x, y=output_y).x, input_x, atol=in_atol_v)
794 assert_close(tc, transform.apply_inverse(x=output_x, y=output_y).y, input_y, atol=in_atol_v)
795 # Test quantity array interfaces.
796 input_q_xy = XY(x=input_xy.x * transform.in_frame.unit, y=input_xy.y * transform.in_frame.unit)
797 output_q_xy = XY(x=output_xy.x * transform.out_frame.unit, y=output_xy.y * transform.out_frame.unit)
798 test_output_q_xy = transform.apply_forward_q(x=input_q_xy.x, y=input_q_xy.y)
799 assert_close(tc, test_output_q_xy.x, output_q_xy.x, atol=out_atol)
800 assert_close(tc, test_output_q_xy.y, output_q_xy.y, atol=out_atol)
801 test_input_q_xy = transform.apply_inverse_q(x=output_q_xy.x, y=output_q_xy.y)
802 assert_close(tc, test_input_q_xy.x, input_q_xy.x, atol=in_atol)
803 assert_close(tc, test_input_q_xy.y, input_q_xy.y, atol=in_atol)
804 # Test quantity scalar interfaces.
805 for input_q_x, input_q_y, output_q_x, output_q_y in zip(
806 input_q_xy.x, input_q_xy.y, output_q_xy.x, output_q_xy.y
807 ):
808 assert_close(tc, transform.apply_forward_q(x=input_q_x, y=input_q_y).x, output_q_x, atol=out_atol)
809 assert_close(tc, transform.apply_forward_q(x=input_q_x, y=input_q_y).y, output_q_y, atol=out_atol)
810 assert_close(tc, transform.apply_inverse_q(x=output_q_x, y=output_q_y).x, input_q_x, atol=in_atol)
811 assert_close(tc, transform.apply_inverse_q(x=output_q_x, y=output_q_y).y, input_q_y, atol=in_atol)
812 if check_inverted:
813 # Test the inverse transform.
814 check_transform(
815 tc,
816 transform.inverted(),
817 output_xy,
818 input_xy,
819 out_frame,
820 in_frame,
821 check_inverted=False,
822 out_atol=in_atol,
823 in_atol=out_atol,
824 )
827def check_projection[P: Frame](
828 tc: unittest.TestCase,
829 projection: Projection[P],
830 pixel_xy: XY[np.ndarray],
831 sky_coords: SkyCoord,
832 pixel_frame: Frame,
833 *,
834 pixel_atol: float | None = None,
835 sky_atol: u.Quantity | None = None,
836) -> None:
837 """Test a `.Projection` instance against known arrays of pixel and sky
838 coordinates.
840 Parameters
841 ----------
842 tc
843 Test case object with assert methods to use.
844 projection
845 Projection to test.
846 pixel_xy
847 Arrays of pixel coordinates.
848 sky_coords
849 Corresponding sky coordinates.
850 pixel_frame
851 Expected pixel frame.
852 pixel_atol
853 Expected absolute precision of pixel points.
854 sky_atol
855 Expected absolute precision of sky coordinates.
856 """
857 tc.assertEqual(projection.pixel_frame, pixel_frame)
858 tc.assertEqual(projection.sky_frame, SkyFrame.ICRS)
859 sky_atol_v = sky_atol.to_value(SkyFrame.ICRS.unit) if sky_atol is not None else None
860 pixel_atol_q = pixel_atol * u.pix if pixel_atol is not None else None
861 # Test array interfaces.
862 test_pixel_xy = cast(XY[np.ndarray], projection.sky_to_pixel(sky_coords))
863 assert_close(tc, test_pixel_xy.x, pixel_xy.x, atol=pixel_atol)
864 assert_close(tc, test_pixel_xy.y, pixel_xy.y, atol=pixel_atol)
865 test_sky_astropy = projection.pixel_to_sky(x=pixel_xy.x, y=pixel_xy.y)
866 assert_close(tc, test_sky_astropy.ra, sky_coords.ra, atol=sky_atol_v)
867 assert_close(tc, test_sky_astropy.dec, sky_coords.dec, atol=sky_atol_v)
868 # Test scalar interfaces.
869 for pixel_x, pixel_y, sky_single in zip(pixel_xy.x, pixel_xy.y, sky_coords):
870 assert_close(tc, projection.sky_to_pixel(sky_single).x, pixel_x, atol=pixel_atol)
871 assert_close(tc, projection.sky_to_pixel(sky_single).y, pixel_y, atol=pixel_atol)
872 assert_close(tc, projection.pixel_to_sky(x=pixel_x, y=pixel_y).ra, sky_single.ra, atol=sky_atol_v)
873 assert_close(tc, projection.pixel_to_sky(x=pixel_x, y=pixel_y).dec, sky_single.dec, atol=sky_atol_v)
874 # Test the underlying Transform object.
875 sky_xy = XY(x=sky_coords.ra.to_value(u.rad), y=sky_coords.dec.to_value(u.rad))
876 check_transform(
877 tc,
878 projection.pixel_to_sky_transform,
879 pixel_xy,
880 sky_xy,
881 pixel_frame,
882 SkyFrame.ICRS,
883 check_inverted=False,
884 in_atol=pixel_atol_q,
885 out_atol=sky_atol,
886 )
887 check_transform(
888 tc,
889 projection.sky_to_pixel_transform,
890 sky_xy,
891 pixel_xy,
892 SkyFrame.ICRS,
893 pixel_frame,
894 check_inverted=False,
895 in_atol=sky_atol,
896 out_atol=pixel_atol_q,
897 )
898 # Test the Astropy interface adapter.
899 check_astropy_wcs_interface(
900 tc, projection.as_astropy(), pixel_xy, sky_coords, pixel_atol=pixel_atol, sky_atol=sky_atol
901 )
904def assert_projections_equal(
905 tc: unittest.TestCase,
906 a: Projection[Any] | None,
907 b: Projection[Any] | None,
908 expect_identity: bool | None = None,
909) -> None:
910 """Test that two `.Projection` instances are equivalent."""
911 if a is None and b is None:
912 return
913 assert a is not None and b is not None
914 match expect_identity:
915 case True:
916 tc.assertIs(a, b)
917 return
918 case False:
919 tc.assertIsNot(a, b)
920 case None if a is b:
921 return
922 tc.assertEqual(a.pixel_frame, b.pixel_frame)
923 tc.assertEqual(a.show(simplified=True), b.show(simplified=True))
924 assert_projections_equal(
925 tc, a.fits_approximation, cast(Projection[Any], b.fits_approximation), expect_identity=False
926 )
929def check_astropy_wcs_interface(
930 tc: unittest.TestCase,
931 wcs: astropy.wcs.wcsapi.BaseHighLevelWCS,
932 pixel_xy: XY[np.ndarray],
933 sky_coords: SkyCoord,
934 *,
935 pixel_atol: float | None = None,
936 sky_atol: u.Quantity | None = None,
937) -> None:
938 """Test an Astropy WCS instance against known arrays of pixel and
939 sky coordinates.
941 Parameters
942 ----------
943 tc
944 Test case object with assert methods to use.
945 wcs
946 Astropy WCS object to test.
947 pixel_xy
948 Arrays of pixel coordinates.
949 sky_coords
950 Corresponding sky coordinates.
951 pixel_atol
952 Expected absolute precision of pixel points.
953 sky_atol
954 Expected absolute precision of sky coordinates.
955 """
956 test_x, test_y = wcs.world_to_pixel(sky_coords)
957 assert_close(tc, test_x, pixel_xy.x, atol=pixel_atol)
958 assert_close(tc, test_y, pixel_xy.y, atol=pixel_atol)
959 test_sky_coords = wcs.pixel_to_world(pixel_xy.x, pixel_xy.y)
960 assert_close(tc, test_sky_coords.ra, sky_coords.ra, atol=sky_atol)
961 assert_close(tc, test_sky_coords.dec, sky_coords.dec, atol=sky_atol)
964def legacy_points_to_xy_array(legacy_points: list[Any]) -> XY[np.ndarray]:
965 """Convert a list of ``lsst.geom.Point2D`` objects to an `.XY` array."""
966 return XY(x=np.array([p.x for p in legacy_points]), y=np.array([p.y for p in legacy_points]))
969def legacy_coords_to_astropy(legacy_coords: list[Any]) -> SkyCoord:
970 """Convert a list of ``lsst.geom.SpherePoint`` objects to an Astropy
971 coordinate object.
972 """
973 return SkyCoord(
974 ra=np.array([p.getRa().asRadians() for p in legacy_coords]) * u.rad,
975 dec=np.array([p.getDec().asRadians() for p in legacy_coords]) * u.rad,
976 )
979def arrays_to_legacy_points(x: np.ndarray, y: np.ndarray) -> list[Any]:
980 """Convert arrays of ``x`` and ``y`` to a list of ``lsst.geom.Point2D``."""
981 from lsst.geom import Point2D
983 return [Point2D(x=xv, y=yv) for xv, yv in zip(x, y)]