Coverage for tests / test_masked_image.py: 19%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:07 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14import os 

15import tempfile 

16import unittest 

17 

18import astropy.io.fits 

19import astropy.units as u 

20import numpy as np 

21from astro_metadata_translator import ObservationInfo 

22 

23from lsst.images import Box, Image, MaskedImage, MaskPlane, MaskSchema, get_legacy_visit_image_mask_planes 

24from lsst.images.fits import FitsCompressionOptions 

25from lsst.images.tests import RoundtripFits, assert_masked_images_equal, compare_masked_image_to_legacy 

26 

27DATA_DIR = os.environ.get("TESTDATA_IMAGES_DIR", None) 

28 

29 

30class MaskedImageTestCase(unittest.TestCase): 

31 """Tests for the MaskedImage class and the basics of the archive system.""" 

32 

33 def setUp(self) -> None: 

34 self.maxDiff = None 

35 self.rng = np.random.default_rng(500) 

36 self.obs_info = ObservationInfo(instrument="LSSTCam", detector_num=4) 

37 self.masked_image = MaskedImage( 

38 Image(self.rng.normal(100.0, 8.0, size=(200, 251)), dtype=np.float64, unit=u.nJy, start=(5, 8)), 

39 mask_schema=MaskSchema( 

40 [ 

41 MaskPlane("BAD", "Pixel is very bad, possibly downright evil."), 

42 MaskPlane("HUNGRY", "Pixel hasn't had enough to eat today."), 

43 ] 

44 ), 

45 metadata={"fifty": "5 * 10"}, 

46 obs_info=self.obs_info, 

47 ) 

48 self.masked_image.mask.array |= np.multiply.outer( 

49 self.masked_image.image.array < 102.0, 

50 self.masked_image.mask.schema.bitmask("BAD"), 

51 ) 

52 self.masked_image.mask.array |= np.multiply.outer( 

53 self.masked_image.image.array > 98.0, 

54 self.masked_image.mask.schema.bitmask("HUNGRY"), 

55 ) 

56 self.masked_image.variance.array = self.rng.normal(64.0, 0.5, size=self.masked_image.bbox.shape) 

57 

58 def test_construction(self) -> None: 

59 """Test that the MaskedImage construction (in setUp) worked.""" 

60 self.assertEqual(self.masked_image.bbox, Box.factory[5:205, 8:259]) 

61 self.assertEqual(self.masked_image.mask.bbox, self.masked_image.bbox) 

62 self.assertEqual(self.masked_image.variance.bbox, self.masked_image.bbox) 

63 self.assertEqual(self.masked_image.image.array.shape, self.masked_image.bbox.shape) 

64 self.assertEqual(self.masked_image.mask.array.shape, self.masked_image.bbox.shape + (1,)) 

65 self.assertEqual(self.masked_image.variance.array.shape, self.masked_image.bbox.shape) 

66 self.assertEqual(self.masked_image.unit, u.nJy) 

67 self.assertEqual(self.masked_image.variance.unit, u.nJy**2) 

68 self.assertEqual(self.masked_image.metadata, {"fifty": "5 * 10"}) 

69 self.assertEqual(self.masked_image.obs_info.instrument, "LSSTCam") 

70 # The checks below are subject to the vagaries of the RNG, but we want 

71 # the seed to be such that they all pass, or other tests will be 

72 # weaker. 

73 self.assertGreater( 

74 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD")), 0 

75 ) 

76 self.assertGreater( 

77 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("HUNGRY")), 0 

78 ) 

79 self.assertGreater( 

80 np.sum(self.masked_image.mask.array == self.masked_image.mask.schema.bitmask("BAD", "HUNGRY")), 0 

81 ) 

82 

83 self.assertIs(self.masked_image[...], self.masked_image) 

84 self.assertEqual( 

85 str(self.masked_image), "MaskedImage(Image([y=5:205, x=8:259], float64), ['BAD', 'HUNGRY'])" 

86 ) 

87 self.assertEqual( 

88 repr(self.masked_image), 

89 "MaskedImage(Image(..., bbox=Box(y=Interval(start=5, stop=205), x=Interval(start=8, stop=259)), " 

90 "dtype=dtype('float64')), mask_schema=MaskSchema([MaskPlane(name='BAD', description='Pixel is " 

91 "very bad, possibly downright evil.'), MaskPlane(name='HUNGRY', description=\"Pixel hasn't had " 

92 "enough to eat today.\")], dtype=dtype('uint8')))", 

93 ) 

94 copy = self.masked_image.copy() 

95 original = self.masked_image.image.array[0, 0] 

96 copy.image.array[0, 0] = 38.0 

97 self.assertEqual(self.masked_image.image.array[0, 0], original) 

98 self.assertEqual(copy.image.array[0, 0], 38.0) 

99 

100 # Test error conditions. 

101 with self.assertRaises(ValueError): 

102 # Disagreement over mask bbox. 

103 MaskedImage(Image(42.0, shape=(5, 6)), mask=self.masked_image.mask) 

104 with self.assertRaises(TypeError): 

105 # No mask definition. 

106 MaskedImage(self.masked_image.image, variance=self.masked_image.variance) 

107 with self.assertRaises(TypeError): 

108 # Can not provide mask and mask schema. 

109 MaskedImage( 

110 Image(42.0, shape=(5, 5)), 

111 mask=self.masked_image.mask, 

112 mask_schema=self.masked_image.mask.schema, 

113 ) 

114 with self.assertRaises(ValueError): 

115 # image and variance bbox disagreement. 

