Coverage for tests / test_makeSurveyPropertyMaps.py: 17%

172 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:38 +0000

1# This file is part of pipe_tasks. 

2# 

3# LSST Data Management System 

4# This product includes software developed by the 

5# LSST Project (http://www.lsst.org/). 

6# See COPYRIGHT file at the top of the source tree. 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22"""Test HealSparsePropertyMapTask. 

23""" 

24import unittest 

25import numpy as np 

26import healsparse as hsp 

27import esutil 

28import warnings 

29import logging 

30from astropy import units 

31 

32import lsst.utils.tests 

33import lsst.daf.butler 

34import lsst.afw.table as afwTable 

35import lsst.afw.geom as afwGeom 

36import lsst.afw.image as afwImage 

37from lsst.skymap.discreteSkyMap import DiscreteSkyMap 

38import lsst.geom as geom 

39 

40from lsst.pipe.base import InMemoryDatasetHandle 

41from lsst.pipe.tasks.healSparseMapping import HealSparsePropertyMapTask 

42from lsst.pipe.tasks.healSparseMappingProperties import (register_property_map, 

43 BasePropertyMap) 

44 

45from surveyPropertyMapsTestUtils import makeMockVisitSummary 

46 

47 

48# Test creation of an arbitrary new property map by registering it here 

49# and using it in the test class. 

50@register_property_map("dist_times_psfarea") 

51class DistTimesPsfAreaPropertyMap(BasePropertyMap): 

52 """Property map to compute the distance from the boresight center 

53 by the psf area. Do not try this at home.""" 

54 description = "Distance times psf area" 

55 unit = "deg*pixel**2" 

56 requires_psf = True 

57 

58 def _compute(self, row, ra, dec, scalings, psf_array=None): 

59 boresight = row.getVisitInfo().getBoresightRaDec() 

60 dist = esutil.coords.sphdist(ra, dec, 

61 boresight.getRa().asDegrees(), boresight.getDec().asDegrees()) 

62 return np.deg2rad(dist)*psf_array['psf_area'] 

63 

64 

65class HealSparsePropertyMapTaskTestCase(lsst.utils.tests.TestCase): 

66 """Test of HealSparsePropertyMapTask. 

67 

68 These tests bypass the middleware used for accessing data and 

69 managing Task execution. 

