Coverage for tests / test_diffractionSpikeMask.py: 19%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:38 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import os 

23import glob 

24import unittest 

25 

26import numpy as np 

27 

28import lsst.afw.image as afwImage 

29import lsst.geom as geom 

30from lsst.meas.algorithms.testUtils import MockReferenceObjectLoaderFromFiles 

31import lsst.meas.base.tests 

32from lsst.pipe.tasks.diffractionSpikeMask import DiffractionSpikeMaskTask, DiffractionSpikeMaskConfig 

33from lsst.pipe.tasks.colorterms import Colorterm, ColortermDict, ColortermLibrary 

34import lsst.utils.tests 

35 

36from utils import makeTestVisitInfo 

37 

38 

39TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

40RefCatDir = os.path.join(TESTDIR, "data", "sdssrefcat") 

41 

42testColorterms = ColortermLibrary(data={ 

43 "test*": ColortermDict(data={ 

44 "test-g": Colorterm(primary="g", secondary="r", c0=0.00, c1=0.00), 

45 "test-r": Colorterm(primary="r", secondary="i", c0=0.00, c1=0.00, c2=0.00), 

46 "test-i": Colorterm(primary="i", secondary="z", c0=1.00, c1=0.00, c2=0.00), 

47 "test-z": Colorterm(primary="z", secondary="i", c0=0.00, c1=0.00, c2=0.00), 

48 }) 

49}) 

50 

51 

52class DiffractionSpikeMaskTest(lsst.utils.tests.TestCase): 

53 

54 def setUp(self): 

55 

56 # Load sample input from disk 

57 testDir = os.path.dirname(__file__) 

58 

59 # The .xy.fits file has sources in the range ~ [0,2000],[0,4500] 

60 # which is bigger than the exposure 

61 self.bbox = geom.Box2I(geom.Point2I(0, 0), geom.Extent2I(2048, 4612)) 

62 

63 smallExposure = afwImage.ExposureF(os.path.join(testDir, "data", "v695833-e0-c000-a00.sci.fits")) 

64 self.exposure = afwImage.ExposureF(self.bbox) 

65 self.exposure.setWcs(smallExposure.getWcs()) 

66 self.exposure.setFilter(afwImage.FilterLabel(band="i", physical="test-i")) 

67 self.exposure.info.setVisitInfo(makeTestVisitInfo()) 

68 

69 # Make a reference loader 

70 filenames = sorted(glob.glob(os.path.join(RefCatDir, 'ref_cats', 'cal_ref_cat', '??????.fits'))) 

71 self.refObjLoader = MockReferenceObjectLoaderFromFiles(filenames, htmLevel=8) 

72 

73 def tearDown(self): 

74 del self.exposure 

75 del self.refObjLoader 

76 

77 def test_raiseWithoutLoader(self): 

78 """The task should raise an error if no reference catalog loader is 

79 configured. 

80 """ 

81 config = DiffractionSpikeMaskConfig() 

82 task = DiffractionSpikeMaskTask(config=config) 

83 with self.assertRaises(RuntimeError): 

84 task.run(self.exposure) 

85 

86 def test_loadAndMaskStars(self): 

87 """Run the bright star mask with a selection of reference sources.""" 

88 

89 config = DiffractionSpikeMaskConfig(magnitudeThreshold=16) 

90 task = DiffractionSpikeMaskTask(self.refObjLoader, config=config) 

91 exposure = self.exposure.clone() 

92 # Set the saturated mask plane in half of the image 

93 saturatedMaskBit = exposure.mask.getPlaneBitMask(config.saturatedMaskPlane) 

94 bbox = exposure.getBBox() 

