Coverage for tests / test_make_direct_warp.py: 18%

210 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 18:49 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import copy 

25import unittest 

26from typing import Self, Type 

27 

28import numpy as np 

29 

30import lsst.afw.cameraGeom.testUtils 

31import lsst.afw.image 

32import lsst.afw.math as afwMath 

33import lsst.skymap as skyMap 

34import lsst.utils.tests 

35from lsst.afw.detection import GaussianPsf 

36from lsst.daf.butler import DataCoordinate, DimensionUniverse 

37from lsst.drp.tasks.make_direct_warp import MakeDirectWarpConfig, MakeDirectWarpTask, WarpDetectorInputs 

38from lsst.pipe.base import InMemoryDatasetHandle 

39from lsst.pipe.tasks.coaddBase import makeSkyInfo 

40 

41 

42class MakeWarpTestCase(lsst.utils.tests.TestCase): 

43 @classmethod 

44 def setUpClass(cls): 

45 np.random.seed(12345) 

46 

47 rng = np.random.Generator(np.random.MT19937(12345)) 

48 

49 cls.config = MakeDirectWarpConfig() 

50 cls.config.useVisitSummaryPsf = False 

51 cls.config.doSelectPreWarp = False 

52 cls.config.doWarpMaskedFraction = True 

53 cls.config.numberOfNoiseRealizations = 1 

54 

55 cls.exposurePhotoCalib = lsst.afw.image.PhotoCalib(1.0) 

56 # An external photoCalib calibration to return 

57 cls.externalPhotoCalib = lsst.afw.image.PhotoCalib(1e-6, 1e-8) 

58 

59 crpix = lsst.geom.Point2D(0, 0) 

60 crval = lsst.geom.SpherePoint(0, 45, lsst.geom.degrees) 

61 cdMatrix = lsst.afw.geom.makeCdMatrix(scale=1.0 * lsst.geom.arcseconds) 

62 cls.skyWcs = lsst.afw.geom.makeSkyWcs(crpix, crval, cdMatrix) 

63 externalCdMatrix = lsst.afw.geom.makeCdMatrix(scale=0.9 * lsst.geom.arcseconds) 

64 # An external skyWcs to return 

65 cls.externalSkyWcs = lsst.afw.geom.makeSkyWcs(crpix, crval, externalCdMatrix) 

66 

67 cls.exposure = lsst.afw.image.ExposureF(100, 150) 

68 cls.exposure.maskedImage.image.array = rng.random((150, 100)).astype(np.float32) * 1000 

69 cls.exposure.maskedImage.variance.array = rng.random((150, 100)).astype(np.float32) 

70 # mask at least one pixel 

71 cls.exposure.maskedImage.mask[5, 5] = 3 

72 # set the PhotoCalib and Wcs objects of this exposure. 

73 cls.exposure.setPhotoCalib(lsst.afw.image.PhotoCalib(1.0)) 

74 cls.exposure.setWcs(cls.skyWcs) 

75 cls.exposure.setPsf(GaussianPsf(5, 5, 2.5)) 

76 cls.exposure.setFilter(lsst.afw.image.FilterLabel(physical="fakeFilter", band="fake")) 

77 

78 cls.backgroundToPhotometricRatio = lsst.afw.image.ImageF(100, 150) 

79 cls.backgroundToPhotometricRatio.array[:, :] = 1.1 

80 

81 cls.visit = 100 

82 cls.detector = 5 

83 detectorName = f"detector {cls.detector}" 

84 detector = lsst.afw.cameraGeom.testUtils.DetectorWrapper(name=detectorName, id=cls.detector).detector 

85 cls.exposure.setDetector(detector) 

86 

87 dataId_dict = {"detector_id": cls.detector, "visit_id": 1248, "band": "i"} 

88 cls.dataId = cls.generate_data_id(**dataId_dict) 

89 simpleMapConfig = skyMap.discreteSkyMap.DiscreteSkyMapConfig() 

