Coverage for tests / test_InputCount.py: 15%

155 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:55 +0000

1# This file is part of meas_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Tests for InputCounts measurement algorithm. 

23""" 

24import numpy as np 

25import itertools 

26from collections import namedtuple 

27 

28import unittest 

29import lsst.utils.tests 

30 

31import lsst.geom 

32import lsst.afw.detection as afwDetection 

33import lsst.afw.geom as afwGeom 

34import lsst.afw.image as afwImage 

35import lsst.afw.table as afwTable 

36import lsst.meas.base as measBase 

37 

38 

39try: 

40 display 

41 import matplotlib.pyplot as plt 

42 import matplotlib.patches as patches 

43except NameError: 

44 display = False 

45 

46 

47def ccdVennDiagram(exp, showImage=True, legendLocation='best'): 

48 """Display and exposure & bounding boxes for images which go into a coadd. 

49 

50 Parameters 

51 ---------- 

52 exp : `lsst.afw.image.Exposure` 

53 The exposure object to plot, must be the product of a coadd 

54 showImage : `bool`, optional 

55 Plot image data in addition to its bounding box 

56 legendLocation : `str`, optional 

57 Matplotlib legend location code. Can be: ``'best'``, ``'upper 

58 right'``, ``'upper left'``, ``'lower left'``, ``'lower right'``, 

59 ``'right'``, ``'center left'``, ``'center right'``, ``'lower 

60 center'``, ``'upper center'``, ``'center'`` 

61 

