Coverage for tests / test_assemble_cell_coadd.py: 22%

157 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:28 +0000

1# This file is part of drp_tasks. 

2# 

3# LSST Data Management System 

4# This product includes software developed by the 

5# LSST Project (http://www.lsst.org/). 

6# See COPYRIGHT file at the top of the source tree. 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# This program is distributed in the hope that it will be useful, 

14# but WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

16# GNU General Public License for more details. 

17# 

18# You should have received a copy of the LSST License Statement and 

19# the GNU General Public License along with this program. If not, 

20# see <https://www.lsstcorp.org/LegalNotices/>. 

21# 

22 

23from __future__ import annotations 

24 

25import unittest 

26import warnings 

27from typing import TYPE_CHECKING, Iterable 

28 

29import galsim 

30import hpgeom as hpg 

31import numpy as np 

32from assemble_coadd_test_utils import MockCoaddTestData, makeMockSkyInfo 

33 

34import lsst.afw.geom as afwGeom 

35import lsst.afw.image as afwImage 

36import lsst.pipe.base as pipeBase 

37import lsst.utils.tests 

38from lsst.cell_coadds import CommonComponents 

39from lsst.drp.tasks.assemble_cell_coadd import ( 

40 AssembleCellCoaddConfig, 

41 AssembleCellCoaddTask, 

42 WarpInputs, 

43) 

44 

45if TYPE_CHECKING: 

46 from lsst.cell_coadds import ObservationIdentifiers 

47 

48__all__ = ( 

49 "MockAssembleCellCoaddConfig", 

50 "MockAssembleCellCoaddTask", 

51) 

52 

53 

54class MockAssembleCellCoaddConfig(AssembleCellCoaddConfig): 

55 def setDefaults(self): 

56 super().setDefaults() 

57 self.require_artifact_mask = False 

58 

59 

60class MockAssembleCellCoaddTask(AssembleCellCoaddTask): 

61 """Lightly modified version of `AssembleCellCoaddTask` for unit tests. 

62 

63 The modifications bypass the usual middleware for loading data and setting 

64 up the Task, and instead supply in-memory mock data references to the `run` 

65 method so that the coaddition algorithms can be tested without a Butler. 

66 """ 

67 

68 ConfigClass = MockAssembleCellCoaddConfig 

69 

70 def runQuantum( 

71 self, 

72 mockSkyInfo, 

73 warpRefList, 

74 maskedFractionRefList, 

75 noise0RefList, 

76 visitSummaryList=None, 

77 ): 

78 """Modified interface for testing coaddition algorithms without a 

79 Butler. 

80 

81 Parameters 

82 ---------- 

83 mockSkyInfo : `lsst.pipe.base.Struct` 

84 A simple container that supplies a bounding box and WCS in the 

85 same format as the output of 

86 `lsst.pipe.tasks.CoaddBaseTask.getSkyInfo` 

87 warpRefList : `list` of `lsst.pipe.tasks.MockExposureReference` 

88 Data references to the test exposures that will be coadded, 

89 using the Gen 3 API. 

90 

91 Returns 

92 ------- 

93 retStruct : `lsst.pipe.base.Struct` 

94 The coadded exposure and associated metadata. 

95 """ 

96 

97 self.common = CommonComponents( 

98 units=None, 

99 wcs=mockSkyInfo.wcs, 

100 band="i", 

101 identifiers=pipeBase.Struct(skymap=None, tract=0, patch=42, band="i"), 

102 ) 

103 

104 inputs = {} 

105 for warpInput, maskedFractionInput, noise0Input in zip( 

106 warpRefList, 

107 maskedFractionRefList, 

108 noise0RefList, 

109 ): 

110 inputs[warpInput.dataId] = WarpInputs( 

111 warp=warpInput, 

112 masked_fraction=maskedFractionInput, 

113 noise_warps=[noise0Input], 

114 ) 

115 

116 retStruct = self.run( 

117 inputs=inputs, 

118 skyInfo=mockSkyInfo, 

119 visitSummaryList=visitSummaryList, 

120 ) 

121 

122 return retStruct 

123 

124 

125class AssembleCellCoaddTestCase(lsst.utils.tests.TestCase): 

126 """Tests of AssembleCellCoaddTask. 

127 

128 These tests bypass the middleware used for accessing data and managing Task 

129 execution. 

130 """ 

131 

132 @classmethod 

133 def setUpClass(cls) -> None: 

134 np.random.seed(42) 

135 rng = np.random.Generator(np.random.MT19937(42)) 

136 patch = 42 

137 tract = 0 

138 testData = MockCoaddTestData(fluxRange=1e4) 

139 exposures = {} 

140 matchedExposures = {} 

141 masked_fraction_images = {} 

142 noise0_masked_images = {} 

143 for expId in range(100, 110): 

144 exposures[expId], matchedExposures[expId] = testData.makeTestImage(expId) 

145 masked_fraction_images[expId] = afwImage.ImageF(bbox=exposures[expId].getBBox()) 