90 simpleMapConfig.raList = [crval.getRa().asDegrees()] 

91 simpleMapConfig.decList = [crval.getDec().asDegrees()] 

92 simpleMapConfig.radiusList = [0.1] 

93 

94 cls.simpleMap = skyMap.DiscreteSkyMap(simpleMapConfig) 

95 cls.tractId = 0 

96 cls.patchId = cls.simpleMap[0].findPatch(crval).sequential_index 

97 cls.skyInfo = makeSkyInfo(cls.simpleMap, cls.tractId, cls.patchId) 

98 

99 @classmethod 

100 def generate_data_id( 

101 cls: Type[Self], 

102 *, 

103 tract: int = 9813, 

104 patch: int = 42, 

105 band: str = "r", 

106 detector_id: int = 9, 

107 visit_id: int = 1234, 

108 detector_max: int = 109, 

109 visit_max: int = 10000, 

110 ) -> DataCoordinate: 

111 """Generate a DataCoordinate instance to use as data_id. 

112 

113 Parameters 

114 ---------- 

115 tract : `int`, optional 

116 Tract ID for the data_id 

117 patch : `int`, optional 

118 Patch ID for the data_id 

119 band : `str`, optional 

120 Band for the data_id 

121 detector_id : `int`, optional 

122 Detector ID for the data_id 

123 visit_id : `int`, optional 

124 Visit ID for the data_id 

125 detector_max : `int`, optional 

126 Maximum detector ID for the data_id 

127 visit_max : `int`, optional 

128 Maximum visit ID for the data_id 

129 

130 Returns 

131 ------- 

132 data_id : `lsst.daf.butler.DataCoordinate` 

133 An expanded data_id instance. 

134 """ 

135 universe = DimensionUniverse() 

136 

137 instrument = universe["instrument"] 

138 instrument_record = instrument.RecordClass( 

139 name="DummyCam", 

140 class_name="lsst.obs.base.instrument_tests.DummyCam", 

141 detector_max=detector_max, 

142 visit_max=visit_max, 

143 ) 

144 

145 skymap = universe["skymap"] 

146 skymap_record = skymap.RecordClass(name="test_skymap") 

147 

148 band_element = universe["band"] 

149 band_record = band_element.RecordClass(name=band) 

150 

151 visit = universe["visit"] 

152 visit_record = visit.RecordClass(id=visit_id, instrument="test") 

153 

154 detector = universe["detector"] 

155 detector_record = detector.RecordClass(id=detector_id, instrument="test") 

156 

157 physical_filter = universe["physical_filter"] 

158 physical_filter_record = physical_filter.RecordClass(name=band, instrument="test", band=band) 

159 

160 patch_element = universe["patch"] 

161 patch_record = patch_element.RecordClass( 

162 skymap="test_skymap", 

163 tract=tract, 

164 patch=patch, 

165 ) 

166 

167 if "day_obs" in universe: 

168 day_obs_element = universe["day_obs"] 

169 day_obs_record = day_obs_element.RecordClass(id=20240201, instrument="test") 

170 else: 

171 day_obs_record = None 

172 

173 # A dictionary with all the relevant records. 

174 record = { 

175 "instrument": instrument_record, 

176 "visit": visit_record, 

177 "detector": detector_record, 

178 "patch": patch_record, 

179 "tract": 9813, 

180 "band": band_record.name, 

181 "skymap": skymap_record.name, 

182 "physical_filter": physical_filter_record, 

183 } 

184 

185 if day_obs_record: 

186 record["day_obs"] = day_obs_record 

187 

188 # A dictionary with all the relevant recordIds. 

189 record_id = record.copy() 

190 for key in ("visit", "detector"): 

191 record_id[key] = record_id[key].id 

192 

193 # TODO: Catching mypy failures on Github Actions should be made easier, 

194 # perhaps in DM-36873. Igroring these for now. 