70 """ 

71 def setUp(self): 

72 tract = 0 

73 band = 'r' 

74 patch = 0 

75 visits = [100, 101] 

76 # Good to test crossing 0. 

77 ra_center = 0.0 

78 dec_center = -45.0 

79 pixel_scale = 0.2 

80 coadd_zp = 27.0 

81 

82 # Generate a mock skymap with one patch 

83 config = DiscreteSkyMap.ConfigClass() 

84 config.raList = [ra_center] 

85 config.decList = [dec_center] 

86 config.radiusList = [150*pixel_scale/3600.] 

87 config.patchInnerDimensions = (350, 350) 

88 config.patchBorder = 50 

89 config.tractOverlap = 0.0 

90 config.pixelScale = pixel_scale 

91 sky_map = DiscreteSkyMap(config) 

92 

93 visit_summaries = [makeMockVisitSummary(visit, 

94 ra_center=ra_center, 

95 dec_center=dec_center) 

96 for visit in visits] 

97 visit_summary_refs = [InMemoryDatasetHandle(visit_summary, visit=visit) 

98 for visit_summary, visit in zip(visit_summaries, visits)] 

99 self.visit_summary_dict = {visit: ref.get() 

100 for ref, visit in zip(visit_summary_refs, visits)} 

101 

102 # Generate an input map. Note that this does not need to be consistent 

103 # with the visit_summary projections, we're just tracking values. 

104 with warnings.catch_warnings(): 

105 # Healsparse will emit a warning if nside coverage is greater than 

106 # 128. In the case of generating patch input maps, and not global 

107 # maps, high nside coverage works fine, so we can suppress this 

108 # warning. 

109 warnings.simplefilter("ignore") 

110 input_map = hsp.HealSparseMap.make_empty(nside_coverage=256, 

111 nside_sparse=32768, 

112 dtype=hsp.WIDE_MASK, 

113 wide_mask_maxbits=len(visits)*2) 

114 

115 patch_poly = afwGeom.Polygon(geom.Box2D(sky_map[tract][patch].getOuterBBox())) 

116 sph_pts = sky_map[tract].getWcs().pixelToSky(patch_poly.convexHull().getVertices()) 

117 patch_poly_radec = np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees()) 

118 for sph in sph_pts]) 

119 poly = hsp.Polygon(ra=patch_poly_radec[: -1, 0], 

120 dec=patch_poly_radec[: -1, 1], 

121 value=[0]) 

122 poly_pixels = poly.get_pixels(nside=input_map.nside_sparse) 

123 # The input map has full coverage for bits 0 and 1 

124 input_map.set_bits_pix(poly_pixels, [0]) 

125 input_map.set_bits_pix(poly_pixels, [1]) 

126 

127 input_map_ref = InMemoryDatasetHandle(input_map, patch=patch, tract=tract) 

128 self.input_map_dict = {patch: input_map_ref} 

129 

130 coadd = afwImage.ExposureF(sky_map[tract][patch].getOuterBBox(), 

131 sky_map[tract].getWcs()) 

132 instFluxMag0 = 10.**(coadd_zp/2.5) 

133 pc = afwImage.makePhotoCalibFromCalibZeroPoint(instFluxMag0) 

134 coadd.setPhotoCalib(pc) 

135 

136 # Mock the coadd input ccd table 

137 schema = afwTable.ExposureTable.makeMinimalSchema() 

138 schema.addField("ccd", type="I") 

139 schema.addField("visit", type="I") 

140 schema.addField("weight", type="F") 

141 ccds = afwTable.ExposureCatalog(schema) 

142 ccds.resize(2) 

143 ccds['id'] = np.arange(2) 

144 ccds['visit'][0] = visits[0] 

145 ccds['visit'][1] = visits[1] 

146 ccds['ccd'][0] = 0 

147 ccds['ccd'][1] = 1 

148 ccds['weight'] = 10.0 

149 for ccd_row in ccds: 

150 summary = self.visit_summary_dict[ccd_row['visit']].find(ccd_row['ccd']) 

151 ccd_row.setWcs(summary.getWcs()) 

152 ccd_row.setPsf(summary.getPsf()) 

153 ccd_row.setBBox(summary.getBBox()) 

154 ccd_row.setPhotoCalib(summary.getPhotoCalib()) 

155 

156 inputs = afwImage.CoaddInputs() 

157 inputs.ccds = ccds 

158 coadd.getInfo().setCoaddInputs(inputs) 

159 

160 metadata = {} 

161 for bit, row in enumerate(ccds): 

162 metadata[f"B{bit:04d}CCD"] = row["ccd"] 

163 metadata[f"B{bit:04d}VIS"] = row["visit"] 

164 metadata[f"B{bit:04d}WT"] = row["weight"] 

165 input_map.metadata = metadata 

166 

167 coadd_ref = InMemoryDatasetHandle(coadd, patch=patch, tract=tract, storageClass="ExposureF") 

168 self.coadd_dict = {patch: coadd_ref} 

169 

170 self.tract = tract 

171 self.band = band 

172 self.sky_map = sky_map 

173 self.input_map = input_map 

174 

175 def testPropertyMapCreation(self): 

176 """Test creation of property maps.""" 

177 config = HealSparsePropertyMapTask.ConfigClass() 

178 

179 # Add our new test map to the set of maps 

180 config.property_maps.names |= ['dist_times_psfarea'] 

181 config.property_maps['dist_times_psfarea'].do_min = True 

182 config.property_maps['dist_times_psfarea'].do_max = True 

183 config.property_maps['dist_times_psfarea'].do_mean = True 

184 

185 property_task = HealSparsePropertyMapTask(config=config) 

186 

187 property_task.run(self.sky_map, 

188 self.tract, 

189 self.band, 

190 self.coadd_dict, 

191 self.input_map_dict, 

192 self.visit_summary_dict) 

193 

194 valid_pixels = self.input_map.valid_pixels 

195 

196 # Verify each map exists and has the correct pixels set. 

197 for name, map_config, PropertyMapClass in config.property_maps.apply(): 

198 self.assertTrue(name in property_task.property_maps) 

199 property_map = property_task.property_maps[name] 

200 if map_config.do_min: 

201 self.assertTrue(hasattr(property_map, 'min_map')) 

202 np.testing.assert_array_equal(property_map.min_map.valid_pixels, valid_pixels) 

203 metadata = property_map.min_map.metadata 

204 self.assertIsNotNone(metadata["DESCRIPTION"]) 

205 self.assertEqual(metadata["OPERATION"], "minimum") 

206 unit = units.Unit(metadata["UNIT"]) 

207 self.assertIsNotNone(unit) 

208 else: 

209 self.assertFalse(hasattr(property_map, 'min_map')) 

210 if map_config.do_max: 

211 self.assertTrue(hasattr(property_map, 'max_map')) 

212 np.testing.assert_array_equal(property_map.max_map.valid_pixels, valid_pixels) 

213 metadata = property_map.max_map.metadata 

214 self.assertIsNotNone(metadata["DESCRIPTION"]) 

215 self.assertEqual(metadata["OPERATION"], "maximum") 

216 unit = units.Unit(metadata["UNIT"]) 

217 self.assertIsNotNone(unit) 

218 else: 

219 self.assertFalse(hasattr(property_map, 'max_map')) 

220 if map_config.do_mean: 

221 self.assertTrue(hasattr(property_map, 'mean_map')) 

222 np.testing.assert_array_equal(property_map.mean_map.valid_pixels, valid_pixels) 

223 metadata = property_map.mean_map.metadata 

224 self.assertIsNotNone(metadata["DESCRIPTION"]) 

225 self.assertEqual(metadata["OPERATION"], "mean") 

226 unit = units.Unit(metadata["UNIT"]) 

227 self.assertIsNotNone(unit) 

228 else: 

229 self.assertFalse(hasattr(property_map, 'mean_map')) 

230 if map_config.do_weighted_mean: 

231 self.assertTrue(hasattr(property_map, 'weighted_mean_map')) 

232 np.testing.assert_array_equal(property_map.weighted_mean_map.valid_pixels, valid_pixels) 

233 metadata = property_map.weighted_mean_map.metadata 

234 self.assertIsNotNone(metadata["DESCRIPTION"]) 

235 self.assertEqual(metadata["OPERATION"], "weighted mean") 

236 unit = units.Unit(metadata["UNIT"]) 

237 self.assertIsNotNone(unit) 

238 else: 

239 self.assertFalse(hasattr(property_map, 'weighted_mean_map')) 

240 if map_config.do_sum: 

241 self.assertTrue(hasattr(property_map, 'sum_map')) 

242 np.testing.assert_array_equal(property_map.sum_map.valid_pixels, valid_pixels) 

243 metadata = property_map.sum_map.metadata 

244 self.assertIsNotNone(metadata["DESCRIPTION"]) 

245 self.assertEqual(metadata["OPERATION"], "sum") 

246 unit = units.Unit(metadata["UNIT"]) 

247 self.assertIsNotNone(unit) 

248 else: 

249 self.assertFalse(hasattr(property_map, 'sum_map')) 

250 

251 def testPropertyMapCreationEmptyInputMap(self): 

252 """Test creation of maps with an empty input map (DM-37837).""" 

253 # Replace the input map with an empty one. 

254 with warnings.catch_warnings(): 

255 warnings.simplefilter("ignore") 

256 self.input_map_dict[0] = InMemoryDatasetHandle(hsp.HealSparseMap.make_empty_like( 

257 self.input_map_dict[0].inMemoryDataset 

258 ), **self.input_map_dict[0].dataId) 

259 

260 config = HealSparsePropertyMapTask.ConfigClass() 

261 

262 property_task = HealSparsePropertyMapTask(config=config) 

263 

264 with self.assertLogs(level=logging.WARNING) as cm: 

265 property_task.run(self.sky_map, 

266 self.tract, 

267 self.band, 

268 self.coadd_dict, 

269 self.input_map_dict, 

270 self.visit_summary_dict) 

271 self.assertIn("No valid pixels", cm.output[0]) 

272 

273 # Ensure we have no valid pixels output. 

274 self.assertEqual(property_task.property_maps["exposure_time"].sum_map.valid_pixels.size, 0) 

275 

276 

277class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

278 pass 

279 

280 

281def setup_module(module): 

282 lsst.utils.tests.init() 

283 

284 

285if __name__ == "__main__": 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 lsst.utils.tests.init() 

287 unittest.main()