Coverage for tests/test_diffractionSpikeMask.py: 19%
125 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:40 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import os
23import glob
24import unittest
26import numpy as np
28import lsst.afw.image as afwImage
29import lsst.geom as geom
30from lsst.meas.algorithms.testUtils import MockReferenceObjectLoaderFromFiles
31import lsst.meas.base.tests
32from lsst.pipe.tasks.diffractionSpikeMask import DiffractionSpikeMaskTask, DiffractionSpikeMaskConfig
33from lsst.pipe.tasks.colorterms import Colorterm, ColortermDict, ColortermLibrary
34import lsst.utils.tests
36from utils import makeTestVisitInfo
# Absolute path of the directory that contains this test module.
TESTDIR = os.path.abspath(os.path.dirname(__file__))
# Directory under the test data tree that holds the reference catalog.
RefCatDir = os.path.join(TESTDIR, "data", "sdssrefcat")

# Colorterm library keyed by the "test*" pattern, with one entry per
# "test-<band>" physical filter. Every coefficient is zero except c0 of
# "test-i", which is 1.0.
testColorterms = ColortermLibrary(
    data={
        "test*": ColortermDict(
            data={
                "test-g": Colorterm(primary="g", secondary="r", c0=0.0, c1=0.0),
                "test-r": Colorterm(primary="r", secondary="i", c0=0.0, c1=0.0, c2=0.0),
                "test-i": Colorterm(primary="i", secondary="z", c0=1.0, c1=0.0, c2=0.0),
                "test-z": Colorterm(primary="z", secondary="i", c0=0.0, c1=0.0, c2=0.0),
            },
        ),
    },
)
class DiffractionSpikeMaskTest(lsst.utils.tests.TestCase):
    """Unit tests for `DiffractionSpikeMaskTask`.

    Each test works on an empty exposure that carries a WCS read from a
    sample image on disk, together with a reference object loader built
    from the catalog files under ``RefCatDir``.
    """

    def setUp(self):
        """Create the test exposure and the reference object loader."""
        # The .xy.fits file has sources in the range ~ [0,2000],[0,4500]
        # which is bigger than the exposure
        self.bbox = geom.Box2I(geom.Point2I(0, 0), geom.Extent2I(2048, 4612))

        # Load sample input from disk; only its WCS is reused below.
        smallExposure = afwImage.ExposureF(
            os.path.join(TESTDIR, "data", "v695833-e0-c000-a00.sci.fits")
        )
        self.exposure = afwImage.ExposureF(self.bbox)
        self.exposure.setWcs(smallExposure.getWcs())
        self.exposure.setFilter(afwImage.FilterLabel(band="i", physical="test-i"))
        self.exposure.info.setVisitInfo(makeTestVisitInfo())

        # Make a reference loader
        filenames = sorted(glob.glob(os.path.join(RefCatDir, 'ref_cats', 'cal_ref_cat', '??????.fits')))
        self.refObjLoader = MockReferenceObjectLoaderFromFiles(filenames, htmLevel=8)

    def tearDown(self):
        """Drop the per-test exposure and reference loader."""
        del self.exposure
        del self.refObjLoader

    def test_raiseWithoutLoader(self):
        """The task should raise an error if no reference catalog loader is
        configured.
        """
        config = DiffractionSpikeMaskConfig()
        task = DiffractionSpikeMaskTask(config=config)
        with self.assertRaises(RuntimeError):
            task.run(self.exposure)

    def test_loadAndMaskStars(self):
        """Run the bright star mask with a selection of reference sources."""
        config = DiffractionSpikeMaskConfig(magnitudeThreshold=16)
        task = DiffractionSpikeMaskTask(self.refObjLoader, config=config)
        exposure = self.exposure.clone()

        # Set the saturated mask plane in half of the image
        saturatedMaskBit = exposure.mask.getPlaneBitMask(config.saturatedMaskPlane)
        bbox = exposure.getBBox()
        bbox.grow(-geom.Extent2I(0, bbox.height//4))
        exposure[bbox].mask.array |= saturatedMaskBit

        brightCat = task.run(exposure=exposure)
        self.assertGreater(len(brightCat), 0)

        # Verify that the new mask plane has been added
        spikeMaskBit = exposure.mask.getPlaneBitMask(config.spikeMask)
        # The images should not be modified
        self.assertImagesEqual(self.exposure.image, exposure.image)
        self.assertImagesEqual(self.exposure.variance, exposure.variance)
        # Ensure that the mask has changed
        self.assertFloatsNotEqual(self.exposure.mask.array, exposure.mask.array)

        # Check that the mask is set for the bright sources inside the image
        # Note that the catalog will include bright sources *off* the image that
        # have long enough diffraction spikes to overlap the edge of the image
        xvals, yvals = exposure.wcs.skyToPixelArray(brightCat[config.raKey], brightCat[config.decKey])
        bbox = exposure.getBBox()
        # Shrink the bounding box so that the bbox.contains check below is
        # sufficient to avoid errors.
        bbox.grow(-2)
        points = [geom.Point2D(xv, yv) for xv, yv in zip(xvals, yvals)]
        inside = 0
        outside = 0
        for pt in points:
            if bbox.contains(int(pt.getX()), int(pt.getY())):
                # Every pixel of the 3x3 box around the source must carry
                # the spike mask bit.
                ptBox = geom.Box2I.makeCenteredBox(pt, geom.Extent2I(3, 3))
                maskSet = (exposure[ptBox].mask.array & spikeMaskBit) > 0
                self.assertTrue(np.all(maskSet))
                inside += 1
            else:
                outside += 1
        # The catalog must exercise both the on-image and off-image cases.
        self.assertGreater(inside, 0)
        self.assertGreater(outside, 0)

    def test_noBrightStars(self):
        """Run the bright star mask with no bright stars."""
        # Set the magnitude threshold to 0, far brighter than the 16 used in
        # test_loadAndMaskStars, so that no stars are selected
        config = DiffractionSpikeMaskConfig(magnitudeThreshold=0)
        task = DiffractionSpikeMaskTask(self.refObjLoader, config=config)
        exposure = self.exposure.clone()
        brightCat = task.run(exposure=exposure)
        self.assertEqual(len(brightCat), 0)
        # Verify that the new mask plane has been added
        exposure.mask.getPlaneBitMask(config.spikeMask)
        # The images should not be modified
        self.assertImagesEqual(self.exposure.image, exposure.image)

    def test_maskSources(self):
        """Verify that sources on and off the image are masked correctly."""
        task = DiffractionSpikeMaskTask(self.refObjLoader, config=DiffractionSpikeMaskConfig())
        task.set_diffraction_angle(self.exposure)
        self.exposure.mask.addMaskPlane(task.config.spikeMask)

        xSize, ySize = self.bbox.getDimensions()
        x0, y0 = self.bbox.getBegin()
        x1, y1 = self.bbox.getEnd()

        # Draw nBright reproducible positions from a region extending a
        # quarter of the image size beyond every edge, and nBright distinct
        # spike radii from [10, 200).
        nBright = 50
        rng = np.random.RandomState(3)
        xLoc = np.arange(x0 - xSize/4, x1 + xSize/4)
        rng.shuffle(xLoc)
        xLoc = xLoc[:nBright]
        yLoc = np.arange(y0 - ySize/4, y1 + ySize/4)
        rng.shuffle(yLoc)
        yLoc = yLoc[:nBright]
        spikeRadii = np.arange(10, 200)
        rng.shuffle(spikeRadii)
        spikeRadii = spikeRadii[:nBright]

        # Flag the half of the image that starts at the bbox origin as
        # saturated.
        saturatedBox = geom.Box2I(self.bbox.getBegin(), geom.Extent2I(xSize, ySize//2))
        baseMask = self.exposure.mask.clone()
        baseMask[saturatedBox].array |= baseMask.getPlaneBitMask(task.config.saturatedMaskPlane)

        def countSpikePixels(mask):
            """Return the number of pixels of ``mask`` with the spike bit set."""
            spikeBit = mask.getPlaneBitMask(task.config.spikeMask)
            return np.sum((mask.array & spikeBit) > 0)

        # There are four classes of sources:
        # 1. Bright sources on the image with saturated cores - masked
        # 2. Sources on the image without saturated cores - not masked
        # 3. Bright sources off the image with predicted diffraction spikes that
        #    overlap the image - masked
        # 4. Bright sources off the image that are far away enough that any
        #    diffraction spikes would not overlap the image - not masked
        nClass1 = 0
        nClass2 = 0
        nClass3 = 0
        nClass4 = 0
        selectedSources = task.selectSources(xLoc, yLoc, spikeRadii, baseMask)
        for x, y, r, selected in zip(xLoc, yLoc, spikeRadii, selectedSources):
            # Start every source from a fresh copy so results do not mix.
            mask = baseMask.clone()
            isInImage = self.bbox.contains(geom.Point2I(x, y))
            if isInImage and selected:
                # Bright sources on the image with saturated cores.
                nClass1 += 1
                task.maskSources([x], [y], [r], mask)
                self.assertGreater(countSpikePixels(mask), 0)
            elif isInImage:
                # Bright sources on the image without saturated cores.
                # Do *not* run task.maskSources in this case, since these are
                # skipped.
                nClass2 += 1
            elif selected:
                # Bright sources off the image that we predict should overlap
                # the image, should set the SPIKE mask for some pixels.
                nClass3 += 1
                task.maskSources([x], [y], [r], mask)
                self.assertGreater(countSpikePixels(mask), 0)
            else:
                # Sources off the image that are skipped in source selection
                # should not change the mask even if we do calculate their
                # SPIKE mask.
                nClass4 += 1
                task.maskSources([x], [y], [r], mask)
                self.assertMasksEqual(mask, baseMask)
        # Verify that the test points were sufficient to exercise all classes.
        self.assertGreater(nClass1, 0)
        self.assertGreater(nClass2, 0)
        self.assertGreater(nClass3, 0)
        self.assertGreater(nClass4, 0)
class MemoryTestCase(lsst.utils.tests.MemoryTestCase):
    """Use `lsst.utils.tests.MemoryTestCase` unchanged in this module."""
def setup_module(module):
    """Initialise the LSST test utilities for this module.

    Parameters
    ----------
    module : module
        Unused by this function; presumably supplied by the test runner's
        module-level setup hook (confirm against the runner in use).
    """
    lsst.utils.tests.init()
# When executed directly as a script, initialise the LSST test utilities and
# hand control to the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()