146 masked_fraction_images[expId].array[:, :] = rng.random(masked_fraction_images[expId].array.shape) 

147 noise0_masked_images[expId] = afwImage.MaskedImageF(bbox=exposures[expId].getBBox()) 

148 noise0_masked_images[expId].image.array[:, :] = rng.normal( 

149 0, 1, noise0_masked_images[expId].image.array.shape 

150 ) * (exposures[expId].variance.array**0.5) 

151 

152 cls.handleList = testData.makeDataRefList(exposures, patch=patch, tract=tract) 

153 cls.maskedFractionRefList = testData.makeDataRefList(masked_fraction_images, patch=patch, tract=tract) 

154 cls.noise0RefList = testData.makeDataRefList(noise0_masked_images, patch=patch, tract=tract) 

155 cls.visitSummaryList = [ 

156 testData.makeVisitSummaryTableHandle(warpHandle) for warpHandle in cls.handleList 

157 ] 

158 cls.skyInfo = makeMockSkyInfo(testData.bbox, testData.wcs, patch=patch) 

159 

160 def runTask( 

161 self, 

162 config=None, 

163 warpRefList=None, 

164 maskedFractionRefList=None, 

165 noise0RefList=None, 

166 visitSummaryList=None, 

167 ) -> None: 

168 if config is None: 

169 config = MockAssembleCellCoaddConfig() 

170 config.do_input_map = True 

171 assembleTask = MockAssembleCellCoaddTask(config=config) 

172 if warpRefList is None: 

173 warpRefList = self.handleList 

174 if maskedFractionRefList is None: 

175 maskedFractionRefList = self.maskedFractionRefList 

176 if noise0RefList is None: 

177 noise0RefList = self.noise0RefList 

178 if visitSummaryList is None: 

179 visitSummaryList = self.visitSummaryList 

180 

181 self.result = assembleTask.runQuantum( 

182 self.skyInfo, 

183 warpRefList=warpRefList, 

184 maskedFractionRefList=maskedFractionRefList, 

185 noise0RefList=noise0RefList, 

186 visitSummaryList=visitSummaryList, 

187 ) 

188 

189 def checkSortOrder(self, inputs: Iterable[ObservationIdentifiers]) -> None: 

190 """Check that the inputs are sorted. 

191 

192 The inputs must be sorted first by visit, and within the same visit, 

193 by detector. 

194 

195 Parameters 

196 ---------- 

197 inputs : `Iterable` [`ObservationIdentifiers`] 

198 The inputs to be checked. 

199 """ 

200 visit, detector = -np.inf, -np.inf # Previous visit, detector IDs. 

201 for _, obsId in enumerate(inputs): 

202 with self.subTest(input_number=repr(obsId)): 

203 self.assertGreaterEqual(obsId.visit, visit) 

204 if visit == obsId.visit: 

205 with self.subTest(detector_number=repr(obsId.detector)): 

206 self.assertGreaterEqual(obsId.detector, detector) 

207 

208 visit, detector = obsId.visit, obsId.detector 

209 

210 def checkRun(self, assembleTask): 

211 """Check that the task runs successfully.""" 

212 result = assembleTask.runQuantum(self.skyInfo, self.handleList) 

213 

214 # Check that we produced an exposure. 

215 self.assertTrue(result.multipleCellCoadd is not None) 

216 # Check that the visit_count method returns a number less than or equal 

217 # to the total number of input exposures available. 

218 max_visit_count = len(self.handleList) 

219 for cellId, singleCellCoadd in result.multipleCellCoadd.cells.items(): 

220 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)): 

221 self.assertLessEqual(singleCellCoadd.visit_count, max_visit_count) 

222 # Check that the aperture correction maps are not None. 

223 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)): 

224 self.assertTrue(singleCellCoadd.aperture_correction_map is not None) 

225 # Check that the inputs are sorted. 

226 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)): 

227 self.checkSortOrder(singleCellCoadd.inputs) 

228 

229 def test_assemble_basic(self): 

230 """Test that AssembleCellCoaddTask runs successfully without errors. 

231 

232 This test does not check the correctness of the coaddition algorithms. 

233 This is intended to prevent the code from bit rotting. 

234 """ 

235 self.runTask() 

236 # Check that we produced an exposure. 

237 self.assertTrue(self.result.multipleCellCoadd is not None) 

238 self.assertTrue(self.result.inputMap is not None) 

239 

240 # Check the input map. 

241 inputMap = self.result.inputMap 

242 

243 warp_input_list = [handle.get() for handle in self.handleList] 

244 visit_detectors = [] 

245 for warp_input in warp_input_list: 

246 for row in warp_input.getInfo().getCoaddInputs().ccds: 

247 visit_detectors.append((int(row["visit"]), int(row["ccd"]))) 

248 

249 for bit, (visit, detector) in enumerate(visit_detectors): 

250 self.assertEqual(inputMap.metadata[f"B{bit:04d}VIS"], visit) 

251 self.assertEqual(inputMap.metadata[f"B{bit:04d}CCD"], detector) 

