Coverage for tests / test_mask.py: 13%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:00 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import unittest 

16 

17import numpy as np 

18from astro_metadata_translator import ObservationInfo 

19 

20import lsst.utils.tests 

21from lsst.images import Box, Mask, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes 

22from lsst.images.tests import RoundtripFits, assert_masks_equal, compare_mask_to_legacy 

23 

# Optional directory of external test data; tests that need it are skipped
# when the environment variable is not set (the value is then None).
DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR")

25 

26 

class MaskTestCase(unittest.TestCase):
    """Tests for Mask and its helper classes."""

    def setUp(self) -> None:
        # Never truncate assertion diffs; some expected strings are long.
        self.maxDiff = None
        # Fixed seed: the shuffled plane order (and hence the hard-coded
        # expected strings in test_basics) must be reproducible.
        self.rng = np.random.default_rng(500)

    def make_mask_planes(self, n_planes: int, n_placeholders: int) -> list[MaskPlane | None]:
        """Build a shuffled list of mask planes and `None` placeholders.

        Parameters
        ----------
        n_planes
            Number of real planes to create; plane ``i`` is named ``M{i}``
            and described as ``D{i}``.
        n_placeholders
            Number of `None` entries to mix in.

        Returns
        -------
        planes
            The planes and placeholders, shuffled with ``self.rng``.
        """
        planes: list[MaskPlane | None] = [MaskPlane(f"M{i}", f"D{i}") for i in range(n_planes)]
        planes.extend([None] * n_placeholders)
        self.rng.shuffle(planes)
        return planes

    def test_schema(self) -> None:
        """Test MaskSchema."""
        self.assertEqual(MaskSchema.bits_per_element(np.uint8), 8)
        planes = self.make_mask_planes(17, 5)
        with self.assertRaises(TypeError):
            MaskSchema.bits_per_element(np.float32)
        schema = MaskSchema(planes, dtype=np.uint8)

        # Sequence-like behaviour (placeholders included).
        self.assertEqual(list(schema), planes)
        self.assertEqual(len(schema), len(planes))
        self.assertEqual(schema[5], planes[5])

        # repr() must round-trip through eval() given these names.
        self.assertEqual(
            eval(repr(schema), {"dtype": np.dtype, "MaskSchema": MaskSchema, "MaskPlane": MaskPlane}), schema
        )

        # str() is expected to have 17 lines, i.e. as many as there are real
        # (non-placeholder) planes.
        string = str(schema)
        self.assertEqual(len(string.split("\n")), 17)
        bit5 = schema.bit("M5")
        self.assertIn(f"M5 [{bit5.index}@{hex(bit5.mask)}]: D5", string)

        # Equality depends on both the planes and the dtype.
        self.assertEqual(schema, MaskSchema(planes, np.uint8))
        self.assertNotEqual(schema, MaskSchema(planes, np.int16))
        self.assertNotEqual(schema, MaskSchema(planes[:-1], np.uint8))

        # Simple properties.  22 entries at 8 bits per element is consistent
        # with the expected mask_size of 3.
        self.assertEqual(schema.dtype, np.dtype(np.uint8))
        self.assertEqual(schema.mask_size, 3)
        self.assertEqual(schema.names, {f"M{i}" for i in range(17)})
        self.assertEqual(schema.descriptions, {f"M{i}": f"D{i}" for i in range(17)})

        # A bitmask for two planes has exactly those two bits set: clearing
        # them must leave nothing behind.
        bit7 = schema.bit("M7")
        bitmask57 = schema.bitmask("M5", "M7")
        self.assertTrue(bitmask57[bit5.index] & bit5.mask)
        self.assertTrue(bitmask57[bit7.index] & bit7.mask)
        bitmask57[bit5.index] &= ~bit5.mask
        bitmask57[bit7.index] &= ~bit7.mask
        self.assertFalse(bitmask57.any())

        # Splitting into int16 schemas is expected to yield two
        # single-element schemas holding 15 and 2 planes, in the original
        # order and without placeholders.
        splits = schema.split(np.int16)
        self.assertEqual(len(splits), 2)
        self.assertEqual(splits[0].mask_size, 1)
        self.assertEqual(splits[1].mask_size, 1)
        self.assertEqual(list(splits[0]) + list(splits[1]), [p for p in planes if p is not None])
        self.assertEqual(len(splits[0]), 15)
        self.assertEqual(len(splits[1]), 2)

    def test_basics(self) -> None:
        """Test some basic Mask functionality."""
        planes = self.make_mask_planes(35, n_placeholders=5)
        schema = MaskSchema(planes, dtype=np.uint8)
        bbox = Box.factory[5:50, 6:60]
        mask = Mask(
            0,
            schema=schema,
            bbox=bbox,
            metadata={"four_and_a_half": 4.5},
            obs_info=ObservationInfo(instrument="LSSTCam"),
        )

        self.assertIs(mask[...], mask)
        # NotImplemented is a singleton, so check identity rather than
        # equality.
        self.assertIs(mask.__eq__(42), NotImplemented)
        self.assertEqual(mask, mask)
        self.assertEqual(mask.obs_info.instrument, "LSSTCam")
        # The plane order below is fixed by the RNG seed set in setUp.
        self.assertEqual(
            str(mask),
            "Mask([y=5:50, x=6:60], ['M34', 'M15', 'M29', 'M1', 'M20', 'M11', 'M13', 'M7', 'M17', 'M12', "
            "'M31', 'M16', 'M2', 'M3', 'M8', 'M26', 'M22', 'M5', 'M18', 'M19', 'M24', 'M21', 'M27', 'M6', "
            "'M28', 'M10', 'M4', 'M23', 'M0', 'M25', 'M9', 'M14', 'M33', 'M32', 'M30'])",
        )
        self.assertTrue(
            repr(mask).startswith(
                "Mask(..., bbox=Box(y=Interval(start=5, stop=50), x=Interval(start=6, stop=60)), "
                "schema=MaskSchema([MaskPlane(name='M34', description='D34')"
            ),
            f"Repr: {mask!r}",
        )

        with self.assertRaises(TypeError):
            # No bbox, size or array.
            Mask(0, schema=schema)

        with self.assertRaises(ValueError):
            # Box mismatch.
            Mask(mask.array, schema=schema, bbox=Box.factory[0:20, -5:45])

        with self.assertRaises(ValueError):
            # Shape mismatch.
            Mask(mask.array, schema=schema, shape=(5, 10, 5))

        with self.assertRaises(ValueError):
            # Cannot be 2-D.
            # NOTE(review): the bbox passed here also differs from the mask's
            # own bbox, so this may not isolate the dimensionality check --
            # confirm which condition raises.
            Mask(mask.array.reshape((2430, 5)), schema=schema, bbox=Box.factory[0:20, -5:45])

    def test_read_write(self) -> None:
        """Explicit calls to read and write fits."""
        planes = self.make_mask_planes(35, n_placeholders=5)
        schema = MaskSchema(planes, dtype=np.uint8)
        bbox = Box.factory[5:50, 6:60]
        mask = Mask(
            0,
            schema=schema,
            bbox=bbox,
            metadata={"four_and_a_half": 4.5},
            obs_info=ObservationInfo(instrument="LSSTCam"),
        )
        # Read back while the temporary path is still managed by the context.
        with lsst.utils.tests.getTempFilePath(".fits") as tmpFile:
            mask.write_fits(tmpFile)

            new = Mask.read_fits(tmpFile)
            self.assertEqual(new, mask)
            # __eq__ ignores metadata, so check it (and obs_info) explicitly.
            self.assertEqual(new.metadata["four_and_a_half"], 4.5)
            self.assertEqual(new.obs_info.instrument, "LSSTCam")
            self.assertEqual(new.obs_info, mask.obs_info)
            self.assertEqual(new.metadata, mask.metadata)

    def test_serialize_multi(self) -> None:
        """Test serializing a mask with more than 31 mask planes, requiring
        more than one HDU and EXTVER.

        Note that serialization for simpler cases is covered by
        test_masked_image.py.
        """
        planes = self.make_mask_planes(35, n_placeholders=5)
        schema = MaskSchema(planes, dtype=np.uint8)
        bbox = Box.factory[5:50, 6:60]
        mask = Mask(0, schema=schema, bbox=bbox, metadata={"four_and_a_half": 4.5})
        # Fill every real plane with random bits so the roundtrip comparison
        # is non-trivial.
        shape = bbox.shape
        for plane in schema:
            if plane is not None:
                mask.set(plane.name, self.rng.random(shape) > 0.5)
        # Number of planes expected per HDU; the rest go to the next EXTVER.
        planes_per_hdu = 31
        with RoundtripFits(self, mask) as roundtrip:
            fits = roundtrip.inspect()
            self.assertEqual(fits[1].header["EXTNAME"], "MASK")
            # EXTVER may be omitted on the first HDU; treat that as 1.
            self.assertEqual(fits[1].header.get("EXTVER", 1), 1)
            self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2")
            self.assertEqual(fits[2].header["EXTNAME"], "MASK")
            self.assertEqual(fits[2].header["EXTVER"], 2)
            self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2")
            # Placeholders are skipped; real planes are expected to be packed
            # in order, with 1-based header keyword indices within each HDU.
            real_planes = [p for p in planes if p is not None]
            for n, plane in enumerate(real_planes):
                hdu_offset, bit = divmod(n, planes_per_hdu)
                hdu = fits[1 + hdu_offset]
                key_index = bit + 1
                self.assertEqual(hdu.header[f"MSKN{key_index:04d}"], plane.name)
                self.assertEqual(hdu.header[f"MSKM{key_index:04d}"], 1 << bit)
                self.assertEqual(hdu.header[f"MSKD{key_index:04d}"], plane.description)
            assert_masks_equal(self, mask, roundtrip.result)

    @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.")
    def test_legacy(self) -> None:
        """Test Mask.read_legacy, Mask.to_legacy, and Mask.from_legacy."""
        assert DATA_DIR is not None, "Guaranteed by decorator."
        filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits")
        plane_map = get_legacy_visit_image_mask_planes()
        # Exercise read_legacy before the optional import below, so it runs
        # even when the legacy package is unavailable.
        mask = Mask.read_legacy(filename, ext=2, plane_map=plane_map)
        try:
            from lsst.afw.image import MaskedImageFitsReader
        except ImportError:
            raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None
        reader = MaskedImageFitsReader(filename)
        self.assertEqual(mask.bbox, Box.from_legacy(reader.readBBox()))
        legacy_mask = reader.readMask()
        # Compare against the legacy reader, then roundtrip in both
        # directions.
        compare_mask_to_legacy(self, mask, legacy_mask, plane_map)
        compare_mask_to_legacy(self, mask, mask.to_legacy(plane_map), plane_map)
        assert_masks_equal(self, mask, Mask.from_legacy(legacy_mask, plane_map=plane_map))

202 

203 

# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()