Coverage for tests / test_makeSurveyPropertyMaps.py: 17%
172 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:11 +0000
1# This file is part of pipe_tasks.
2#
3# LSST Data Management System
4# This product includes software developed by the
5# LSST Project (http://www.lsst.org/).
6# See COPYRIGHT file at the top of the source tree.
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, either version 3 of the License, or
11# (at your option) any later version.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the LSST License Statement and
19# the GNU General Public License along with this program. If not,
20# see <https://www.lsstcorp.org/LegalNotices/>.
21#
22"""Test HealSparsePropertyMapTask.
23"""
24import unittest
25import numpy as np
26import healsparse as hsp
27import esutil
28import warnings
29import logging
30from astropy import units
32import lsst.utils.tests
33import lsst.daf.butler
34import lsst.afw.table as afwTable
35import lsst.afw.geom as afwGeom
36import lsst.afw.image as afwImage
37from lsst.skymap.discreteSkyMap import DiscreteSkyMap
38import lsst.geom as geom
40from lsst.pipe.base import InMemoryDatasetHandle
41from lsst.pipe.tasks.healSparseMapping import HealSparsePropertyMapTask
42from lsst.pipe.tasks.healSparseMappingProperties import (register_property_map,
43 BasePropertyMap)
45from surveyPropertyMapsTestUtils import makeMockVisitSummary
48# Test creation of an arbitrary new property map by registering it here
49# and using it in the test class.
50@register_property_map("dist_times_psfarea")
51class DistTimesPsfAreaPropertyMap(BasePropertyMap):
52 """Property map to compute the distance from the boresight center
53 by the psf area. Do not try this at home."""
54 description = "Distance times psf area"
55 unit = "deg*pixel**2"
56 requires_psf = True
58 def _compute(self, row, ra, dec, scalings, psf_array=None):
59 boresight = row.getVisitInfo().getBoresightRaDec()
60 dist = esutil.coords.sphdist(ra, dec,
61 boresight.getRa().asDegrees(), boresight.getDec().asDegrees())
62 return np.deg2rad(dist)*psf_array['psf_area']
65class HealSparsePropertyMapTaskTestCase(lsst.utils.tests.TestCase):
66 """Test of HealSparsePropertyMapTask.
68 These tests bypass the middleware used for accessing data and
69 managing Task execution.
70 """
71 def setUp(self):
72 tract = 0
73 band = 'r'
74 patch = 0
75 visits = [100, 101]
76 # Good to test crossing 0.
77 ra_center = 0.0
78 dec_center = -45.0
79 pixel_scale = 0.2
80 coadd_zp = 27.0
82 # Generate a mock skymap with one patch
83 config = DiscreteSkyMap.ConfigClass()
84 config.raList = [ra_center]
85 config.decList = [dec_center]
86 config.radiusList = [150*pixel_scale/3600.]
87 config.patchInnerDimensions = (350, 350)
88 config.patchBorder = 50
89 config.tractOverlap = 0.0
90 config.pixelScale = pixel_scale
91 sky_map = DiscreteSkyMap(config)
93 visit_summaries = [makeMockVisitSummary(visit,
94 ra_center=ra_center,
95 dec_center=dec_center)
96 for visit in visits]
97 visit_summary_refs = [InMemoryDatasetHandle(visit_summary, visit=visit)
98 for visit_summary, visit in zip(visit_summaries, visits)]
99 self.visit_summary_dict = {visit: ref.get()
100 for ref, visit in zip(visit_summary_refs, visits)}
102 # Generate an input map. Note that this does not need to be consistent
103 # with the visit_summary projections, we're just tracking values.
104 with warnings.catch_warnings():
105 # Healsparse will emit a warning if nside coverage is greater than
106 # 128. In the case of generating patch input maps, and not global
107 # maps, high nside coverage works fine, so we can suppress this
108 # warning.
109 warnings.simplefilter("ignore")
110 input_map = hsp.HealSparseMap.make_empty(nside_coverage=256,
111 nside_sparse=32768,
112 dtype=hsp.WIDE_MASK,
113 wide_mask_maxbits=len(visits)*2)
115 patch_poly = afwGeom.Polygon(geom.Box2D(sky_map[tract][patch].getOuterBBox()))
116 sph_pts = sky_map[tract].getWcs().pixelToSky(patch_poly.convexHull().getVertices())
117 patch_poly_radec = np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
118 for sph in sph_pts])
119 poly = hsp.Polygon(ra=patch_poly_radec[: -1, 0],
120 dec=patch_poly_radec[: -1, 1],
121 value=[0])
122 poly_pixels = poly.get_pixels(nside=input_map.nside_sparse)
123 # The input map has full coverage for bits 0 and 1
124 input_map.set_bits_pix(poly_pixels, [0])
125 input_map.set_bits_pix(poly_pixels, [1])
127 input_map_ref = InMemoryDatasetHandle(input_map, patch=patch, tract=tract)
128 self.input_map_dict = {patch: input_map_ref}
130 coadd = afwImage.ExposureF(sky_map[tract][patch].getOuterBBox(),
131 sky_map[tract].getWcs())
132 instFluxMag0 = 10.**(coadd_zp/2.5)
133 pc = afwImage.makePhotoCalibFromCalibZeroPoint(instFluxMag0)
134 coadd.setPhotoCalib(pc)
136 # Mock the coadd input ccd table
137 schema = afwTable.ExposureTable.makeMinimalSchema()
138 schema.addField("ccd", type="I")
139 schema.addField("visit", type="I")
140 schema.addField("weight", type="F")
141 ccds = afwTable.ExposureCatalog(schema)
142 ccds.resize(2)
143 ccds['id'] = np.arange(2)
144 ccds['visit'][0] = visits[0]
145 ccds['visit'][1] = visits[1]
146 ccds['ccd'][0] = 0
147 ccds['ccd'][1] = 1
148 ccds['weight'] = 10.0
149 for ccd_row in ccds:
150 summary = self.visit_summary_dict[ccd_row['visit']].find(ccd_row['ccd'])
151 ccd_row.setWcs(summary.getWcs())
152 ccd_row.setPsf(summary.getPsf())
153 ccd_row.setBBox(summary.getBBox())
154 ccd_row.setPhotoCalib(summary.getPhotoCalib())
156 inputs = afwImage.CoaddInputs()
157 inputs.ccds = ccds
158 coadd.getInfo().setCoaddInputs(inputs)
160 metadata = {}
161 for bit, row in enumerate(ccds):
162 metadata[f"B{bit:04d}CCD"] = row["ccd"]
163 metadata[f"B{bit:04d}VIS"] = row["visit"]
164 metadata[f"B{bit:04d}WT"] = row["weight"]
165 input_map.metadata = metadata
167 coadd_ref = InMemoryDatasetHandle(coadd, patch=patch, tract=tract, storageClass="ExposureF")
168 self.coadd_dict = {patch: coadd_ref}
170 self.tract = tract
171 self.band = band
172 self.sky_map = sky_map
173 self.input_map = input_map
175 def testPropertyMapCreation(self):
176 """Test creation of property maps."""
177 config = HealSparsePropertyMapTask.ConfigClass()
179 # Add our new test map to the set of maps
180 config.property_maps.names |= ['dist_times_psfarea']
181 config.property_maps['dist_times_psfarea'].do_min = True
182 config.property_maps['dist_times_psfarea'].do_max = True
183 config.property_maps['dist_times_psfarea'].do_mean = True
185 property_task = HealSparsePropertyMapTask(config=config)
187 property_task.run(self.sky_map,
188 self.tract,
189 self.band,
190 self.coadd_dict,
191 self.input_map_dict,
192 self.visit_summary_dict)
194 valid_pixels = self.input_map.valid_pixels
196 # Verify each map exists and has the correct pixels set.
197 for name, map_config, PropertyMapClass in config.property_maps.apply():
198 self.assertTrue(name in property_task.property_maps)
199 property_map = property_task.property_maps[name]
200 if map_config.do_min:
201 self.assertTrue(hasattr(property_map, 'min_map'))
202 np.testing.assert_array_equal(property_map.min_map.valid_pixels, valid_pixels)
203 metadata = property_map.min_map.metadata
204 self.assertIsNotNone(metadata["DESCRIPTION"])
205 self.assertEqual(metadata["OPERATION"], "minimum")
206 unit = units.Unit(metadata["UNIT"])
207 self.assertIsNotNone(unit)
208 else:
209 self.assertFalse(hasattr(property_map, 'min_map'))
210 if map_config.do_max:
211 self.assertTrue(hasattr(property_map, 'max_map'))
212 np.testing.assert_array_equal(property_map.max_map.valid_pixels, valid_pixels)
213 metadata = property_map.max_map.metadata
214 self.assertIsNotNone(metadata["DESCRIPTION"])
215 self.assertEqual(metadata["OPERATION"], "maximum")
216 unit = units.Unit(metadata["UNIT"])
217 self.assertIsNotNone(unit)
218 else:
219 self.assertFalse(hasattr(property_map, 'max_map'))
220 if map_config.do_mean:
221 self.assertTrue(hasattr(property_map, 'mean_map'))
222 np.testing.assert_array_equal(property_map.mean_map.valid_pixels, valid_pixels)
223 metadata = property_map.mean_map.metadata
224 self.assertIsNotNone(metadata["DESCRIPTION"])
225 self.assertEqual(metadata["OPERATION"], "mean")
226 unit = units.Unit(metadata["UNIT"])
227 self.assertIsNotNone(unit)
228 else:
229 self.assertFalse(hasattr(property_map, 'mean_map'))
230 if map_config.do_weighted_mean:
231 self.assertTrue(hasattr(property_map, 'weighted_mean_map'))
232 np.testing.assert_array_equal(property_map.weighted_mean_map.valid_pixels, valid_pixels)
233 metadata = property_map.weighted_mean_map.metadata
234 self.assertIsNotNone(metadata["DESCRIPTION"])
235 self.assertEqual(metadata["OPERATION"], "weighted mean")
236 unit = units.Unit(metadata["UNIT"])
237 self.assertIsNotNone(unit)
238 else:
239 self.assertFalse(hasattr(property_map, 'weighted_mean_map'))
240 if map_config.do_sum:
241 self.assertTrue(hasattr(property_map, 'sum_map'))
242 np.testing.assert_array_equal(property_map.sum_map.valid_pixels, valid_pixels)
243 metadata = property_map.sum_map.metadata
244 self.assertIsNotNone(metadata["DESCRIPTION"])
245 self.assertEqual(metadata["OPERATION"], "sum")
246 unit = units.Unit(metadata["UNIT"])
247 self.assertIsNotNone(unit)
248 else:
249 self.assertFalse(hasattr(property_map, 'sum_map'))
251 def testPropertyMapCreationEmptyInputMap(self):
252 """Test creation of maps with an empty input map (DM-37837)."""
253 # Replace the input map with an empty one.
254 with warnings.catch_warnings():
255 warnings.simplefilter("ignore")
256 self.input_map_dict[0] = InMemoryDatasetHandle(hsp.HealSparseMap.make_empty_like(
257 self.input_map_dict[0].inMemoryDataset
258 ), **self.input_map_dict[0].dataId)
260 config = HealSparsePropertyMapTask.ConfigClass()
262 property_task = HealSparsePropertyMapTask(config=config)
264 with self.assertLogs(level=logging.WARNING) as cm:
265 property_task.run(self.sky_map,
266 self.tract,
267 self.band,
268 self.coadd_dict,
269 self.input_map_dict,
270 self.visit_summary_dict)
271 self.assertIn("No valid pixels", cm.output[0])
273 # Ensure we have no valid pixels output.
274 self.assertEqual(property_task.property_maps["exposure_time"].sum_map.valid_pixels.size, 0)
277class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
278 pass
281def setup_module(module):
282 lsst.utils.tests.init()
285if __name__ == "__main__": 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 lsst.utils.tests.init()
287 unittest.main()