Coverage for tests / test_masked_image.py: 19%
104 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-23 08:41 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import tempfile
16import unittest
18import astropy.io.fits
19import astropy.units as u
20import numpy as np
21from astro_metadata_translator import ObservationInfo
23from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes
24from lsst.images.fits import FitsCompressionOptions
25from lsst.images.tests import RoundtripFits, assert_masked_images_equal, compare_masked_image_to_legacy
27DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None)
30class MaskedImageTestCase(unittest.TestCase):
31 """Tests for the MaskedImage class and the basics of the archive system."""
33 def setUp(self) -> None:
34 self.maxDiff = None
35 self.rng = np.random.default_rng(500)
36 self.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4)
37 self.masked_image = MaskedImage(
38 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)),
39 mask_schema=MaskSchema(
40 [
41 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."),
42 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."),
43 ]
44 ),
45 metadata={"fifty": "5 * 10"},
46 obs_info=self.obs_info,
47 )
48 self.masked_image.mask.array |= np.multiply.outer(
49 self.masked_image.image.array < 102.0,
50 self.masked_image.mask.schema.bitmask("BAD"),
51 )
52 self.masked_image.mask.array |= np.multiply.outer(
53 self.masked_image.image.array > 98.0,
54 self.masked_image.mask.schema.bitmask("HUNGRY"),
55 )
56 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape)
58 def test_construction(self) -> None:
59 """Test that the MaskedImage construction (in setUp) worked."""
60 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259])
61 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox)
62 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox)
63 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape)
64 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,))
65 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape)
66 self.assertEqual(self.masked_image.unit, u.nJy)
67 self.assertEqual(self.masked_image.variance.unit, u.nJy**2)
68 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"})
69 self.assertEqual(self.masked_image.obs_info.instrument, "LSSTCam")
70 # The checks below are subject to the vagaries of the RNG, but we want
71 # the seed to be such that they all pass, or other tests will be
72 # weaker.
73 self.assertGreater(
74 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0
75 )
76 self.assertGreater(
77 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0
78 )
79 self.assertGreater(
80 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0
81 )
83 self.assertIs(self.masked_image[...], self.masked_image)
84 self.assertEqual(
85 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])"
86 )
87 self.assertEqual(
88 repr(self.masked_image),
89 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), "
90 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is "
91 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had "
92 "enough to eat today.\")], dtype=dtype('uint8')))",
93 )
94 copy = self.masked_image.copy()
95 original = self.masked_image.image.array[0, 0]
96 copy.image.array[0, 0] = 38.0
97 self.assertEqual(self.masked_image.image.array[0, 0], original)
98 self.assertEqual(copy.image.array[0, 0], 38.0)
100 # Test error conditions.
101 with self.assertRaises(ValueError):
102 # Disagreement over mask bbox.
103 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask)
104 with self.assertRaises(TypeError):
105 # No mask definition.
106 MaskedImage(self.masked_image.image, variance=self.masked_image.variance)
107 with self.assertRaises(TypeError):
108 # Can not provide mask and mask schema.
109 MaskedImage(
110 Image(42.0, shape=(5, 5)),
111 mask=self.masked_image.mask,
112 mask_schema=self.masked_image.mask.schema,
113 )
114 with self.assertRaises(ValueError):
115 # image and variance bbox disagreement.
116 MaskedImage(
117 Image(42.0, shape=(5, 5)),
118 mask_schema=self.masked_image.mask.schema,
119 variance=self.masked_image.variance,
120 )
121 with self.assertRaises(ValueError):
122 # no image unit but there is variance unit.
123 MaskedImage(
124 Image(42.0, shape=(5, 5)),
125 mask_schema=self.masked_image.mask.schema,
126 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
127 )
128 with self.assertRaises(ValueError):
129 # image and variance units disagree.
130 MaskedImage(
131 Image(42.0, shape=(5, 5), unit=u.nJy),
132 mask_schema=self.masked_image.mask.schema,
133 variance=Image(1.0, shape=(5, 5), unit=u.nJy),
134 )
136 def test_subset(self) -> None:
137 """Test assignment of subset."""
138 copy = self.masked_image.copy()
139 subset = copy.local[0:10, 20:30].copy()
140 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy)
141 copy[subset.bbox] = subset
142 self.assertEqual(copy.image.array[0, 20], 42.0)
143 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0])
145 def test_fits_roundtrip(self) -> None:
146 """Test that we can round-trip the MaskedImage through FITS, including
147 subimage reads.
148 """
149 subbox = Box.factory[11:20, 25:30]
150 subslices = (slice(6, 15), slice(17, 22))
151 np.testing.assert_array_equal(
152 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
153 )
154 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip:
155 subimage = roundtrip.get(bbox=subbox)
156 # Check that we used lossless compression (the default).
157 fits = roundtrip.inspect()
158 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
159 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
160 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2")
161 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False)
162 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False)
164 def test_fits_roundtrip_lossy(self) -> None:
165 """Test that we can round-trip the MaskedImage through FITS, including
166 subimage reads, with lossy compression.
167 """
168 subbox = Box.factory[11:20, 25:30]
169 subslices = (slice(6, 15), slice(17, 22))
170 np.testing.assert_array_equal(
171 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array
172 )
173 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp:
174 tmp.close()
175 self.masked_image.write_fits(
176 tmp.name,
177 image_compression=FitsCompressionOptions.LOSSY,
178 variance_compression=FitsCompressionOptions.LOSSY,
179 compression_seed=50,
180 )
181 roundtripped = MaskedImage.read_fits(tmp.name)
182 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox)
183 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits:
184 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1")
185 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
186 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1")
187 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01)
188 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False)
190 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
191 def test_legacy(self) -> None:
192 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and
193 MaskedImage.from_legacy.
194 """
195 assert DATA_DIR is not None, "Guaranteed by decorator."
196 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
197 plane_map = get_legacy_visit_image_mask_planes()
198 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map)
199 try:
200 from lsst.afw.image import MaskedImageFitsReader
201 except ImportError:
202 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
203 reader = MaskedImageFitsReader(filename)
204 legacy_masked_image = reader.read()
205 compare_masked_image_to_legacy(
206 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False
207 )
208 compare_masked_image_to_legacy(
209 self,
210 masked_image,
211 masked_image.to_legacy(plane_map=plane_map),
212 plane_map=plane_map,
213 expect_view=True,
214 )
215 compare_masked_image_to_legacy(
216 self,
217 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map),
218 legacy_masked_image,
219 expect_view=True,
220 plane_map=plane_map,
221 )
224if __name__ == "__main__":
225 unittest.main()