195 data_id = DataCoordinate.standardize(record_id, universe=universe) 

196 return data_id.expanded(record) 

197 

198 def test_makeWarp(self): 

199 """Test basic MakeDirectWarpTask.""" 

200 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

201 config = copy.copy(self.config) 

202 

203 makeWarp = MakeDirectWarpTask(config=config) 

204 warp_detector_inputs = { 

205 dataRef.dataId.detector.id: WarpDetectorInputs( 

206 exposure_or_handle=dataRef, 

207 data_id=dataRef.dataId, 

208 ) 

209 } 

210 result = makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

211 

212 warp = result.warp 

213 mfrac = result.masked_fraction_warp 

214 noise = result.noise_warp0 

215 

216 # Ensure we got an exposure out 

217 self.assertIsInstance(warp, lsst.afw.image.ExposureF) 

218 # Ensure that masked fraction is an ImageF object. 

219 self.assertIsInstance(mfrac, lsst.afw.image.ImageF) 

220 # Ensure that the noise image is a MaskedImageF object. 

221 self.assertIsInstance(noise, lsst.afw.image.MaskedImageF) 

222 # Check that the noise image is not accidentally the same as the image. 

223 with self.assertRaises(AssertionError): 

224 self.assertImagesAlmostEqual(noise.image, warp.image) 

225 # Ensure the warp has valid pixels 

226 self.assertGreater(np.isfinite(warp.image.array.ravel()).sum(), 0) 

227 # Ensure the warp has the correct WCS 

228 self.assertEqual(warp.getWcs(), self.skyInfo.wcs) 

229 # Ensure that mfrac has pixels between 0 and 1 

230 self.assertTrue(np.nanmax(mfrac.array) <= 1) 

231 self.assertTrue(np.nanmin(mfrac.array) >= 0) 

232 

233 def make_backgroundList(self): 

234 """Obtain a BackgroundList for the image.""" 

235 bgCtrl = afwMath.BackgroundControl(10, 10) 

236 interpStyle = afwMath.Interpolate.AKIMA_SPLINE 

237 undersampleStyle = afwMath.REDUCE_INTERP_ORDER 

238 approxStyle = afwMath.ApproximateControl.UNKNOWN 

239 approxOrderX = 0 

240 approxOrderY = 0 

241 approxWeighting = False 

242 

243 backgroundList = afwMath.BackgroundList() 

244 

245 bkgd = afwMath.makeBackground(self.exposure.image, bgCtrl) 

246 

247 backgroundList.append( 

248 (bkgd, interpStyle, undersampleStyle, approxStyle, approxOrderX, approxOrderY, approxWeighting) 

249 ) 

250 

251 return backgroundList 

252 

253 @lsst.utils.tests.methodParametersProduct( 

254 doApplyNewBackground=[True, False], doRevertOldBackground=[True, False] 

255 ) 

256 def test_backgrounds(self, doApplyNewBackground, doRevertOldBackground): 

257 """Test that applying and reverting backgrounds runs without errors, 

258 especially on noise images. 

259 """ 

260 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

261 config = copy.copy(self.config) 

262 

263 backgroundList = self.make_backgroundList() 

264 

265 warp_detector_inputs = { 

266 dataRef.dataId.detector.id: WarpDetectorInputs( 

267 exposure_or_handle=dataRef, 

268 data_id=dataRef.dataId, 

269 background_apply=backgroundList if doApplyNewBackground else None, 

270 background_revert=backgroundList if doRevertOldBackground else None, 

271 ) 

272 } 

273 

274 config.numberOfNoiseRealizations = 1 

275 config.doApplyNewBackground = doApplyNewBackground 

276 config.doRevertOldBackground = doRevertOldBackground 

277 

278 makeWarp = MakeDirectWarpTask(config=config) 

279 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

280 

281 def test_flat_background_ratio(self): 

282 """Test that using the flat background ratio works.""" 