62 """ 

63 # Create the figure object 

64 fig = plt.figure() 

65 # Use all the built in matplotib line style attributes to create a list of 

66 # the possible styles 

67 linestyles = ['solid', 'dashed', 'dashdot', 'dotted'] 

68 colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k'] 

69 # Calculate the cartisian product of the styles, and randomize the order, 

70 # to help each CCD get it's own color 

71 pcomb = np.random.permutation(list(itertools.product(colors, linestyles))) 

72 # Filter out a black solid box, as that will be the style of the given exp 

73 # object 

74 pcomb = pcomb[((pcomb[:, 0] == 'k') * (pcomb[:, 1] == 'solid')) is False] 

75 # Get the image properties 

76 origin = lsst.geom.PointD(exp.getXY0()) 

77 mainBox = exp.getBBox().getCorners() 

78 # Plot the exposure 

79 plt.gca().add_patch(patches.Rectangle((0, 0), *list(mainBox[2]-mainBox[0]), fill=False, label="exposure")) 

80 # Grab all of the CCDs that went into creating the exposure 

81 ccds = exp.getInfo().getCoaddInputs().ccds 

82 # Loop over and plot the extents of each ccd 

83 for i, ccd in enumerate(ccds): 

84 ccdBox = lsst.geom.Box2D(ccd.getBBox()) 

85 ccdCorners = ccdBox.getCorners() 

86 coaddCorners = [exp.getWcs().skyToPixel(ccd.getWcs().pixelToSky(point)) 

87 + (lsst.geom.PointD() - origin) for point in ccdCorners] 

88 plt.gca().add_patch(patches.Rectangle(coaddCorners[0], *list(coaddCorners[2]-coaddCorners[0]), 

89 fill=False, color=pcomb[i][0], ls=pcomb[i][1], 

90 label="CCD{}".format(i))) 

91 # If showImage is true, plot the data contained in exp as well as the 

92 # boundaries 

93 if showImage: 

94 plt.imshow(exp.image.array, cmap='Greys', origin='lower') 

95 plt.colorbar() 

96 # Adjust plot parameters and plot 

97 plt.gca().relim() 

98 plt.gca().autoscale_view() 

99 ylim = plt.gca().get_ylim() 

100 xlim = plt.gca().get_xlim() 

101 plt.gca().set_ylim(1.5*ylim[0], 1.5*ylim[1]) 

102 plt.gca().set_xlim(1.5*xlim[0], 1.5*xlim[1]) 

103 plt.legend(loc=legendLocation) 

104 fig.canvas.draw() 

105 plt.show() 

106 

107 

108class InputCountTest(lsst.utils.tests.TestCase): 

109 

110 @lsst.utils.tests.methodParameters(cellCoaddLike=[False, True]) 

111 def testInputCounts(self, cellCoaddLike): 

112 # Generate a simulated coadd of four overlapping-but-offset CCDs. 

113 # Populate it with three sources. 

114 # Demonstrate that we can correctly recover the number of images which 

115 # contribute to each source. 

116 

117 size = 20 # Size of images (pixels) 

118 value = 100.0 # Source flux 

119 

120 ccdPositions = [ 

121 lsst.geom.Point2D(8, 0), 

122 lsst.geom.Point2D(10, 10), 

123 lsst.geom.Point2D(-8, -8), 

124 lsst.geom.Point2D(-8, 8) 

125 ] 

126 

127 # Represent sources by a tuple of position and expected number of 

128 # contributing CCDs (based on the size/positions given above). 

129 Source = namedtuple("Source", ["pos", "count"]) 

130 sources = [ 

131 Source(pos=lsst.geom.Point2D(6, 6), count=2), 

132 Source(pos=lsst.geom.Point2D(10, 10), count=3), 

133 Source(pos=lsst.geom.Point2D(14, 14), count=1) 

134 ] 

135 

136 # These lines are used in the creation of WCS information 

137 scale = 1.0e-5 * lsst.geom.degrees 

138 cdMatrix = afwGeom.makeCdMatrix(scale=scale) 

139 crval = lsst.geom.SpherePoint(0.0, 0.0, lsst.geom.degrees) 

140 

141 # Construct the info needed to set the exposure object 

142 imageBox = lsst.geom.Box2I(lsst.geom.Point2I(0, 0), lsst.geom.Extent2I(size, size)) 

143 wcsRef = afwGeom.makeSkyWcs(crpix=lsst.geom.Point2D(0, 0), crval=crval, cdMatrix=cdMatrix) 

144 

145 # Create the exposure object, and set it up to be the output of a coadd 

146 exp = afwImage.ExposureF(size, size) 

147 exp.setWcs(wcsRef) 

148 exp.getInfo().setCoaddInputs(afwImage.CoaddInputs(afwTable.ExposureTable.makeMinimalSchema(), 

149 afwTable.ExposureTable.makeMinimalSchema())) 

150 

151 # Set the fake CCDs that "went into" making this coadd, using the 

152 # differing wcs objects created above. 

153 ccds = exp.getInfo().getCoaddInputs().ccds 

154 if cellCoaddLike: 

155 for pos in ccdPositions: 

156 record = ccds.addNew() 

157 wcs = afwGeom.makeSkyWcs(crpix=pos, crval=crval, cdMatrix=cdMatrix) 

158 corners = lsst.geom.Box2D(imageBox).getCorners() 

159 polygon = afwGeom.Polygon([wcsRef.skyToPixel(wcs.pixelToSky(corner)) for corner in corners]) 

160 record.validPolygon = polygon 

161 else: 

162 for pos in ccdPositions: 

163 record = ccds.addNew() 

164 record.setWcs(afwGeom.makeSkyWcs(crpix=pos, crval=crval, cdMatrix=cdMatrix)) 

165 record.setBBox(imageBox) 

166 record.setValidPolygon(afwGeom.Polygon(lsst.geom.Box2D(imageBox))) 

167 

168 # Configure a SingleFrameMeasurementTask to run InputCounts. 

169 measureSourcesConfig = measBase.SingleFrameMeasurementConfig() 

170 measureSourcesConfig.plugins.names = ["base_PeakCentroid", "base_InputCount"] 

171 measureSourcesConfig.slots.centroid = "base_PeakCentroid" 

172 measureSourcesConfig.slots.psfFlux = None 

173 measureSourcesConfig.slots.apFlux = None 

174 measureSourcesConfig.slots.modelFlux = None 

175 measureSourcesConfig.slots.gaussianFlux = None 

176 measureSourcesConfig.slots.calibFlux = None 

177 measureSourcesConfig.slots.shape = None 

178 measureSourcesConfig.validate() 

179 schema = afwTable.SourceTable.makeMinimalSchema() 

180 task = measBase.SingleFrameMeasurementTask(schema, config=measureSourcesConfig) 

181 catalog = afwTable.SourceCatalog(schema) 

182 

183 # Add simulated sources to the measurement catalog. 

184 for src in sources: 

185 spans = afwGeom.SpanSet.fromShape(1) 

186 spans = spans.shiftedBy(int(src.pos.getX()), int(src.pos.getY())) 

187 foot = afwDetection.Footprint(spans) 

188 peak = foot.getPeaks().addNew() 

189 peak.setFx(src.pos[0]) 

190 peak.setFy(src.pos[1]) 

191 peak.setPeakValue(value) 

192 catalog.addNew().setFootprint(foot) 

193 

194 task.run(catalog, exp) 

195 

196 for src, rec in zip(sources, catalog): 

197 self.assertEqual(rec.get("base_InputCount_value"), src.count) 

198 

199 if display: 

200 ccdVennDiagram(exp) 

201 

202 def _preparePlugin(self, addCoaddInputs): 

203 """Prepare a `SingleFrameInputCountPlugin` for running. 

204 

205 Sets up the plugin to run on an empty catalog together with a 

206 synthetic, content-free `~lsst.afw.image.ExposureF`. 

207 

208 Parameters 

209 ---------- 

210 addCoaddInputs : `bool` 

211 Should we add the coadd inputs? 

212 

213 Returns 

214 ------- 

215 inputCount : `SingleFrameInputCountPlugin` 

216 Initialized measurement plugin. 