252 self.assertGreater(inputMap.metadata[f"B{bit:04d}WT"], 0.0) 

253 

254 coadd_poly = afwGeom.Polygon(lsst.geom.Box2D(self.result.multipleCellCoadd.outer_bbox)) 

255 sph_pts = self.result.multipleCellCoadd.wcs.pixelToSky(coadd_poly) 

256 radec = np.asarray([(sph.getRa().asDegrees(), sph.getDec().asDegrees()) for sph in sph_pts]) 

257 pixels = hpg.query_polygon(inputMap.nside_sparse, radec[:-1, 0], radec[:-1, 1]) 

258 np.testing.assert_array_equal(pixels, inputMap.valid_pixels) 

259 

260 def test_assemble_empty(self): 

261 """Test that AssembleCellCoaddTask runs successfully without errors 

262 when no input exposures are provided.""" 

263 self.result = None # so tearDown has something. 

264 with self.assertRaises(pipeBase.NoWorkFound, msg="No cells could be populated for the cell coadd."): 

265 self.runTask(warpRefList=[], maskedFractionRefList=[], noise0RefList=[], visitSummaryList=[]) 

266 

267 def test_assemble_without_visitSummary(self): 

268 """Test that AssembleCellCoaddTask calculates detector weights and 

269 runs successfully without errors when no visit summaries are provided. 

270 """ 

271 self.runTask(visitSummaryList=[]) 

272 # Check that we produced an exposure. 

273 self.assertTrue(self.result.multipleCellCoadd is not None) 

274 self.assertTrue(self.result.inputMap is not None) 

275 

276 # TODO: Remove this test in DM-49401 

277 @lsst.utils.tests.methodParameters(do_scale_zero_point=[False, True]) 

278 def test_do_scale_zero_point(self, do_scale_zero_point): 

279 config = MockAssembleCellCoaddConfig() 

280 with warnings.catch_warnings(): 

281 warnings.filterwarnings("ignore", category=FutureWarning) 

282 config.do_scale_zero_point = do_scale_zero_point 

283 self.runTask(config) 

284 # Check that we produced an exposure. 

285 self.assertTrue(self.result.multipleCellCoadd is not None) 

286 self.assertTrue(self.result.inputMap is not None) 

287 

288 @lsst.utils.tests.methodParameters(do_calculate_weight_from_warp=[False, True]) 

289 def test_do_calculate_weight_from_warp(self, do_calculate_weight_from_warp): 

290 config = MockAssembleCellCoaddConfig() 

291 config.do_calculate_weight_from_warp = do_calculate_weight_from_warp 

292 self.runTask(config) 

293 # Check that we produced an exposure. 

294 self.assertTrue(self.result.multipleCellCoadd is not None) 

295 self.assertTrue(self.result.inputMap is not None) 

296 

297 def test_visit_count(self): 

298 """Check that the visit_count method returns a number less than or 

299 equal to the total number of input exposures available. 

300 """ 

301 self.runTask() 

302 max_visit_count = len(self.handleList) 

303 for cellId, singleCellCoadd in self.result.multipleCellCoadd.cells.items(): 

304 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)): 

305 self.assertLessEqual(singleCellCoadd.visit_count, max_visit_count) 

306 

307 def test_inputs_sorted(self): 

308 """Check that the inputs are sorted. 

309 

310 The ordering is that inputs are sorted first by visit, and within the 

311 same visit, they are ordered by detector. 

312 """ 

313 self.runTask() 

314 for _, singleCellCoadd in self.result.multipleCellCoadd.cells.items(): 

315 self.checkSortOrder(singleCellCoadd.inputs) 

316 

317 def test_psf_normalization(self): 

318 """Check that the sum of PSF images is close to 1.""" 

319 self.runTask() 

320 for cellId, singleCellCoadd in self.result.multipleCellCoadd.cells.items(): 

321 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)): 

322 self.assertFloatsAlmostEqual(singleCellCoadd.psf_image.array.sum(), 1.0, rtol=None, atol=1e-7) 

323 

324 def test_psf_centering(self): 

325 """Check that the PSF images are centered.""" 

326 self.runTask() 

327 for cellId, singleCellCoadd in self.result.multipleCellCoadd.cells.items(): 

328 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)): 

329 shape = galsim.hsm.FindAdaptiveMom(galsim.Image(singleCellCoadd.psf_image.array)) 

330 self.assertFloatsAlmostEqual( 

331 shape.moments_centroid.x, round(shape.moments_centroid.x), rtol=None, atol=0.01 

332 ) 

333 self.assertFloatsAlmostEqual( 

334 shape.moments_centroid.y, round(shape.moments_centroid.y), rtol=None, atol=0.01 

335 ) 

336 

337 

338class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

339 pass 

340 

341 

342def setup_module(module): 

343 lsst.utils.tests.init() 

344 

345 

346if __name__ == "__main__": 346 ↛ 347line 346 didn't jump to line 347 because the condition on line 346 was never true

347 lsst.utils.tests.init() 

348 unittest.main()