283 backgroundList = self.make_backgroundList() 

284 

285 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

286 config = copy.copy(self.config) 

287 

288 warp_detector_inputs_basic = { 

289 dataRef.dataId.detector.id: WarpDetectorInputs( 

290 exposure_or_handle=dataRef, 

291 data_id=dataRef.dataId, 

292 ) 

293 } 

294 

295 makeWarpBasic = MakeDirectWarpTask(config=config) 

296 resultBasic = makeWarpBasic.run( 

297 warp_detector_inputs_basic, 

298 sky_info=copy.deepcopy(self.skyInfo), 

299 visit_summary=None, 

300 ) 

301 

302 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

303 backgroundRatioDataRef = InMemoryDatasetHandle( 

304 self.backgroundToPhotometricRatio.clone(), 

305 dataId=self.dataId, 

306 ) 

307 

308 warp_detector_inputs = { 

309 dataRef.dataId.detector.id: WarpDetectorInputs( 

310 exposure_or_handle=dataRef, 

311 data_id=dataRef.dataId, 

312 background_apply=backgroundList, 

313 background_revert=backgroundList, 

314 background_ratio_or_handle=backgroundRatioDataRef, 

315 ) 

316 } 

317 

318 config.numberOfNoiseRealizations = 1 

319 config.doApplyNewBackground = True 

320 config.doRevertOldBackground = True 

321 config.doApplyFlatBackgroundRatio = True 

322 

323 makeWarp = MakeDirectWarpTask(config=config) 

324 result = makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

325 

326 finite = np.isfinite(result.warp.image.array) 

327 delta = result.warp.image.array[finite] - resultBasic.warp.image.array[finite] 

328 self.assertFloatsAlmostEqual(np.median(delta), 0.0, atol=1e-6) 

329 

330 def test_background_errors(self): 

331 """Test that MakeDirectWarpTask raises errors when backgrounds are not 

332 set correctly. 

333 """ 

334 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

335 config = copy.copy(self.config) 

336 

337 backgroundList = self.make_backgroundList() 

338 config.numberOfNoiseRealizations = 1 

339 

340 warp_detector_inputs = { 

341 dataRef.dataId.detector.id: WarpDetectorInputs( 

342 exposure_or_handle=dataRef, 

343 data_id=dataRef.dataId, 

344 background_apply=backgroundList, 

345 ) 

346 } 

347 makeWarp = MakeDirectWarpTask(config=config) 

348 with self.assertRaises(RuntimeError, msg="doApplyNewBackground is False, but"): 

349 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

350 

351 warp_detector_inputs = { 

352 dataRef.dataId.detector.id: WarpDetectorInputs( 

353 exposure_or_handle=dataRef, 

354 data_id=dataRef.dataId, 

355 background_apply=None, 

356 ) 

357 } 

358 config.doApplyNewBackground = True 

359 makeWarp = MakeDirectWarpTask(config=config) 

360 with self.assertRaises(RuntimeError, msg="No background to apply"): 

361 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

362 

363 warp_detector_inputs = { 

364 dataRef.dataId.detector.id: WarpDetectorInputs( 

365 exposure_or_handle=dataRef, 

366 data_id=dataRef.dataId, 

367 background_apply=backgroundList, 

368 background_revert=backgroundList, 

369 ) 

370 } 

371 makeWarp = MakeDirectWarpTask(config=config) 

372 with self.assertRaises(RuntimeError, msg="doRevertOldBackground is False, but"): 

373 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

374 

375 warp_detector_inputs = { 

376 dataRef.dataId.detector.id: WarpDetectorInputs( 

377 exposure_or_handle=dataRef, 

378 data_id=dataRef.dataId, 

379 background_apply=backgroundList, 

380 background_revert=None, 

381 ) 

382 } 

383 config.doRevertOldBackground = True 

384 makeWarp = MakeDirectWarpTask(config=config) 