217 catalog : `lsst.afw.table.SourceCatalog` 

218 Empty Catalog. 

219 exp : `lsst.afw.image.ExposureF` 

220 Synthetic exposure. 

221 """ 

222 exp = afwImage.ExposureF(20, 20) 

223 scale = 1.0e-5*lsst.geom.degrees 

224 wcs = afwGeom.makeSkyWcs(crpix=lsst.geom.Point2D(0, 0), 

225 crval=lsst.geom.SpherePoint(0.0, 0.0, lsst.geom.degrees), 

226 cdMatrix=afwGeom.makeCdMatrix(scale=scale)) 

227 exp.setWcs(wcs) 

228 if addCoaddInputs: 

229 exp.getInfo().setCoaddInputs(afwImage.CoaddInputs(afwTable.ExposureTable.makeMinimalSchema(), 

230 afwTable.ExposureTable.makeMinimalSchema())) 

231 ccds = exp.getInfo().getCoaddInputs().ccds 

232 record = ccds.addNew() 

233 record.setWcs(wcs) 

234 record.setBBox(exp.getBBox()) 

235 record.setValidPolygon(afwGeom.Polygon(lsst.geom.Box2D(exp.getBBox()))) 

236 

237 schema = afwTable.SourceTable.makeMinimalSchema() 

238 measBase.SingleFramePeakCentroidPlugin(measBase.SingleFramePeakCentroidConfig(), 

239 "centroid", schema, None) 

240 schema.getAliasMap().set("slot_Centroid", "centroid") 

241 inputCount = measBase.SingleFrameInputCountPlugin(measBase.InputCountConfig(), 

242 "inputCount", schema, None) 

243 catalog = afwTable.SourceCatalog(schema) 

244 return inputCount, catalog, exp 

245 

246 def testBadCentroid(self): 

247 """Test that a NaN centroid raises and results in a correct flag. 

248 

249 The flag from the centroid slot should propagate to the badCentroid 

250 flag on InputCount and the algorithm should throw a MeasurementError 

251 when it encounters a NaN position. 

252 """ 

253 inputCount, catalog, exp = self._preparePlugin(True) 

254 record = catalog.addNew() 

255 

256 # The inputCount's badCentroid flag is an alias to the centroid's 

257 # global flag, so it should be set immediately. 

258 record.set("centroid_flag", True) 

259 self.assertTrue(record.get("inputCount_flag_badCentroid")) 

260 

261 # Even though the source is flagged as bad, if the position is good we 

262 # should still get a measurement. 

263 record.set("slot_Centroid_x", 10) 

264 record.set("slot_Centroid_y", 10) 

265 inputCount.measure(record, exp) 

266 self.assertTrue(record.get("inputCount_flag_badCentroid")) 

267 self.assertEqual(record.get("inputCount_value"), 1) 

268 

269 # The centroid is bad (with a NaN) even though the centroid isn't 

270 # flagged, so we should get a MeasurementError indicating an expected 

271 # failure. 

272 record = catalog.addNew() 

273 record.set("slot_Centroid_x", float("nan")) 

274 record.set("slot_Centroid_y", 12.345) 

275 record.set("centroid_flag", False) 

276 with self.assertRaises(measBase.MeasurementError) as measErr: 

277 inputCount.measure(record, exp) 

278 

279 # Calling the fail() method should set the global flag. 

280 record = catalog.addNew() 

281 record.set("inputCount_flag", False) 

282 inputCount.fail(record, measErr.exception) 

283 self.assertTrue(record.get("inputCount_flag")) 

284 

285 def testBadCoaddInputs(self): 

286 """Test that no coadd inputs raises and results in a correct flag. 

287 

288 When there are no coadd inputs on the input exposure we should throw a 

289 MeasurementError and set both the global flag and flag_noInputs. 

290 """ 

291 inputCount, catalog, exp = self._preparePlugin(False) 

292 record = catalog.addNew() 

293 

294 # Initially, the record is not flagged. 

295 self.assertFalse(record.get("inputCount_flag")) 

296 self.assertFalse(record.get("inputCount_flag_noInputs")) 

297 

298 # There are no coadd inputs, so we should get a MeasurementError 

299 # indicating an expected failure. 

300 with self.assertRaises(measBase.MeasurementError) as measErr: 

301 inputCount.measure(record, exp) 

302 

303 # Calling the fail() method should set the noInputs and global flags. 

304 inputCount.fail(record, measErr.exception) 

305 self.assertTrue(record.get("inputCount_flag")) 

306 self.assertTrue(record.get("inputCount_flag_noInputs")) 

307 

308 

309class TestMemory(lsst.utils.tests.MemoryTestCase): 

310 pass 

311 

312 

313def setup_module(module): 

314 lsst.utils.tests.init() 

315 

316 

317if __name__ == "__main__": 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true

318 lsst.utils.tests.init() 

319 unittest.main()