Coverage for tests / test_cell_coadd.py: 35%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:35 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import pickle
16import unittest
17from typing import Any
19import numpy as np
21from lsst.images import YX, Box, Interval, get_legacy_deep_coadd_mask_planes
22from lsst.images.cells import CellCoadd, CellIJ
23from lsst.images.tests import (
24 DP2_COADD_DATA_ID,
25 DP2_COADD_MISSING_CELL,
26 RoundtripFits,
27 assert_masked_images_equal,
28 assert_psfs_equal,
29 compare_cell_coadd_to_legacy,
30)
32DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
35@unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
36class CellCoaddTestCase(unittest.TestCase):
37 """Tests for the CellCoadd class and its many component classes."""
39 @classmethod
40 def setUpClass(cls) -> None:
41 assert DATA_DIR is not None, "Guaranteed by decorator."
42 cls.filename = os.path.join(DATA_DIR, "dp2", "legacy", "deep_coadd_cell_predetection.fits")
43 cls.plane_map = get_legacy_deep_coadd_mask_planes()
44 cls.missing_cell = CellIJ(**DP2_COADD_MISSING_CELL)
45 try:
46 from lsst.cell_coadds import MultipleCellCoadd
48 cls.legacy_cell_coadd = MultipleCellCoadd.read_fits(cls.filename)
49 except ImportError:
50 raise unittest.SkipTest("lsst.cell_coadds could not be imported.") from None
51 with open(os.path.join(DATA_DIR, "dp2", "legacy", "skyMap.pickle"), "rb") as stream:
52 cls.skymap = pickle.load(stream)
53 cls.cell_coadd = CellCoadd.from_legacy(
54 cls.legacy_cell_coadd,
55 plane_map=cls.plane_map,
56 tract_info=cls.skymap[DP2_COADD_DATA_ID["tract"]],
57 )
59 def make_psf_points(self, bbox: Box) -> YX[np.ndarray]:
60 """Make arrays of points to test PSFs at, given a bbox that is assumed
61 to be snapped to the cell_coadd grid.
62 """
63 xc, yc = np.meshgrid(
64 np.arange(
65 bbox.x.start + self.cell_coadd.grid.cell_shape.x * 0.5,
66 bbox.x.stop,
67 self.cell_coadd.grid.cell_shape.x,
68 ),
69 np.arange(
70 bbox.y.start + self.cell_coadd.grid.cell_shape.y * 0.5,
71 bbox.y.stop,
72 self.cell_coadd.grid.cell_shape.y,
73 ),
74 )
75 return YX(
76 y=yc.ravel() + self.rng.uniform(-0.4, 0.4, size=yc.size),
77 x=xc.ravel() + self.rng.uniform(-0.4, 0.4, size=xc.size),
78 )
80 def setUp(self) -> None:
81 self.rng = np.random.default_rng(44)
82 self.psf_points = self.make_psf_points(self.cell_coadd.bbox)
84 def test_from_legacy(self) -> None:
85 """Test constructing a CellCoadd by converting a legacy
86 lsst.cell_coadds.MultipleCellCoadd.
87 """
88 self.assertEqual(self.cell_coadd.bounds.missing, {self.missing_cell})
89 self.assertEqual(self.cell_coadd.bbox, Box.factory[12900:13500, 9600:10050])
90 compare_cell_coadd_to_legacy(
91 self,
92 self.cell_coadd,
93 self.legacy_cell_coadd,
94 tract_bbox=Box.from_legacy(self.skymap[DP2_COADD_DATA_ID["tract"]].getBBox()),
95 plane_map=self.plane_map,
96 psf_points=self.psf_points,
97 )
99 def test_roundtrip(self) -> None:
100 """Test serializing a CellCoadd and reading it back in, including
101 subimage and component reads.
102 """
103 with RoundtripFits(self, self.cell_coadd, "CellCoadd") as roundtrip:
104 # Check a subimage read. The subbox only overlaps (but does not
105 # fully cover) the middle 2 (of 4) cells in y, while covering
106 # exactly the last column of cells in x. It does not cover the
107 # missing cell.
108 subbox = Box.factory[
109 self.cell_coadd.bbox.y.start + 252 : self.cell_coadd.bbox.y.stop - 175,
110 self.cell_coadd.bbox.x.stop - 150 : self.cell_coadd.bbox.x.stop,
111 ]
112 subimage = roundtrip.get(bbox=subbox)
113 assert_masked_images_equal(self, subimage, self.cell_coadd[subbox], expect_view=False)
114 alternates: dict[str, Any] = {}
115 with self.subTest():
116 subpsf = roundtrip.get("psf", bbox=subbox)
117 self.assertEqual(
118 subpsf.bounds.bbox,
119 Box(
120 y=Interval.factory[
121 self.cell_coadd.bbox.y.start + 150 : self.cell_coadd.bbox.y.stop - 150
122 ],
123 x=subbox.x,
124 ),
125 )
126 assert_psfs_equal(self, subpsf, self.cell_coadd.psf, points=self.make_psf_points(subbox))
127 self.assertEqual(roundtrip.get("bbox"), self.cell_coadd.bbox)
128 alternates = {
129 k: roundtrip.get(k)
130 for k in ["projection", "image", "mask", "variance", "psf", "provenance"]
131 }
132 self.assertEqual(self.cell_coadd.bounds.missing, {self.missing_cell})
133 self.assertEqual(self.cell_coadd.bbox, Box.factory[12900:13500, 9600:10050])
134 compare_cell_coadd_to_legacy(
135 self,
136 roundtrip.result,
137 self.legacy_cell_coadd,
138 tract_bbox=Box.from_legacy(self.skymap[DP2_COADD_DATA_ID["tract"]].getBBox()),
139 plane_map=self.plane_map,
140 alternates=alternates,
141 psf_points=self.psf_points,
142 )
145if __name__ == "__main__":
146 unittest.main()