Coverage for tests / test_assemble_cell_coadd.py: 22%
157 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:45 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-01 08:45 +0000
1# This file is part of drp_tasks.
2#
3# LSST Data Management System
4# This product includes software developed by the
5# LSST Project (http://www.lsst.org/).
6# See COPYRIGHT file at the top of the source tree.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
21#
23from __future__ import annotations
25import unittest
26import warnings
27from typing import TYPE_CHECKING, Iterable
29import galsim
30import hpgeom as hpg
31import numpy as np
32from assemble_coadd_test_utils import MockCoaddTestData, makeMockSkyInfo
34import lsst.afw.geom as afwGeom
35import lsst.afw.image as afwImage
36import lsst.pipe.base as pipeBase
37import lsst.utils.tests
38from lsst.cell_coadds import CommonComponents
39from lsst.drp.tasks.assemble_cell_coadd import (
40 AssembleCellCoaddConfig,
41 AssembleCellCoaddTask,
42 WarpInputs,
43)
45if TYPE_CHECKING:
46 from lsst.cell_coadds import ObservationIdentifiers
48__all__ = (
49 "MockAssembleCellCoaddConfig",
50 "MockAssembleCellCoaddTask",
51)
54class MockAssembleCellCoaddConfig(AssembleCellCoaddConfig):
55 def setDefaults(self):
56 super().setDefaults()
57 self.require_artifact_mask = False
60class MockAssembleCellCoaddTask(AssembleCellCoaddTask):
61 """Lightly modified version of `AssembleCellCoaddTask` for unit tests.
63 The modifications bypass the usual middleware for loading data and setting
64 up the Task, and instead supply in-memory mock data references to the `run`
65 method so that the coaddition algorithms can be tested without a Butler.
66 """
68 ConfigClass = MockAssembleCellCoaddConfig
70 def runQuantum(
71 self,
72 mockSkyInfo,
73 warpRefList,
74 maskedFractionRefList,
75 noise0RefList,
76 visitSummaryList=None,
77 ):
78 """Modified interface for testing coaddition algorithms without a
79 Butler.
81 Parameters
82 ----------
83 mockSkyInfo : `lsst.pipe.base.Struct`
84 A simple container that supplies a bounding box and WCS in the
85 same format as the output of
86 `lsst.pipe.tasks.CoaddBaseTask.getSkyInfo`
87 warpRefList : `list` of `lsst.pipe.tasks.MockExposureReference`
88 Data references to the test exposures that will be coadded,
89 using the Gen 3 API.
91 Returns
92 -------
93 retStruct : `lsst.pipe.base.Struct`
94 The coadded exposure and associated metadata.
95 """
97 self.common = CommonComponents(
98 units=None,
99 wcs=mockSkyInfo.wcs,
100 band="i",
101 identifiers=pipeBase.Struct(skymap=None, tract=0, patch=42, band="i"),
102 )
104 inputs = {}
105 for warpInput, maskedFractionInput, noise0Input in zip(
106 warpRefList,
107 maskedFractionRefList,
108 noise0RefList,
109 ):
110 inputs[warpInput.dataId] = WarpInputs(
111 warp=warpInput,
112 masked_fraction=maskedFractionInput,
113 noise_warps=[noise0Input],
114 )
116 retStruct = self.run(
117 inputs=inputs,
118 skyInfo=mockSkyInfo,
119 visitSummaryList=visitSummaryList,
120 )
122 return retStruct
125class AssembleCellCoaddTestCase(lsst.utils.tests.TestCase):
126 """Tests of AssembleCellCoaddTask.
128 These tests bypass the middleware used for accessing data and managing Task
129 execution.
130 """
132 @classmethod
133 def setUpClass(cls) -> None:
134 np.random.seed(42)
135 rng = np.random.Generator(np.random.MT19937(42))
136 patch = 42
137 tract = 0
138 testData = MockCoaddTestData(fluxRange=1e4)
139 exposures = {}
140 matchedExposures = {}
141 masked_fraction_images = {}
142 noise0_masked_images = {}
143 for expId in range(100, 110):
144 exposures[expId], matchedExposures[expId] = testData.makeTestImage(expId)
145 masked_fraction_images[expId] = afwImage.ImageF(bbox=exposures[expId].getBBox())
146 masked_fraction_images[expId].array[:, :] = rng.random(masked_fraction_images[expId].array.shape)
147 noise0_masked_images[expId] = afwImage.MaskedImageF(bbox=exposures[expId].getBBox())
148 noise0_masked_images[expId].image.array[:, :] = rng.normal(
149 0, 1, noise0_masked_images[expId].image.array.shape
150 ) * (exposures[expId].variance.array**0.5)
152 cls.handleList = testData.makeDataRefList(exposures, patch=patch, tract=tract)
153 cls.maskedFractionRefList = testData.makeDataRefList(masked_fraction_images, patch=patch, tract=tract)
154 cls.noise0RefList = testData.makeDataRefList(noise0_masked_images, patch=patch, tract=tract)
155 cls.visitSummaryList = [
156 testData.makeVisitSummaryTableHandle(warpHandle) for warpHandle in cls.handleList
157 ]
158 cls.skyInfo = makeMockSkyInfo(testData.bbox, testData.wcs, patch=patch)
160 def runTask(
161 self,
162 config=None,
163 warpRefList=None,
164 maskedFractionRefList=None,
165 noise0RefList=None,
166 visitSummaryList=None,
167 ) -> None:
168 if config is None:
169 config = MockAssembleCellCoaddConfig()
170 config.do_input_map = True
171 assembleTask = MockAssembleCellCoaddTask(config=config)
172 if warpRefList is None:
173 warpRefList = self.handleList
174 if maskedFractionRefList is None:
175 maskedFractionRefList = self.maskedFractionRefList
176 if noise0RefList is None:
177 noise0RefList = self.noise0RefList
178 if visitSummaryList is None:
179 visitSummaryList = self.visitSummaryList
181 self.result = assembleTask.runQuantum(
182 self.skyInfo,
183 warpRefList=warpRefList,
184 maskedFractionRefList=maskedFractionRefList,
185 noise0RefList=noise0RefList,
186 visitSummaryList=visitSummaryList,
187 )
189 def checkSortOrder(self, inputs: Iterable[ObservationIdentifiers]) -> None:
190 """Check that the inputs are sorted.
192 The inputs must be sorted first by visit, and within the same visit,
193 by detector.
195 Parameters
196 ----------
197 inputs : `Iterable` [`ObservationIdentifiers`]
198 The inputs to be checked.
199 """
200 visit, detector = -np.inf, -np.inf # Previous visit, detector IDs.
201 for _, obsId in enumerate(inputs):
202 with self.subTest(input_number=repr(obsId)):
203 self.assertGreaterEqual(obsId.visit, visit)
204 if visit == obsId.visit:
205 with self.subTest(detector_number=repr(obsId.detector)):
206 self.assertGreaterEqual(obsId.detector, detector)
208 visit, detector = obsId.visit, obsId.detector
210 def checkRun(self, assembleTask):
211 """Check that the task runs successfully."""
212 result = assembleTask.runQuantum(self.skyInfo, self.handleList)
214 # Check that we produced an exposure.
215 self.assertTrue(result.multipleCellCoadd is not None)
216 # Check that the visit_count method returns a number less than or equal
217 # to the total number of input exposures available.
218 max_visit_count = len(self.handleList)
219 for cellId, singleCellCoadd in result.multipleCellCoadd.cells.items():
220 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)):
221 self.assertLessEqual(singleCellCoadd.visit_count, max_visit_count)
222 # Check that the aperture correction maps are not None.
223 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)):
224 self.assertTrue(singleCellCoadd.aperture_correction_map is not None)
225 # Check that the inputs are sorted.
226 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)):
227 self.checkSortOrder(singleCellCoadd.inputs)
229 def test_assemble_basic(self):
230 """Test that AssembleCellCoaddTask runs successfully without errors.
232 This test does not check the correctness of the coaddition algorithms.
233 This is intended to prevent the code from bit rotting.
234 """
235 self.runTask()
236 # Check that we produced an exposure.
237 self.assertTrue(self.result.multipleCellCoadd is not None)
238 self.assertTrue(self.result.inputMap is not None)
240 # Check the input map.
241 inputMap = self.result.inputMap
243 warp_input_list = [handle.get() for handle in self.handleList]
244 visit_detectors = []
245 for warp_input in warp_input_list:
246 for row in warp_input.getInfo().getCoaddInputs().ccds:
247 visit_detectors.append((int(row["visit"]), int(row["ccd"])))
249 for bit, (visit, detector) in enumerate(visit_detectors):
250 self.assertEqual(inputMap.metadata[f"B{bit:04d}VIS"], visit)
251 self.assertEqual(inputMap.metadata[f"B{bit:04d}CCD"], detector)
252 self.assertGreater(inputMap.metadata[f"B{bit:04d}WT"], 0.0)
254 coadd_poly = afwGeom.Polygon(lsst.geom.Box2D(self.result.multipleCellCoadd.outer_bbox))
255 sph_pts = self.result.multipleCellCoadd.wcs.pixelToSky(coadd_poly)
256 radec = np.asarray([(sph.getRa().asDegrees(), sph.getDec().asDegrees()) for sph in sph_pts])
257 pixels = hpg.query_polygon(inputMap.nside_sparse, radec[:-1, 0], radec[:-1, 1])
258 np.testing.assert_array_equal(pixels, inputMap.valid_pixels)
260 def test_assemble_empty(self):
261 """Test that AssembleCellCoaddTask runs successfully without errors
262 when no input exposures are provided."""
263 self.result = None # so tearDown has something.
264 with self.assertRaises(pipeBase.NoWorkFound, msg="No cells could be populated for the cell coadd."):
265 self.runTask(warpRefList=[], maskedFractionRefList=[], noise0RefList=[], visitSummaryList=[])
267 def test_assemble_without_visitSummary(self):
268 """Test that AssembleCellCoaddTask calculates detector weights and
269 runs successfully without errors when no visit summaries are provided.
270 """
271 self.runTask(visitSummaryList=[])
272 # Check that we produced an exposure.
273 self.assertTrue(self.result.multipleCellCoadd is not None)
274 self.assertTrue(self.result.inputMap is not None)
276 # TODO: Remove this test in DM-49401
277 @lsst.utils.tests.methodParameters(do_scale_zero_point=[False, True])
278 def test_do_scale_zero_point(self, do_scale_zero_point):
279 config = MockAssembleCellCoaddConfig()
280 with warnings.catch_warnings():
281 warnings.filterwarnings("ignore", category=FutureWarning)
282 config.do_scale_zero_point = do_scale_zero_point
283 self.runTask(config)
284 # Check that we produced an exposure.
285 self.assertTrue(self.result.multipleCellCoadd is not None)
286 self.assertTrue(self.result.inputMap is not None)
288 @lsst.utils.tests.methodParameters(do_calculate_weight_from_warp=[False, True])
289 def test_do_calculate_weight_from_warp(self, do_calculate_weight_from_warp):
290 config = MockAssembleCellCoaddConfig()
291 config.do_calculate_weight_from_warp = do_calculate_weight_from_warp
292 self.runTask(config)
293 # Check that we produced an exposure.
294 self.assertTrue(self.result.multipleCellCoadd is not None)
295 self.assertTrue(self.result.inputMap is not None)
297 def test_visit_count(self):
298 """Check that the visit_count method returns a number less than or
299 equal to the total number of input exposures available.
300 """
301 self.runTask()
302 max_visit_count = len(self.handleList)
303 for cellId, singleCellCoadd in self.result.multipleCellCoadd.cells.items():
304 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)):
305 self.assertLessEqual(singleCellCoadd.visit_count, max_visit_count)
307 def test_inputs_sorted(self):
308 """Check that the inputs are sorted.
310 The ordering is that inputs are sorted first by visit, and within the
311 same visit, they are ordered by detector.
312 """
313 self.runTask()
314 for _, singleCellCoadd in self.result.multipleCellCoadd.cells.items():
315 self.checkSortOrder(singleCellCoadd.inputs)
317 def test_psf_normalization(self):
318 """Check that the sum of PSF images is close to 1."""
319 self.runTask()
320 for cellId, singleCellCoadd in self.result.multipleCellCoadd.cells.items():
321 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)):
322 self.assertFloatsAlmostEqual(singleCellCoadd.psf_image.array.sum(), 1.0, rtol=None, atol=1e-7)
324 def test_psf_centering(self):
325 """Check that the PSF images are centered."""
326 self.runTask()
327 for cellId, singleCellCoadd in self.result.multipleCellCoadd.cells.items():
328 with self.subTest(x=repr(cellId.x), y=repr(cellId.y)):
329 shape = galsim.hsm.FindAdaptiveMom(galsim.Image(singleCellCoadd.psf_image.array))
330 self.assertFloatsAlmostEqual(
331 shape.moments_centroid.x, round(shape.moments_centroid.x), rtol=None, atol=0.01
332 )
333 self.assertFloatsAlmostEqual(
334 shape.moments_centroid.y, round(shape.moments_centroid.y), rtol=None, atol=0.01
335 )
338class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
339 pass
342def setup_module(module):
343 lsst.utils.tests.init()
346if __name__ == "__main__": 346 ↛ 347line 346 didn't jump to line 347 because the condition on line 346 was never true
347 lsst.utils.tests.init()
348 unittest.main()