Coverage for tests / test_mask.py: 13%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:04 +0000
1# This file is part of lsst-images.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# Use of this source code is governed by a 3-clause BSD-style
10# license that can be found in the LICENSE file.
12from __future__ import annotations
14import os
15import unittest
17import numpy as np
18from astro_metadata_translator import ObservationInfo
20import lsst.utils.tests
21from lsst.images import Box, Mask, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes
22from lsst.images.tests import RoundtripFits, assert_masks_equal, compare_mask_to_legacy
# Root directory of the optional external test data; None when the
# TESTDATA_IMAGES_DIR environment variable is not set.
DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR")
class MaskTestCase(unittest.TestCase):
    """Tests for Mask and its helper classes."""

    def setUp(self) -> None:
        """Disable diff truncation and create a freshly seeded generator."""
        # Several expected strings are long; always show the full diff.
        self.maxDiff = None
        # Fixed seed: test_basics hard-codes the plane order that results
        # from the first shuffle drawn from this generator.
        self.rng = np.random.default_rng(500)

    def make_mask_planes(self, n_planes: int, n_placeholders: int) -> list[MaskPlane | None]:
        """Return a shuffled list of mask planes and `None` placeholders.

        Parameters
        ----------
        n_planes
            Number of planes to create, named ``M{i}`` with description
            ``D{i}``.
        n_placeholders
            Number of `None` entries to mix in.

        Returns
        -------
        planes
            The planes and placeholders, shuffled in place with ``self.rng``.
        """
        planes: list[MaskPlane | None] = [MaskPlane(f"M{i}", f"D{i}") for i in range(n_planes)]
        planes.extend([None] * n_placeholders)
        self.rng.shuffle(planes)
        return planes

    def make_mask(self) -> tuple[MaskSchema, Mask]:
        """Build the zero-filled fixture mask shared by several tests.

        Returns
        -------
        schema
            A ``uint8`` schema with 35 planes and 5 placeholders.
        mask
            A mask using that schema, with metadata and an `ObservationInfo`
            attached.
        """
        planes = self.make_mask_planes(35, n_placeholders=5)
        schema = MaskSchema(planes, dtype=np.uint8)
        mask = Mask(
            0,
            schema=schema,
            bbox=Box.factory[5:50, 6:60],
            metadata={"four_and_a_half": 4.5},
            obs_info=ObservationInfo(instrument="LSSTCam"),
        )
        return schema, mask

    def test_schema(self) -> None:
        """Test MaskSchema."""
        self.assertEqual(MaskSchema.bits_per_element(np.uint8), 8)
        with self.assertRaises(TypeError):
            MaskSchema.bits_per_element(np.float32)

        planes = self.make_mask_planes(17, 5)
        schema = MaskSchema(planes, dtype=np.uint8)

        # Sequence behavior.
        self.assertEqual(list(schema), planes)
        self.assertEqual(len(schema), len(planes))
        self.assertEqual(schema[5], planes[5])

        # repr should round-trip; it is only evaluated here because it was
        # produced by the schema constructed above.
        namespace = {"dtype": np.dtype, "MaskSchema": MaskSchema, "MaskPlane": MaskPlane}
        self.assertEqual(eval(repr(schema), namespace), schema)

        # str should have one line per real (non-placeholder) plane.
        string = str(schema)
        self.assertEqual(len(string.split("\n")), 17)
        bit5 = schema.bit("M5")
        self.assertIn(f"M5 [{bit5.index}@{hex(bit5.mask)}]: D5", string)

        # Equality depends on both the planes and the dtype.
        self.assertEqual(schema, MaskSchema(planes, np.uint8))
        self.assertNotEqual(schema, MaskSchema(planes, np.int16))
        self.assertNotEqual(schema, MaskSchema(planes[:-1], np.uint8))

        # Simple properties.
        self.assertEqual(schema.dtype, np.dtype(np.uint8))
        self.assertEqual(schema.mask_size, 3)
        self.assertEqual(schema.names, {f"M{i}" for i in range(17)})
        self.assertEqual(schema.descriptions, {f"M{i}": f"D{i}" for i in range(17)})

        # A bitmask for two planes has exactly those two bits set: clearing
        # them must leave nothing behind.
        bit7 = schema.bit("M7")
        bitmask57 = schema.bitmask("M5", "M7")
        self.assertTrue(bitmask57[bit5.index] & bit5.mask)
        self.assertTrue(bitmask57[bit7.index] & bit7.mask)
        bitmask57[bit5.index] &= ~bit5.mask
        bitmask57[bit7.index] &= ~bit7.mask
        self.assertFalse(bitmask57.any())

        # Splitting into single-element int16 schemas drops the placeholders
        # and preserves the order of the real planes.
        splits = schema.split(np.int16)
        self.assertEqual(len(splits), 2)
        self.assertEqual(splits[0].mask_size, 1)
        self.assertEqual(splits[1].mask_size, 1)
        self.assertEqual(list(splits[0]) + list(splits[1]), [p for p in planes if p is not None])
        self.assertEqual(len(splits[0]), 15)
        self.assertEqual(len(splits[1]), 2)

    def test_basics(self) -> None:
        """Test some basic Mask functionality."""
        schema, mask = self.make_mask()

        self.assertIs(mask[...], mask)
        # NotImplemented is a singleton, so check identity rather than
        # equality.
        self.assertIs(mask.__eq__(42), NotImplemented)
        self.assertEqual(mask, mask)
        self.assertEqual(mask.obs_info.instrument, "LSSTCam")
        # The plane order below is the shuffle produced by the seed in setUp.
        self.assertEqual(
            str(mask),
            "Mask([y=5:50, x=6:60], ['M34', 'M15', 'M29', 'M1', 'M20', 'M11', 'M13', 'M7', 'M17', 'M12', "
            "'M31', 'M16', 'M2', 'M3', 'M8', 'M26', 'M22', 'M5', 'M18', 'M19', 'M24', 'M21', 'M27', 'M6', "
            "'M28', 'M10', 'M4', 'M23', 'M0', 'M25', 'M9', 'M14', 'M33', 'M32', 'M30'])",
        )
        self.assertTrue(
            repr(mask).startswith(
                "Mask(..., bbox=Box(y=Interval(start=5, stop=50), x=Interval(start=6, stop=60)), "
                "schema=MaskSchema([MaskPlane(name='M34', description='D34')"
            ),
            f"Repr: {mask!r}",
        )

        with self.assertRaises(TypeError):
            # No bbox, size or array.
            Mask(0, schema=schema)

        with self.assertRaises(ValueError):
            # Box mismatch.
            Mask(mask.array, schema=schema, bbox=Box.factory[0:20, -5:45])

        with self.assertRaises(ValueError):
            # Shape mismatch.
            Mask(mask.array, schema=schema, shape=(5, 10, 5))

        with self.assertRaises(ValueError):
            # Cannot be 2-D.
            Mask(mask.array.reshape((2430, 5)), schema=schema, bbox=Box.factory[0:20, -5:45])

    def test_read_write(self) -> None:
        """Explicit calls to read and write fits."""
        _, mask = self.make_mask()
        with lsst.utils.tests.getTempFilePath(".fits") as tmpFile:
            mask.write_fits(tmpFile)

            new = Mask.read_fits(tmpFile)
            self.assertEqual(new, mask)
            # __eq__ ignores metadata, so check it (and obs_info) explicitly.
            self.assertEqual(new.metadata["four_and_a_half"], 4.5)
            self.assertEqual(new.obs_info.instrument, "LSSTCam")
            self.assertEqual(new.obs_info, mask.obs_info)
            self.assertEqual(new.metadata, mask.metadata)

    def test_serialize_multi(self) -> None:
        """Test serializing a mask with more than 31 mask planes, requiring
        more than one HDU and EXTVER.

        Note that serialization for simpler cases is covered by
        test_masked_image.py.
        """
        # Number of planes expected in each MASK HDU before spilling over
        # into the next one.
        planes_per_hdu = 31
        planes = self.make_mask_planes(35, n_placeholders=5)
        schema = MaskSchema(planes, dtype=np.uint8)
        bbox = Box.factory[5:50, 6:60]
        mask = Mask(0, schema=schema, bbox=bbox, metadata={"four_and_a_half": 4.5})
        shape = bbox.shape
        # Fill every real plane with random bits so the round trip is
        # non-trivial.
        for plane in schema:
            if plane is not None:
                mask.set(plane.name, self.rng.random(shape) > 0.5)
        with RoundtripFits(self, mask) as roundtrip:
            fits = roundtrip.inspect()
            self.assertEqual(fits[1].header["EXTNAME"], "MASK")
            self.assertEqual(fits[1].header.get("EXTVER", 1), 1)
            self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
            self.assertEqual(fits[2].header["EXTNAME"], "MASK")
            self.assertEqual(fits[2].header["EXTVER"], 2)
            self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
            # Placeholders are skipped; real planes are numbered from 1
            # within each HDU, in schema order.
            real_planes = (plane for plane in planes if plane is not None)
            for n, plane in enumerate(real_planes):
                hdu = fits[1] if n < planes_per_hdu else fits[2]
                bit = n % planes_per_hdu
                card = bit + 1
                self.assertEqual(hdu.header[f"MSKN{card:04d}"], plane.name)
                self.assertEqual(hdu.header[f"MSKM{card:04d}"], 1 << bit)
                self.assertEqual(hdu.header[f"MSKD{card:04d}"], plane.description)
        assert_masks_equal(self, mask, roundtrip.result)

    @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
    def test_legacy(self) -> None:
        """Test Mask.read_legacy, Mask.to_legacy, and Mask.from_legacy."""
        assert DATA_DIR is not None, "Guaranteed by decorator."
        filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
        plane_map = get_legacy_visit_image_mask_planes()
        # Read before the optional import below so that read_legacy is still
        # exercised when lsst.afw.image is unavailable.
        mask = Mask.read_legacy(filename, ext=2, plane_map=plane_map)
        try:
            from lsst.afw.image import MaskedImageFitsReader
        except ImportError:
            raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
        reader = MaskedImageFitsReader(filename)
        self.assertEqual(mask.bbox, Box.from_legacy(reader.readBBox()))
        legacy_mask = reader.readMask()
        compare_mask_to_legacy(self, mask, legacy_mask, plane_map)
        compare_mask_to_legacy(self, mask, mask.to_legacy(plane_map), plane_map)
        assert_masks_equal(self, mask, Mask.from_legacy(legacy_mask, plane_map=plane_map))
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()