95 bbox.grow(-lsst.geom.Extent2I(0, bbox.height//4)) 

96 exposure[bbox].mask.array |= saturatedMaskBit 

97 brightCat = task.run(exposure=exposure) 

98 self.assertGreater(len(brightCat), 0) 

99 # Verify that the new mask plane has been added 

100 spikeMaskBit = exposure.mask.getPlaneBitMask(config.spikeMask) 

101 # The images should not be modified 

102 self.assertImagesEqual(self.exposure.image, exposure.image) 

103 self.assertImagesEqual(self.exposure.variance, exposure.variance) 

104 # Ensure that the mask has changed 

105 self.assertFloatsNotEqual(self.exposure.mask.array, exposure.mask.array) 

106 

107 # Check that the mask is set for the bright sources inside the image 

108 # Note that the catalog will include bright sources *off* the image that 

109 # have long enough diffraction spikes to overlap the edge of the image 

110 xvals, yvals = exposure.wcs.skyToPixelArray(brightCat[config.raKey], brightCat[config.decKey]) 

111 bbox = exposure.getBBox() 

112 # Shrink the bounding box so that the bbox.contains check below is 

113 # sufficient to avoid errors. 

114 bbox.grow(-2) 

115 points = [geom.Point2D(xv, yv) for xv, yv in zip(xvals, yvals)] 

116 inside = 0 

117 outside = 0 

118 for pt in points: 

119 if bbox.contains(int(pt.getX()), int(pt.getY())): 

120 ptBox = geom.Box2I.makeCenteredBox(pt, geom.Extent2I(3, 3)) 

121 maskSet = exposure[ptBox].mask.array & spikeMaskBit > 0 

122 self.assertTrue(np.all(maskSet)) 

123 inside += 1 

124 else: 

125 outside += 1 

126 self.assertGreater(inside, 0) 

127 self.assertGreater(outside, 0) 

128 

129 def test_noBrightStars(self): 

130 """Run the bright star mask with no bright stars.""" 

131 

132 # Set a very high magnitude limit so that no stars are selected 

133 config = DiffractionSpikeMaskConfig(magnitudeThreshold=0) 

134 task = DiffractionSpikeMaskTask(self.refObjLoader, config=config) 

135 exposure = self.exposure.clone() 

136 brightCat = task.run(exposure=exposure) 

137 self.assertEqual(len(brightCat), 0) 

138 # Verify that the new mask plane has been added 

139 exposure.mask.getPlaneBitMask(config.spikeMask) 

140 # The images should not be modified 

141 self.assertImagesEqual(self.exposure.image, exposure.image) 

142 

143 def test_maskSources(self): 

144 """Verify that sources on and off the image are masked correctly.""" 

145 task = DiffractionSpikeMaskTask(self.refObjLoader, config=DiffractionSpikeMaskConfig()) 

146 task.set_diffraction_angle(self.exposure) 

147 self.exposure.mask.addMaskPlane(task.config.spikeMask) 

148 

149 xSize, ySize = self.bbox.getDimensions() 

150 x0, y0 = self.bbox.getBegin() 

151 x1, y1 = self.bbox.getEnd() 

152 

153 nBright = 50 

154 rng = np.random.RandomState(3) 

155 xLoc = np.arange(x0 - xSize/4, x1 + xSize/4) 

156 rng.shuffle(xLoc) 

157 xLoc = xLoc[:nBright] 

158 yLoc = np.arange(y0 - ySize/4, y1 + ySize/4) 

159 rng.shuffle(yLoc) 

160 yLoc = yLoc[:nBright] 

161 spikeRadii = np.arange(10, 200) 

162 rng.shuffle(spikeRadii) 

163 spikeRadii = spikeRadii[:nBright] 

164 saturatedBox = geom.Box2I(self.bbox.getBegin(), geom.Extent2I(xSize, ySize//2)) 

165 baseMask = self.exposure.mask.clone() 

166 baseMask[saturatedBox].array |= baseMask.getPlaneBitMask(task.config.saturatedMaskPlane) 

167 # There are four classes of sources: 

168 # 1. Bright sources on the image with saturated cores - masked 

169 # 2. Sources on the image without saturated cores - not masked 

170 # 3. Bright sources off the image with predicted diffraction spikes that 

171 # overlap the image - masked 

172 # 4. Bright sources off the image that are far away enough that any 

173 # diffraction spikes would not overlap the image - not masked 

174 nClass1 = 0 

175 nClass2 = 0 

176 nClass3 = 0 

177 nClass4 = 0 

178 selectedSources = task.selectSources(xLoc, yLoc, spikeRadii, baseMask) 

179 for x, y, r, selected in zip(xLoc, yLoc, spikeRadii, selectedSources): 

180 mask = baseMask.clone() 

181 isInImage = self.bbox.contains(geom.Point2I(x, y)) 

182 # Bright sources on the image with saturated cores. 

183 if isInImage and selected: 

184 nClass1 += 1 

185 task.maskSources([x], [y], [r], mask) 

186 self.assertGreater(np.sum(mask.array & mask.getPlaneBitMask(task.config.spikeMask) > 0), 0) 

187 

188 # Bright sources on the image without saturated cores. 

189 if isInImage and not selected: 

190 nClass2 += 1 

191 # Do *not* run task.maskSources in this case, since these are 

192 # skipped. 

193 

194 # Bright sources off the image that we predict should overlap the 

195 # image, should set the SPIKE mask for some pixels. 

196 if not isInImage and selected: 

197 nClass3 += 1 

198 task.maskSources([x], [y], [r], mask) 

199 self.assertGreater(np.sum(mask.array & mask.getPlaneBitMask(task.config.spikeMask) > 0), 0) 

200 

201 # Sources off the image that are skipped in source selection should 

202 # not change the mask even if we do calculate their SPIKE mask. 

203 if not isInImage and not selected: 

204 nClass4 += 1 

205 task.maskSources([x], [y], [r], mask) 

206 self.assertMasksEqual(mask, baseMask) 

207 # Verify that the test points were sufficient to exercise all classes. 

208 self.assertGreater(nClass1, 0) 

209 self.assertGreater(nClass2, 0) 

210 self.assertGreater(nClass3, 0) 

211 self.assertGreater(nClass4, 0) 

212 

213 

214class MemoryTestCase(lsst.utils.tests.MemoryTestCase): 

215 pass 

216 

217 

218def setup_module(module): 

219 lsst.utils.tests.init() 

220 

221 

222if __name__ == "__main__": 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true

223 lsst.utils.tests.init() 

224 unittest.main()