Coverage for tests / test_InputCount.py: 15%
155 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:48 +0000
1# This file is part of meas_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22"""Tests for InputCounts measurement algorithm.
23"""
24import numpy as np
25import itertools
26from collections import namedtuple
28import unittest
29import lsst.utils.tests
31import lsst.geom
32import lsst.afw.detection as afwDetection
33import lsst.afw.geom as afwGeom
34import lsst.afw.image as afwImage
35import lsst.afw.table as afwTable
36import lsst.meas.base as measBase
# Optional plotting support.  Evaluating the bare name ``display`` raises
# NameError unless something has already bound it (name lookup falls through
# module globals to builtins).  Only when the name resolves are the matplotlib
# modules imported; otherwise ``display`` is bound to False and ``plt`` /
# ``patches`` stay undefined, so ccdVennDiagram must not be called.
try:
    display
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
except NameError:
    display = False
def ccdVennDiagram(exp, showImage=True, legendLocation='best'):
    """Display an exposure and the bounding boxes of the images in a coadd.

    Parameters
    ----------
    exp : `lsst.afw.image.Exposure`
        The exposure object to plot, must be the product of a coadd
    showImage : `bool`, optional
        Plot image data in addition to its bounding box
    legendLocation : `str`, optional
        Matplotlib legend location code. Can be: ``'best'``, ``'upper
        right'``, ``'upper left'``, ``'lower left'``, ``'lower right'``,
        ``'right'``, ``'center left'``, ``'center right'``, ``'lower
        center'``, ``'upper center'``, ``'center'``

    Notes
    -----
    Uses the module-level names ``plt`` and ``patches``, which this module
    only imports when ``display`` is already defined at import time.
    """
    # Create the figure object
    fig = plt.figure()
    # Use all the built in matplotlib line style attributes to create a list
    # of the possible styles
    linestyles = ['solid', 'dashed', 'dashdot', 'dotted']
    colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
    # Calculate the Cartesian product of the styles, and randomize the order,
    # to help each CCD get its own color
    pcomb = np.random.permutation(list(itertools.product(colors, linestyles)))
    # Filter out a black solid box, as that will be the style of the given exp
    # object.  The mask has to be built element-wise: comparing the array
    # with ``is False`` is an identity test that always yields a plain False,
    # which would index away every row.
    isBlackSolid = (pcomb[:, 0] == 'k') & (pcomb[:, 1] == 'solid')
    pcomb = pcomb[~isBlackSolid]
    # Get the image properties
    origin = lsst.geom.PointD(exp.getXY0())
    mainBox = exp.getBBox().getCorners()
    # Plot the exposure
    plt.gca().add_patch(patches.Rectangle((0, 0), *list(mainBox[2]-mainBox[0]), fill=False, label="exposure"))
    # Grab all of the CCDs that went into creating the exposure
    ccds = exp.getInfo().getCoaddInputs().ccds
    # Loop over and plot the extents of each ccd
    for i, ccd in enumerate(ccds):
        ccdBox = lsst.geom.Box2D(ccd.getBBox())
        ccdCorners = ccdBox.getCorners()
        coaddCorners = [exp.getWcs().skyToPixel(ccd.getWcs().pixelToSky(point))
                        + (lsst.geom.PointD() - origin) for point in ccdCorners]
        # Wrap around the style table so that having more CCDs than distinct
        # styles reuses styles instead of raising IndexError.
        color, linestyle = pcomb[i % len(pcomb)]
        plt.gca().add_patch(patches.Rectangle(coaddCorners[0], *list(coaddCorners[2]-coaddCorners[0]),
                                              fill=False, color=color, ls=linestyle,
                                              label="CCD{}".format(i)))
    # If showImage is true, plot the data contained in exp as well as the
    # boundaries
    if showImage:
        plt.imshow(exp.image.array, cmap='Greys', origin='lower')
        plt.colorbar()
    # Adjust plot parameters and plot
    plt.gca().relim()
    plt.gca().autoscale_view()
    ylim = plt.gca().get_ylim()
    xlim = plt.gca().get_xlim()
    # Widen the view by scaling each limit by 1.5 so the boxes are not flush
    # against the axes.
    plt.gca().set_ylim(1.5*ylim[0], 1.5*ylim[1])
    plt.gca().set_xlim(1.5*xlim[0], 1.5*xlim[1])
    plt.legend(loc=legendLocation)
    fig.canvas.draw()
    plt.show()
class InputCountTest(lsst.utils.tests.TestCase):
    """Tests for the InputCount measurement plugin on synthetic exposures."""

    @lsst.utils.tests.methodParameters(cellCoaddLike=[False, True])
    def testInputCounts(self, cellCoaddLike):
        # Build a fake coadd out of four overlapping, mutually offset CCDs,
        # drop three sources onto it, and verify that the plugin reports the
        # right number of contributing images for each source.

        side = 20  # Size of images (pixels)
        peakValue = 100.0  # Source flux

        ccdOrigins = [
            lsst.geom.Point2D(8, 0),
            lsst.geom.Point2D(10, 10),
            lsst.geom.Point2D(-8, -8),
            lsst.geom.Point2D(-8, 8)
        ]

        # Each source is a (position, expected count) pair; the expected
        # counts follow from the image size and CCD positions above.
        Source = namedtuple("Source", ["pos", "count"])
        sources = [
            Source(pos=lsst.geom.Point2D(6, 6), count=2),
            Source(pos=lsst.geom.Point2D(10, 10), count=3),
            Source(pos=lsst.geom.Point2D(14, 14), count=1)
        ]

        # Ingredients shared by every WCS built in this test.
        pixelScale = 1.0e-5 * lsst.geom.degrees
        cd = afwGeom.makeCdMatrix(scale=pixelScale)
        skyOrigin = lsst.geom.SpherePoint(0.0, 0.0, lsst.geom.degrees)

        def makeWcs(crpix):
            # All WCSs differ only in their reference pixel.
            return afwGeom.makeSkyWcs(crpix=crpix, crval=skyOrigin, cdMatrix=cd)

        # Geometry of a single input image and the WCS of the coadd itself.
        ccdBox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(side, side))
        coaddWcs = makeWcs(lsst.geom.Point2D(0, 0))

        # Make the exposure look like the output of a coadd.
        coadd = afwImage.ExposureF(side, side)
        coadd.setWcs(coaddWcs)
        coaddInputs = afwImage.CoaddInputs(afwTable.ExposureTable.makeMinimalSchema(),
                                           afwTable.ExposureTable.makeMinimalSchema())
        coadd.getInfo().setCoaddInputs(coaddInputs)

        # Register the fake CCDs that "went into" the coadd.  In the
        # cell-coadd-like case a record only gets a polygon (the CCD corners
        # mapped into coadd pixels); otherwise it gets its own WCS, bounding
        # box and a polygon covering that box.
        inputs = coadd.getInfo().getCoaddInputs().ccds
        for crpix in ccdOrigins:
            row = inputs.addNew()
            ccdWcs = makeWcs(crpix)
            if cellCoaddLike:
                outline = [
                    coaddWcs.skyToPixel(ccdWcs.pixelToSky(vertex))
                    for vertex in lsst.geom.Box2D(ccdBox).getCorners()
                ]
                row.validPolygon = afwGeom.Polygon(outline)
            else:
                row.setWcs(ccdWcs)
                row.setBBox(ccdBox)
                row.setValidPolygon(afwGeom.Polygon(lsst.geom.Box2D(ccdBox)))

        # Set up a SingleFrameMeasurementTask that runs only the peak
        # centroid and InputCount plugins, with every other slot disabled.
        config = measBase.SingleFrameMeasurementConfig()
        config.plugins.names = ["base_PeakCentroid", "base_InputCount"]
        config.slots.centroid = "base_PeakCentroid"
        for unusedSlot in ("psfFlux", "apFlux", "modelFlux", "gaussianFlux", "calibFlux", "shape"):
            setattr(config.slots, unusedSlot, None)
        config.validate()
        schema = afwTable.SourceTable.makeMinimalSchema()
        task = measBase.SingleFrameMeasurementTask(schema, config=config)
        catalog = afwTable.SourceCatalog(schema)

        # Populate the catalog: one footprint with a single peak per source.
        for src in sources:
            spanSet = afwGeom.SpanSet.fromShape(1).shiftedBy(int(src.pos.getX()), int(src.pos.getY()))
            footprint = afwDetection.Footprint(spanSet)
            peak = footprint.getPeaks().addNew()
            peak.setFx(src.pos[0])
            peak.setFy(src.pos[1])
            peak.setPeakValue(peakValue)
            catalog.addNew().setFootprint(footprint)

        task.run(catalog, coadd)

        for expected, measured in zip(sources, catalog):
            self.assertEqual(measured.get("base_InputCount_value"), expected.count)

        if display:
            ccdVennDiagram(coadd)

    def _preparePlugin(self, addCoaddInputs):
        """Build a `SingleFrameInputCountPlugin` together with its inputs.

        The plugin is paired with an empty source catalog and a small
        synthetic `~lsst.afw.image.ExposureF` carrying no image content.

        Parameters
        ----------
        addCoaddInputs : `bool`
            Whether to attach coadd inputs (a single CCD record covering the
            whole exposure) to the exposure.

        Returns
        -------
        inputCount : `SingleFrameInputCountPlugin`
            Initialized measurement plugin.
        catalog : `lsst.afw.table.SourceCatalog`
            Empty Catalog.
        exp : `lsst.afw.image.ExposureF`
            Synthetic exposure.
        """
        exposure = afwImage.ExposureF(20, 20)
        pixelScale = 1.0e-5*lsst.geom.degrees
        crpix = lsst.geom.Point2D(0, 0)
        crval = lsst.geom.SpherePoint(0.0, 0.0, lsst.geom.degrees)
        skyWcs = afwGeom.makeSkyWcs(crpix=crpix, crval=crval,
                                    cdMatrix=afwGeom.makeCdMatrix(scale=pixelScale))
        exposure.setWcs(skyWcs)

        if addCoaddInputs:
            coaddInputs = afwImage.CoaddInputs(afwTable.ExposureTable.makeMinimalSchema(),
                                               afwTable.ExposureTable.makeMinimalSchema())
            exposure.getInfo().setCoaddInputs(coaddInputs)
            ccdRecord = exposure.getInfo().getCoaddInputs().ccds.addNew()
            ccdRecord.setWcs(skyWcs)
            ccdRecord.setBBox(exposure.getBBox())
            ccdRecord.setValidPolygon(afwGeom.Polygon(lsst.geom.Box2D(exposure.getBBox())))

        schema = afwTable.SourceTable.makeMinimalSchema()
        # Constructed for its effect on the schema only; the plugin object
        # itself is not kept.
        measBase.SingleFramePeakCentroidPlugin(measBase.SingleFramePeakCentroidConfig(),
                                               "centroid", schema, None)
        schema.getAliasMap().set("slot_Centroid", "centroid")
        plugin = measBase.SingleFrameInputCountPlugin(measBase.InputCountConfig(),
                                                      "inputCount", schema, None)
        return plugin, afwTable.SourceCatalog(schema), exposure

    def testBadCentroid(self):
        """Test that a NaN centroid raises and results in a correct flag.

        The centroid slot's flag should show up as InputCount's badCentroid
        flag, and measuring at a NaN position should throw a
        MeasurementError.
        """
        plugin, catalog, exposure = self._preparePlugin(addCoaddInputs=True)

        # badCentroid on InputCount is an alias to the centroid's global
        # flag, so setting one is immediately visible through the other.
        flagged = catalog.addNew()
        flagged.set("centroid_flag", True)
        self.assertTrue(flagged.get("inputCount_flag_badCentroid"))

        # A flagged source with a usable position must still be measured.
        flagged.set("slot_Centroid_x", 10)
        flagged.set("slot_Centroid_y", 10)
        plugin.measure(flagged, exposure)
        self.assertTrue(flagged.get("inputCount_flag_badCentroid"))
        self.assertEqual(flagged.get("inputCount_value"), 1)

        # An unflagged source whose position contains a NaN is an expected
        # failure, signalled by a MeasurementError.
        nanPosition = catalog.addNew()
        nanPosition.set("slot_Centroid_x", float("nan"))
        nanPosition.set("slot_Centroid_y", 12.345)
        nanPosition.set("centroid_flag", False)
        with self.assertRaises(measBase.MeasurementError) as caught:
            plugin.measure(nanPosition, exposure)

        # fail() should raise the plugin's global flag.
        failed = catalog.addNew()
        failed.set("inputCount_flag", False)
        plugin.fail(failed, caught.exception)
        self.assertTrue(failed.get("inputCount_flag"))

    def testBadCoaddInputs(self):
        """Test that no coadd inputs raises and results in a correct flag.

        An exposure without coadd inputs should make the plugin throw a
        MeasurementError, and fail() should then set both the global flag
        and flag_noInputs.
        """
        plugin, catalog, exposure = self._preparePlugin(addCoaddInputs=False)
        source = catalog.addNew()

        # A fresh record starts out with neither flag set.
        for flagName in ("inputCount_flag", "inputCount_flag_noInputs"):
            self.assertFalse(source.get(flagName))

        # With no coadd inputs the measurement is an expected failure.
        with self.assertRaises(measBase.MeasurementError) as caught:
            plugin.measure(source, exposure)

        # fail() should set both the global and the noInputs flags.
        plugin.fail(source, caught.exception)
        for flagName in ("inputCount_flag", "inputCount_flag_noInputs"):
            self.assertTrue(source.get(flagName))
class TestMemory(lsst.utils.tests.MemoryTestCase):
    # Defines no tests of its own; whatever checks run here are inherited
    # from lsst.utils.tests.MemoryTestCase (presumably resource-leak checks
    # -- confirm against the lsst.utils documentation).
    pass
def setup_module(module):
    """Initialize the LSST test utilities before this module's tests run.

    The name matches pytest's module-level setup hook, so this is presumably
    invoked by pytest; the ``module`` argument is not used.
    """
    lsst.utils.tests.init()
# Direct script execution: perform the same initialization as setup_module,
# then hand control to the unittest runner.
if __name__ == "__main__":
    lsst.utils.tests.init()
    unittest.main()