385 with self.assertRaises(RuntimeError, msg="No background to revert"): 

386 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

387 

388 def test_long_data_ids(self): 

389 """Test MakeDirectWarpTask fails gracefully with no good pixels. 

390 

391 It should return an empty exposure, with no PSF. 

392 """ 

393 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

394 config = copy.copy(self.config) 

395 

396 warp_detector_inputs = { 

397 dataRef.dataId.detector.id: WarpDetectorInputs( 

398 exposure_or_handle=dataRef, 

399 data_id=dataRef.dataId, 

400 ) 

401 } 

402 

403 config.border = 0 # Repeated calls will expand it otherwise. 

404 makeWarp_original = MakeDirectWarpTask(config=config) 

405 makeWarp_short = MakeDirectWarpTask(config=config) 

406 makeWarp_short.get_seed_from_data_id = ( 

407 lambda data_id: 2**32 - 1 + makeWarp_original.get_seed_from_data_id(data_id) 

408 ) 

409 makeWarp_long = MakeDirectWarpTask(config=config) 

410 makeWarp_long.get_seed_from_data_id = lambda data_id: 2**32 + makeWarp_original.get_seed_from_data_id( 

411 data_id 

412 ) 

413 

414 result_long = makeWarp_long.run( 

415 warp_detector_inputs, 

416 sky_info=copy.deepcopy(self.skyInfo), 

417 visit_summary=None, 

418 ) 

419 result_short = makeWarp_short.run( 

420 warp_detector_inputs, 

421 sky_info=copy.deepcopy(self.skyInfo), 

422 visit_summary=None, 

423 ) 

424 result_original = makeWarp_original.run( 

425 warp_detector_inputs, 

426 sky_info=copy.deepcopy(self.skyInfo), 

427 visit_summary=None, 

428 ) 

429 

430 self.assertMaskedImagesAlmostEqual(result_long.noise_warp0, result_original.noise_warp0, atol=6e-8) 

431 with self.assertRaises(AssertionError): 

432 self.assertMaskedImagesEqual(result_short.noise_warp0, result_original.noise_warp0) 

433 

434 

435class MakeWarpNoGoodPixelsTestCase(MakeWarpTestCase): 

436 

437 @classmethod 

438 def setUpClass(cls): 

439 super().setUpClass() 

440 cls.exposure.mask.array |= cls.exposure.mask.getPlaneBitMask("NO_DATA") 

441 

442 def test_makeWarp(self): 

443 """Test MakeDirectWarpTask fails gracefully with no good pixels. 

444 

445 It should return an empty exposure, with no PSF. 

446 """ 

447 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId) 

448 config = copy.copy(self.config) 

449 

450 makeWarp = MakeDirectWarpTask(config=config) 

451 warp_detector_inputs = { 

452 dataRef.dataId.detector.id: WarpDetectorInputs( 

453 exposure_or_handle=dataRef, 

454 data_id=dataRef.dataId, 

455 ) 

456 } 

457 result = makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None) 

458 

459 # Ensure we got None 

460 self.assertIsNone(result.warp) 

461 self.assertIsNone(result.masked_fraction_warp) 

462 self.assertIsNone(result.noise_warp0) 

463 

464 def test_compare_warps(self): 

465 """This test is not applicable when there are no good pixels.""" 

466 

467 def test_long_data_ids(self): 

468 """This test is not applicable when there are no good pixels.""" 

469 

470 def test_flat_background_ratio(self): 

471 """This test is not applicable when there are no good pixels.""" 

472 

473 

474def setup_module(module): 

475 lsst.utils.tests.init() 

476 

477 

478class MatchMemoryTestCase(lsst.utils.tests.MemoryTestCase): 

479 pass 

480 

481 

482if __name__ == "__main__": 482 ↛ 483line 482 didn't jump to line 483 because the condition on line 482 was never true

483 lsst.utils.tests.init() 

484 unittest.main()