Coverage for tests / test_make_direct_warp.py: 18%
210 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:19 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import copy
25import unittest
26from typing import Self, Type
28import numpy as np
30import lsst.afw.cameraGeom.testUtils
31import lsst.afw.image
32import lsst.afw.math as afwMath
33import lsst.skymap as skyMap
34import lsst.utils.tests
35from lsst.afw.detection import GaussianPsf
36from lsst.daf.butler import DataCoordinate, DimensionUniverse
37from lsst.drp.tasks.make_direct_warp import MakeDirectWarpConfig, MakeDirectWarpTask, WarpDetectorInputs
38from lsst.pipe.base import InMemoryDatasetHandle
39from lsst.pipe.tasks.coaddBase import makeSkyInfo
42class MakeWarpTestCase(lsst.utils.tests.TestCase):
43 @classmethod
44 def setUpClass(cls):
45 np.random.seed(12345)
47 rng = np.random.Generator(np.random.MT19937(12345))
49 cls.config = MakeDirectWarpConfig()
50 cls.config.useVisitSummaryPsf = False
51 cls.config.doSelectPreWarp = False
52 cls.config.doWarpMaskedFraction = True
53 cls.config.numberOfNoiseRealizations = 1
55 cls.exposurePhotoCalib = lsst.afw.image.PhotoCalib(1.0)
56 # An external photoCalib calibration to return
57 cls.externalPhotoCalib = lsst.afw.image.PhotoCalib(1e-6, 1e-8)
59 crpix = lsst.geom.Point2D(0, 0)
60 crval = lsst.geom.SpherePoint(0, 45, lsst.geom.degrees)
61 cdMatrix = lsst.afw.geom.makeCdMatrix(scale=1.0 * lsst.geom.arcseconds)
62 cls.skyWcs = lsst.afw.geom.makeSkyWcs(crpix, crval, cdMatrix)
63 externalCdMatrix = lsst.afw.geom.makeCdMatrix(scale=0.9 * lsst.geom.arcseconds)
64 # An external skyWcs to return
65 cls.externalSkyWcs = lsst.afw.geom.makeSkyWcs(crpix, crval, externalCdMatrix)
67 cls.exposure = lsst.afw.image.ExposureF(100, 150)
68 cls.exposure.maskedImage.image.array = rng.random((150, 100)).astype(np.float32) * 1000
69 cls.exposure.maskedImage.variance.array = rng.random((150, 100)).astype(np.float32)
70 # mask at least one pixel
71 cls.exposure.maskedImage.mask[5, 5] = 3
72 # set the PhotoCalib and Wcs objects of this exposure.
73 cls.exposure.setPhotoCalib(lsst.afw.image.PhotoCalib(1.0))
74 cls.exposure.setWcs(cls.skyWcs)
75 cls.exposure.setPsf(GaussianPsf(5, 5, 2.5))
76 cls.exposure.setFilter(lsst.afw.image.FilterLabel(physical="fakeFilter", band="fake"))
78 cls.backgroundToPhotometricRatio = lsst.afw.image.ImageF(100, 150)
79 cls.backgroundToPhotometricRatio.array[:, :] = 1.1
81 cls.visit = 100
82 cls.detector = 5
83 detectorName = f"detector {cls.detector}"
84 detector = lsst.afw.cameraGeom.testUtils.DetectorWrapper(name=detectorName, id=cls.detector).detector
85 cls.exposure.setDetector(detector)
87 dataId_dict = {"detector_id": cls.detector, "visit_id": 1248, "band": "i"}
88 cls.dataId = cls.generate_data_id(**dataId_dict)
89 simpleMapConfig = skyMap.discreteSkyMap.DiscreteSkyMapConfig()
90 simpleMapConfig.raList = [crval.getRa().asDegrees()]
91 simpleMapConfig.decList = [crval.getDec().asDegrees()]
92 simpleMapConfig.radiusList = [0.1]
94 cls.simpleMap = skyMap.DiscreteSkyMap(simpleMapConfig)
95 cls.tractId = 0
96 cls.patchId = cls.simpleMap[0].findPatch(crval).sequential_index
97 cls.skyInfo = makeSkyInfo(cls.simpleMap, cls.tractId, cls.patchId)
99 @classmethod
100 def generate_data_id(
101 cls: Type[Self],
102 *,
103 tract: int = 9813,
104 patch: int = 42,
105 band: str = "r",
106 detector_id: int = 9,
107 visit_id: int = 1234,
108 detector_max: int = 109,
109 visit_max: int = 10000,
110 ) -> DataCoordinate:
111 """Generate a DataCoordinate instance to use as data_id.
113 Parameters
114 ----------
115 tract : `int`, optional
116 Tract ID for the data_id
117 patch : `int`, optional
118 Patch ID for the data_id
119 band : `str`, optional
120 Band for the data_id
121 detector_id : `int`, optional
122 Detector ID for the data_id
123 visit_id : `int`, optional
124 Visit ID for the data_id
125 detector_max : `int`, optional
126 Maximum detector ID for the data_id
127 visit_max : `int`, optional
128 Maximum visit ID for the data_id
130 Returns
131 -------
132 data_id : `lsst.daf.butler.DataCoordinate`
133 An expanded data_id instance.
134 """
135 universe = DimensionUniverse()
137 instrument = universe["instrument"]
138 instrument_record = instrument.RecordClass(
139 name="DummyCam",
140 class_name="lsst.obs.base.instrument_tests.DummyCam",
141 detector_max=detector_max,
142 visit_max=visit_max,
143 )
145 skymap = universe["skymap"]
146 skymap_record = skymap.RecordClass(name="test_skymap")
148 band_element = universe["band"]
149 band_record = band_element.RecordClass(name=band)
151 visit = universe["visit"]
152 visit_record = visit.RecordClass(id=visit_id, instrument="test")
154 detector = universe["detector"]
155 detector_record = detector.RecordClass(id=detector_id, instrument="test")
157 physical_filter = universe["physical_filter"]
158 physical_filter_record = physical_filter.RecordClass(name=band, instrument="test", band=band)
160 patch_element = universe["patch"]
161 patch_record = patch_element.RecordClass(
162 skymap="test_skymap",
163 tract=tract,
164 patch=patch,
165 )
167 if "day_obs" in universe:
168 day_obs_element = universe["day_obs"]
169 day_obs_record = day_obs_element.RecordClass(id=20240201, instrument="test")
170 else:
171 day_obs_record = None
173 # A dictionary with all the relevant records.
174 record = {
175 "instrument": instrument_record,
176 "visit": visit_record,
177 "detector": detector_record,
178 "patch": patch_record,
179 "tract": 9813,
180 "band": band_record.name,
181 "skymap": skymap_record.name,
182 "physical_filter": physical_filter_record,
183 }
185 if day_obs_record:
186 record["day_obs"] = day_obs_record
188 # A dictionary with all the relevant recordIds.
189 record_id = record.copy()
190 for key in ("visit", "detector"):
191 record_id[key] = record_id[key].id
193 # TODO: Catching mypy failures on Github Actions should be made easier,
194 # perhaps in DM-36873. Igroring these for now.
195 data_id = DataCoordinate.standardize(record_id, universe=universe)
196 return data_id.expanded(record)
198 def test_makeWarp(self):
199 """Test basic MakeDirectWarpTask."""
200 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
201 config = copy.copy(self.config)
203 makeWarp = MakeDirectWarpTask(config=config)
204 warp_detector_inputs = {
205 dataRef.dataId.detector.id: WarpDetectorInputs(
206 exposure_or_handle=dataRef,
207 data_id=dataRef.dataId,
208 )
209 }
210 result = makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
212 warp = result.warp
213 mfrac = result.masked_fraction_warp
214 noise = result.noise_warp0
216 # Ensure we got an exposure out
217 self.assertIsInstance(warp, lsst.afw.image.ExposureF)
218 # Ensure that masked fraction is an ImageF object.
219 self.assertIsInstance(mfrac, lsst.afw.image.ImageF)
220 # Ensure that the noise image is a MaskedImageF object.
221 self.assertIsInstance(noise, lsst.afw.image.MaskedImageF)
222 # Check that the noise image is not accidentally the same as the image.
223 with self.assertRaises(AssertionError):
224 self.assertImagesAlmostEqual(noise.image, warp.image)
225 # Ensure the warp has valid pixels
226 self.assertGreater(np.isfinite(warp.image.array.ravel()).sum(), 0)
227 # Ensure the warp has the correct WCS
228 self.assertEqual(warp.getWcs(), self.skyInfo.wcs)
229 # Ensure that mfrac has pixels between 0 and 1
230 self.assertTrue(np.nanmax(mfrac.array) <= 1)
231 self.assertTrue(np.nanmin(mfrac.array) >= 0)
233 def make_backgroundList(self):
234 """Obtain a BackgroundList for the image."""
235 bgCtrl = afwMath.BackgroundControl(10, 10)
236 interpStyle = afwMath.Interpolate.AKIMA_SPLINE
237 undersampleStyle = afwMath.REDUCE_INTERP_ORDER
238 approxStyle = afwMath.ApproximateControl.UNKNOWN
239 approxOrderX = 0
240 approxOrderY = 0
241 approxWeighting = False
243 backgroundList = afwMath.BackgroundList()
245 bkgd = afwMath.makeBackground(self.exposure.image, bgCtrl)
247 backgroundList.append(
248 (bkgd, interpStyle, undersampleStyle, approxStyle, approxOrderX, approxOrderY, approxWeighting)
249 )
251 return backgroundList
253 @lsst.utils.tests.methodParametersProduct(
254 doApplyNewBackground=[True, False], doRevertOldBackground=[True, False]
255 )
256 def test_backgrounds(self, doApplyNewBackground, doRevertOldBackground):
257 """Test that applying and reverting backgrounds runs without errors,
258 especially on noise images.
259 """
260 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
261 config = copy.copy(self.config)
263 backgroundList = self.make_backgroundList()
265 warp_detector_inputs = {
266 dataRef.dataId.detector.id: WarpDetectorInputs(
267 exposure_or_handle=dataRef,
268 data_id=dataRef.dataId,
269 background_apply=backgroundList if doApplyNewBackground else None,
270 background_revert=backgroundList if doRevertOldBackground else None,
271 )
272 }
274 config.numberOfNoiseRealizations = 1
275 config.doApplyNewBackground = doApplyNewBackground
276 config.doRevertOldBackground = doRevertOldBackground
278 makeWarp = MakeDirectWarpTask(config=config)
279 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
281 def test_flat_background_ratio(self):
282 """Test that using the flat background ratio works."""
283 backgroundList = self.make_backgroundList()
285 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
286 config = copy.copy(self.config)
288 warp_detector_inputs_basic = {
289 dataRef.dataId.detector.id: WarpDetectorInputs(
290 exposure_or_handle=dataRef,
291 data_id=dataRef.dataId,
292 )
293 }
295 makeWarpBasic = MakeDirectWarpTask(config=config)
296 resultBasic = makeWarpBasic.run(
297 warp_detector_inputs_basic,
298 sky_info=copy.deepcopy(self.skyInfo),
299 visit_summary=None,
300 )
302 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
303 backgroundRatioDataRef = InMemoryDatasetHandle(
304 self.backgroundToPhotometricRatio.clone(),
305 dataId=self.dataId,
306 )
308 warp_detector_inputs = {
309 dataRef.dataId.detector.id: WarpDetectorInputs(
310 exposure_or_handle=dataRef,
311 data_id=dataRef.dataId,
312 background_apply=backgroundList,
313 background_revert=backgroundList,
314 background_ratio_or_handle=backgroundRatioDataRef,
315 )
316 }
318 config.numberOfNoiseRealizations = 1
319 config.doApplyNewBackground = True
320 config.doRevertOldBackground = True
321 config.doApplyFlatBackgroundRatio = True
323 makeWarp = MakeDirectWarpTask(config=config)
324 result = makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
326 finite = np.isfinite(result.warp.image.array)
327 delta = result.warp.image.array[finite] - resultBasic.warp.image.array[finite]
328 self.assertFloatsAlmostEqual(np.median(delta), 0.0, atol=1e-6)
330 def test_background_errors(self):
331 """Test that MakeDirectWarpTask raises errors when backgrounds are not
332 set correctly.
333 """
334 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
335 config = copy.copy(self.config)
337 backgroundList = self.make_backgroundList()
338 config.numberOfNoiseRealizations = 1
340 warp_detector_inputs = {
341 dataRef.dataId.detector.id: WarpDetectorInputs(
342 exposure_or_handle=dataRef,
343 data_id=dataRef.dataId,
344 background_apply=backgroundList,
345 )
346 }
347 makeWarp = MakeDirectWarpTask(config=config)
348 with self.assertRaises(RuntimeError, msg="doApplyNewBackground is False, but"):
349 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
351 warp_detector_inputs = {
352 dataRef.dataId.detector.id: WarpDetectorInputs(
353 exposure_or_handle=dataRef,
354 data_id=dataRef.dataId,
355 background_apply=None,
356 )
357 }
358 config.doApplyNewBackground = True
359 makeWarp = MakeDirectWarpTask(config=config)
360 with self.assertRaises(RuntimeError, msg="No background to apply"):
361 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
363 warp_detector_inputs = {
364 dataRef.dataId.detector.id: WarpDetectorInputs(
365 exposure_or_handle=dataRef,
366 data_id=dataRef.dataId,
367 background_apply=backgroundList,
368 background_revert=backgroundList,
369 )
370 }
371 makeWarp = MakeDirectWarpTask(config=config)
372 with self.assertRaises(RuntimeError, msg="doRevertOldBackground is False, but"):
373 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
375 warp_detector_inputs = {
376 dataRef.dataId.detector.id: WarpDetectorInputs(
377 exposure_or_handle=dataRef,
378 data_id=dataRef.dataId,
379 background_apply=backgroundList,
380 background_revert=None,
381 )
382 }
383 config.doRevertOldBackground = True
384 makeWarp = MakeDirectWarpTask(config=config)
385 with self.assertRaises(RuntimeError, msg="No background to revert"):
386 makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
388 def test_long_data_ids(self):
389 """Test MakeDirectWarpTask fails gracefully with no good pixels.
391 It should return an empty exposure, with no PSF.
392 """
393 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
394 config = copy.copy(self.config)
396 warp_detector_inputs = {
397 dataRef.dataId.detector.id: WarpDetectorInputs(
398 exposure_or_handle=dataRef,
399 data_id=dataRef.dataId,
400 )
401 }
403 config.border = 0 # Repeated calls will expand it otherwise.
404 makeWarp_original = MakeDirectWarpTask(config=config)
405 makeWarp_short = MakeDirectWarpTask(config=config)
406 makeWarp_short.get_seed_from_data_id = (
407 lambda data_id: 2**32 - 1 + makeWarp_original.get_seed_from_data_id(data_id)
408 )
409 makeWarp_long = MakeDirectWarpTask(config=config)
410 makeWarp_long.get_seed_from_data_id = lambda data_id: 2**32 + makeWarp_original.get_seed_from_data_id(
411 data_id
412 )
414 result_long = makeWarp_long.run(
415 warp_detector_inputs,
416 sky_info=copy.deepcopy(self.skyInfo),
417 visit_summary=None,
418 )
419 result_short = makeWarp_short.run(
420 warp_detector_inputs,
421 sky_info=copy.deepcopy(self.skyInfo),
422 visit_summary=None,
423 )
424 result_original = makeWarp_original.run(
425 warp_detector_inputs,
426 sky_info=copy.deepcopy(self.skyInfo),
427 visit_summary=None,
428 )
430 self.assertMaskedImagesAlmostEqual(result_long.noise_warp0, result_original.noise_warp0, atol=6e-8)
431 with self.assertRaises(AssertionError):
432 self.assertMaskedImagesEqual(result_short.noise_warp0, result_original.noise_warp0)
435class MakeWarpNoGoodPixelsTestCase(MakeWarpTestCase):
437 @classmethod
438 def setUpClass(cls):
439 super().setUpClass()
440 cls.exposure.mask.array |= cls.exposure.mask.getPlaneBitMask("NO_DATA")
442 def test_makeWarp(self):
443 """Test MakeDirectWarpTask fails gracefully with no good pixels.
445 It should return an empty exposure, with no PSF.
446 """
447 dataRef = InMemoryDatasetHandle(self.exposure.clone(), dataId=self.dataId)
448 config = copy.copy(self.config)
450 makeWarp = MakeDirectWarpTask(config=config)
451 warp_detector_inputs = {
452 dataRef.dataId.detector.id: WarpDetectorInputs(
453 exposure_or_handle=dataRef,
454 data_id=dataRef.dataId,
455 )
456 }
457 result = makeWarp.run(warp_detector_inputs, sky_info=copy.deepcopy(self.skyInfo), visit_summary=None)
459 # Ensure we got None
460 self.assertIsNone(result.warp)
461 self.assertIsNone(result.masked_fraction_warp)
462 self.assertIsNone(result.noise_warp0)
464 def test_compare_warps(self):
465 """This test is not applicable when there are no good pixels."""
467 def test_long_data_ids(self):
468 """This test is not applicable when there are no good pixels."""
470 def test_flat_background_ratio(self):
471 """This test is not applicable when there are no good pixels."""
474def setup_module(module):
475 lsst.utils.tests.init()
478class MatchMemoryTestCase(lsst.utils.tests.MemoryTestCase):
479 pass
482if __name__ == "__main__": 482 ↛ 483line 482 didn't jump to line 483 because the condition on line 482 was never true
483 lsst.utils.tests.init()
484 unittest.main()