116 MaskedImage( 

117 Image(42.0, shape=(5, 5)), 

118 mask_schema=self.masked_image.mask.schema, 

119 variance=self.masked_image.variance, 

120 ) 

121 with self.assertRaises(ValueError): 

122 # no image unit but there is variance unit. 

123 MaskedImage( 

124 Image(42.0, shape=(5, 5)), 

125 mask_schema=self.masked_image.mask.schema, 

126 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

127 ) 

128 with self.assertRaises(ValueError): 

129 # image and variance units disagree. 

130 MaskedImage( 

131 Image(42.0, shape=(5, 5), unit=u.nJy), 

132 mask_schema=self.masked_image.mask.schema, 

133 variance=Image(1.0, shape=(5, 5), unit=u.nJy), 

134 ) 

135 

136 def test_subset(self) -> None: 

137 """Test assignment of subset.""" 

138 copy = self.masked_image.copy() 

139 subset = copy.local[0:10, 20:30].copy() 

140 subset.image[...] = Image(42.0, shape=(10, 10), unit=u.nJy) 

141 copy[subset.bbox] = subset 

142 self.assertEqual(copy.image.array[0, 20], 42.0) 

143 self.assertEqual(copy.image.array[0, 0], self.masked_image.image.array[0, 0]) 

144 

145 def test_fits_roundtrip(self) -> None: 

146 """Test that we can round-trip the MaskedImage through FITS, including 

147 subimage reads. 

148 """ 

149 subbox = Box.factory[11:20, 25:30] 

150 subslices = (slice(6, 15), slice(17, 22)) 

151 np.testing.assert_array_equal( 

152 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

153 ) 

154 with RoundtripFits(self, self.masked_image, "MaskedImageV2") as roundtrip: 

155 subimage = roundtrip.get(bbox=subbox) 

156 # Check that we used lossless compression (the default). 

157 fits = roundtrip.inspect() 

158 self.assertEqual(fits[1].header["ZCMPTYPE"], "GZIP_2") 

159 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

160 self.assertEqual(fits[3].header["ZCMPTYPE"], "GZIP_2") 

161 assert_masked_images_equal(self, roundtrip.result, self.masked_image, expect_view=False) 

162 assert_masked_images_equal(self, subimage, roundtrip.result[subbox], expect_view=False) 

163 

164 def test_fits_roundtrip_lossy(self) -> None: 

165 """Test that we can round-trip the MaskedImage through FITS, including 

166 subimage reads, with lossy compression. 

167 """ 

168 subbox = Box.factory[11:20, 25:30] 

169 subslices = (slice(6, 15), slice(17, 22)) 

170 np.testing.assert_array_equal( 

171 self.masked_image.image.array[subslices], self.masked_image.image[subbox].array 

172 ) 

173 with tempfile.NamedTemporaryFile(suffix=".fits", delete_on_close=False, delete=True) as tmp: 

174 tmp.close() 

175 self.masked_image.write_fits( 

176 tmp.name, 

177 image_compression=FitsCompressionOptions.LOSSY, 

178 variance_compression=FitsCompressionOptions.LOSSY, 

179 compression_seed=50, 

180 ) 

181 roundtripped = MaskedImage.read_fits(tmp.name) 

182 subimage = MaskedImage.read_fits(tmp.name, bbox=subbox) 

183 with astropy.io.fits.open(tmp.name, disable_image_compression=True) as fits: 

184 self.assertEqual(fits[1].header["ZCMPTYPE"], "RICE_1") 

185 self.assertEqual(fits[2].header["ZCMPTYPE"], "GZIP_2") 

186 self.assertEqual(fits[3].header["ZCMPTYPE"], "RICE_1") 

187 assert_masked_images_equal(self, roundtripped, self.masked_image, expect_view=False, rtol=0.01) 

188 assert_masked_images_equal(self, subimage, roundtripped[subbox], expect_view=False) 

189 

190 @unittest.skipUnless(DATA_DIR is not None, "TESTDATA_IMAGES_DIR is not in the environment.") 

191 def test_legacy(self) -> None: 

192 """Test MaskedImage.read_legacy, MaskedImage.to_legacy, and 

193 MaskedImage.from_legacy. 

194 """ 

195 assert DATA_DIR is not None, "Guaranteed by decorator." 

196 filename = os.path.join(DATA_DIR, "dp2", "legacy", "visit_image.fits") 

197 plane_map = get_legacy_visit_image_mask_planes() 

198 masked_image = MaskedImage.read_legacy(filename, plane_map=plane_map) 

199 try: 

200 from lsst.afw.image import MaskedImageFitsReader 

201 except ImportError: 

202 raise unittest.SkipTest("'lsst.afw.image' could not be imported.") from None 

203 reader = MaskedImageFitsReader(filename) 

204 legacy_masked_image = reader.read() 

205 compare_masked_image_to_legacy( 

206 self, masked_image, legacy_masked_image, plane_map=plane_map, expect_view=False 

207 ) 

208 compare_masked_image_to_legacy( 

209 self, 

210 masked_image, 

211 masked_image.to_legacy(plane_map=plane_map), 

212 plane_map=plane_map, 

213 expect_view=True, 

214 ) 

215 compare_masked_image_to_legacy( 

216 self, 

217 MaskedImage.from_legacy(legacy_masked_image, plane_map=plane_map), 

218 legacy_masked_image, 

219 expect_view=True, 

220 plane_map=plane_map, 

221 ) 

222 

223 

224if __name__ == "__main__": 

225 unittest.main()