Coverage for python / lsst / pipe / tasks / healSparseMapping.py: 16%
445 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:11 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-30 09:11 +0000
1# This file is part of pipe_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig",
23 "HealSparseMapFormatter", "HealSparsePropertyMapConnections",
24 "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask",
25 "ConsolidateHealSparsePropertyMapConnections",
26 "ConsolidateHealSparsePropertyMapConfig",
27 "ConsolidateHealSparsePropertyMapTask"]
from collections import defaultdict
import numbers
import warnings

import esutil
import healsparse as hsp
import hpgeom as hpg
import numpy as np

import lsst.afw.geom as afwGeom
import lsst.geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.sphgeom
from lsst.daf.butler import Formatter
from lsst.skymap import BaseSkyMap
from lsst.utils.timer import timeMethod

from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig,
                                          PropertyMapMap, compute_approx_psf_size_and_shape)
class HealSparseMapFormatter(Formatter):
    """Interface for reading and writing healsparse.HealSparseMap files."""
    unsupportedParameters = frozenset()
    supportedExtensions = frozenset({".hsp", ".fit", ".fits"})
    extension = '.hsp'

    def read(self, component=None):
        # Docstring inherited from Formatter.read.
        path = self.fileDescriptor.location.path

        if component == 'coverage':
            # The coverage component only reads the (small) coverage map,
            # not the sparse pixel data.
            try:
                data = hsp.HealSparseCoverage.read(path)
            except (OSError, RuntimeError) as e:
                # Chain the original exception so the underlying I/O or
                # format error is not lost (it was previously discarded).
                raise ValueError(
                    f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}"
                ) from e

            return data

        # Optional read parameters: a subset of coverage pixels to load,
        # and/or an nside to degrade the map to on read.
        if self.fileDescriptor.parameters is None:
            pixels = None
            degrade_nside = None
        else:
            pixels = self.fileDescriptor.parameters.get('pixels', None)
            degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None)
        try:
            data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
        except (OSError, RuntimeError) as e:
            raise ValueError(
                f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}"
            ) from e

        return data

    def write(self, inMemoryDataset):
        # Docstring inherited from Formatter.write.
        # Update the location with the formatter-preferred file extension
        self.fileDescriptor.location.updateExtension(self.extension)
        inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True)
86def _is_power_of_two(value):
87 """Check that value is a power of two.
89 Parameters
90 ----------
91 value : `int`
92 Value to check.
94 Returns
95 -------
96 is_power_of_two : `bool`
97 True if value is a power of two; False otherwise, or
98 if value is not an integer.
99 """
100 if not isinstance(value, numbers.Integral):
101 return False
103 # See https://stackoverflow.com/questions/57025836
104 # Every power of 2 has exactly 1 bit set to 1; subtracting
105 # 1 therefore flips every preceding bit. If you and that
106 # together with the original value it must be 0.
107 return (value & (value - 1) == 0) and value != 0
class HealSparseInputMapConfig(pexConfig.Config):
    """Configuration parameters for HealSparseInputMapTask"""
    # Resolution (healpix nside) of the sparse input map itself.
    nside = pexConfig.Field(
        dtype=int,
        default=32768,
        check=_is_power_of_two,
        doc="Mapping healpix nside. Must be power of 2.",
    )
    # Resolution of the coarse coverage map used by healsparse.
    nside_coverage = pexConfig.Field(
        dtype=int,
        default=256,
        check=_is_power_of_two,
        doc="HealSparse coverage map nside. Must be power of 2.",
    )
    # Fractional-area threshold above which a map pixel is dropped.
    bad_mask_min_coverage = pexConfig.Field(
        dtype=float,
        default=0.5,
        doc=("Minimum area fraction of a map healpixel pixel that must be "
             "covered by bad pixels to be removed from the input map. "
             "This is approximate."),
    )
class HealSparseInputMapTask(pipeBase.Task):
    """Task for making a HealSparse input map.

    Two kinds of maps can be built: a per-patch "ccd input" wide-mask map
    (one bit per input ccd; see build_ccd_input_map) and a "cell input"
    wide-mask map (one bit per visit/detector; see
    initialize_cell_input_map and related methods).
    """

    ConfigClass = HealSparseInputMapConfig
    _DefaultName = "healSparseInputMap"

    def __init__(self, **kwargs):
        pipeBase.Task.__init__(self, **kwargs)

        # Maps stay None until the corresponding build/initialize
        # method is called; several methods use this to detect misuse.
        self.ccd_input_map = None
        self.cell_input_map = None

    def build_ccd_input_map(self, bbox, wcs, ccds):
        """Build a map from ccd valid polygons or bounding boxes.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box for region to build input map.
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object for region to build input map.
        ccds : `lsst.afw.table.ExposureCatalog`
            Exposure catalog with ccd data from coadd inputs.
        """
        with warnings.catch_warnings():
            # Healsparse will emit a warning if nside coverage is greater than
            # 128.  In the case of generating patch input maps, and not global
            # maps, high nside coverage works fine, so we can suppress this
            # warning.
            warnings.simplefilter("ignore")
            self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
                                                              nside_sparse=self.config.nside,
                                                              dtype=hsp.WIDE_MASK,
                                                              wide_mask_maxbits=len(ccds))
        self._wcs = wcs
        self._bbox = bbox
        self._ccds = ccds

        # Convert the configured minimum bad-area fraction into a count of
        # bad warp pixels per healpix pixel (approximate; uses the pixel
        # scale at the bbox center).
        pixel_scale = wcs.getPixelScale(bbox.getCenter()).asArcseconds()
        hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=True)*(3600.**2.)
        self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)

        # Record, in the map metadata, which ccd/visit/weight each wide-mask
        # bit corresponds to, and build lookup tables for later masking.
        metadata = {}
        self._bits_per_visit_ccd = {}
        self._bits_per_visit = defaultdict(list)
        for bit, ccd_row in enumerate(ccds):
            metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"]
            metadata[f"B{bit:04d}VIS"] = ccd_row["visit"]
            metadata[f"B{bit:04d}WT"] = ccd_row["weight"]

            self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit
            self._bits_per_visit[ccd_row["visit"]].append(bit)

            ccd_poly = ccd_row.getValidPolygon()
            if ccd_poly is None:
                # Fall back to the detector bounding box when no valid
                # polygon was recorded.
                ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
            # Detectors need to be rendered with their own wcs.
            ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())

            # Create a ccd healsparse polygon.  The final vertex duplicates
            # the first, so it is dropped with the [:-1] slices.
            poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
                               dec=ccd_poly_radec[: -1, 1],
                               value=[bit])
            self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse),
                                            [bit])

        # Cut down to the overall bounding box with associated wcs.
        bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox))
        bbox_poly_radec = self._pixels_to_radec(self._wcs,
                                                bbox_afw_poly.convexHull().getVertices())
        bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
                                value=np.arange(self.ccd_input_map.wide_mask_maxbits))
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
        self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
        self.ccd_input_map.metadata = metadata

        # Create a temporary map to hold the count of bad pixels in each
        # healpix pixel, one int64 column per visit.
        dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()]

        with warnings.catch_warnings():
            # Healsparse will emit a warning if nside coverage is greater than
            # 128.  In the case of generating patch input maps, and not global
            # maps, high nside coverage works fine, so we can suppress this
            # warning.
            warnings.simplefilter("ignore")
            self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty(
                nside_coverage=self.config.nside_coverage,
                nside_sparse=self.config.nside,
                dtype=dtype,
                primary=dtype[0][0])

        self._ccd_input_pixels = self.ccd_input_map.valid_pixels

        # Don't set input bad map if there are no ccds which overlap the bbox.
        if len(self._ccd_input_pixels) > 0:
            # Ensure these are sorted; mask_warp_bbox relies on this
            # (it matches with presorted=True).
            self._ccd_input_pixels = np.sort(self._ccd_input_pixels)

            # Zero-initialize all counter columns at the valid pixels.
            self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype)

    def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value):
        """Mask a subregion from a visit.

        This must be run after build_ccd_input_map initializes
        the overall map.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box from region to mask.
        visit : `int`
            Visit number corresponding to warp with mask.
        mask : `lsst.afw.image.MaskX`
            Mask plane from warp exposure.
        bit_mask_value : `int`
            Bit mask to check for bad pixels.

        Raises
        ------
        RuntimeError : Raised if build_ccd_input_map was not run first.
        """
        if self.ccd_input_map is None:
            raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox")

        if len(self._ccd_input_pixels) == 0:
            # This tract has no coverage, so there is nothing to do.
            return

        # Find the bad pixels and convert to healpix.
        bad_pixels = np.where(mask.array & bit_mask_value)
        if len(bad_pixels[0]) == 0:
            # No bad pixels
            return

        # Bad pixels come from warps which use the overall wcs.
        # bad_pixels is (row, col) = (y, x); offset by the bbox origin to
        # get overall-frame pixel coordinates.
        bad_ra, bad_dec = self._wcs.pixelToSkyArray(
            bad_pixels[1].astype(np.float64) + bbox.getMinX(),
            bad_pixels[0].astype(np.float64) + bbox.getMinY(),
            degrees=True,
        )
        bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec)

        # Check if any of these "bad" pixels are in the valid footprint.
        match_input, match_bad = esutil.numpy_util.match(self._ccd_input_pixels, bad_hpix, presorted=True)
        if len(match_bad) == 0:
            return

        bad_hpix = bad_hpix[match_bad]

        # Create a view of the column we need to add to.
        count_map_visit = self._ccd_input_bad_count_map[f"v{visit}"]
        # Add the bad pixels to the accumulator.  Note that the view
        # cannot append pixels, but the match above ensures we are
        # only adding to pixels that are already in the coverage
        # map and initialized.
        count_map_visit.update_values_pix(bad_hpix, 1, operation="add")

    def finalize_ccd_input_map_mask(self):
        """Use accumulated mask information to finalize the masking of
        ccd_input_map.

        Raises
        ------
        RuntimeError : Raised if build_ccd_input_map was not run first.
        """
        if self.ccd_input_map is None:
            raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.")

        # For each visit, clear that visit's bits wherever the accumulated
        # bad-pixel count exceeds the threshold computed in
        # build_ccd_input_map.
        count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels]
        for visit in self._bits_per_visit:
            to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad)
            if to_mask.size == 0:
                continue
            self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask],
                                              self._bits_per_visit[visit])

        # Clear memory
        self._ccd_input_bad_count_map = None

    def initialize_cell_input_map(self, bbox, wcs, visit_detectors):
        """Initialize the cell input map.

        Parameters
        ----------
        bbox : `lsst.geom.Box2I`
            Bounding box for region to build input map.
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object for region to build input map.
        visit_detectors : `list` [`tuple`]
            List of visit/detector tuples.
        """
        with warnings.catch_warnings():
            # Healsparse will emit a warning if nside coverage is greater than
            # 128.  In the case of generating patch input maps, and not global
            # maps, high nside coverage works fine, so we can suppress this
            # warning.
            warnings.simplefilter("ignore")
            self.cell_input_map = hsp.HealSparseMap.make_empty(
                nside_coverage=self.config.nside_coverage,
                nside_sparse=self.config.nside,
                dtype=hsp.WIDE_MASK,
                wide_mask_maxbits=len(visit_detectors),
            )

        self._wcs = wcs
        self._bbox = bbox
        self._visit_detectors = visit_detectors

        # Metadata records which visit/detector each bit corresponds to;
        # the weight is filled in later by add_warp_to_cell_input_map.
        metadata = {}
        self._bits_per_visit_detector = {}
        self._bits_per_visit = defaultdict(list)
        for bit, (visit, detector) in enumerate(visit_detectors):
            metadata[f"B{bit:04d}CCD"] = detector
            metadata[f"B{bit:04d}VIS"] = visit
            # Weight will be filled later.
            metadata[f"B{bit:04d}WT"] = 0.0

            self._bits_per_visit_detector[(visit, detector)] = bit
            self._bits_per_visit[visit].append(bit)

        self.cell_input_map.metadata = metadata

        # Per-cell pixel lists (filled by build_cell_input_map) and caches
        # used by add_warp_to_cell_input_map to avoid re-rendering a
        # detector polygon for every cell it overlaps.
        self._cell_pixels = {}
        self._visit_detector_cache = None
        self._detector_map_cache = None

    def build_cell_input_map(self, cell):
        """Add a cell to the input map.

        Parameters
        ----------
        cell : `lsst.skymap.cellInfo.CellInfo`
            Cell to initialize.

        Raises
        ------
        RuntimeError : Raised if initialize_cell_input_map was not run first.
        """
        if self.cell_input_map is None:
            raise RuntimeError("Must run initialize_cell_input_map() before build_cell_input_map()")

        # The input map only needs the *inner* sky polygon.
        cell_poly = cell.getInnerSkyPolygon()
        vertices = np.asarray([[v.x(), v.y(), v.z()] for v in cell_poly.getVertices()])
        pixels = hpg.query_polygon_vec(self.config.nside, vertices)

        self._cell_pixels[cell.sequential_index] = pixels

    def add_warp_to_cell_input_map(self, ccd_row, weight, cell):
        """Add a warp to the input map for a given cell.

        Parameters
        ----------
        ccd_row : `lsst.afw.table.ExposureRecord`
            Row from the ccd table.
        weight : `float`
            Weight to use for this detector.
        cell : `lsst.skymap.cellInfo.CellInfo`
            Cell that overlaps the ccd_table_row.

        Raises
        ------
        RuntimeError : Raised if the visit/detector or cell was not
            registered by initialize_cell_input_map/build_cell_input_map.
        """
        visit = int(ccd_row["visit"])
        detector = int(ccd_row["ccd"])

        if (bit := self._bits_per_visit_detector.get((visit, detector), None)) is None:
            raise RuntimeError(f"Visit {visit} / detector {detector} not expected in map.")

        if (cell_pixels := self._cell_pixels.get(cell.sequential_index, None)) is None:
            raise RuntimeError(f"Cell {cell.sequential_index} not expected in map.")

        # Only re-render the detector polygon when the visit/detector
        # changes; consecutive calls for cells of the same warp reuse the
        # cached boolean map.
        if (visit, detector) != self._visit_detector_cache:
            self._visit_detector_cache = (visit, detector)

            ccd_poly = ccd_row.validPolygon
            if ccd_poly is None:
                ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox()))
            # Detectors need to be rendered with their own wcs.
            ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())

            poly = hsp.Polygon(
                ra=ccd_poly_radec[: -1, 0],
                dec=ccd_poly_radec[: -1, 1],
                value=True,
            )
            with warnings.catch_warnings():
                # Healsparse will emit a warning if nside coverage is greater
                # than 128.  In the case of generating patch input maps, and
                # not global maps, high nside coverage works fine, so we can
                # suppress this warning.
                warnings.simplefilter("ignore")

                self._detector_map_cache = poly.get_map(
                    nside_coverage=self.config.nside_coverage,
                    nside_sparse=self.config.nside,
                    dtype=np.bool_,
                )

        self.cell_input_map.metadata[f"B{bit:04d}WT"] = weight

        # Set the bit only for the cell pixels that fall inside the
        # detector polygon.
        # NOTE(review): other call sites pass a list ``[bit]`` to
        # set_bits_pix; confirm a bare int is accepted here.
        overlap = self._detector_map_cache[cell_pixels]
        self.cell_input_map.set_bits_pix(cell_pixels[overlap], bit)

    def _pixels_to_radec(self, wcs, pixels):
        """Convert pixels to ra/dec positions using a wcs.

        Parameters
        ----------
        wcs : `lsst.afw.geom.SkyWcs`
            WCS object.
        pixels : `list` [`lsst.geom.Point2D`]
            List of pixels to convert.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions (degrees) associated with pixels.
        """
        sph_pts = wcs.pixelToSky(pixels)
        return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
                         for sph in sph_pts])
class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                       dimensions=("tract", "band", "skymap",),
                                       defaultTemplates={"coaddName": "deep",
                                                         "calexpType": ""}):
    """Connections for HealSparsePropertyMapTask.

    One output connection is created per registered property map and per
    statistic (min/max/mean/weighted_mean/sum); unused ones are removed
    in ``__init__`` based on the task configuration.
    """
    input_maps = pipeBase.connectionTypes.Input(
        doc="Healsparse bit-wise coadd input maps",
        name="{coaddName}Coadd_inputMap",
        storageClass="HealSparseMap",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    coadd_exposures = pipeBase.connectionTypes.Input(
        doc="Coadded exposures associated with input_maps",
        name="{coaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "skymap", "band"),
        multiple=True,
        deferLoad=True,
    )
    visit_summaries = pipeBase.connectionTypes.Input(
        doc="Visit summary tables with aggregated statistics",
        name="finalVisitSummary",
        storageClass="ExposureCatalog",
        dimensions=("instrument", "visit"),
        multiple=True,
        deferLoad=True,
    )
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry.  The vars() trick used here allows us to set class attributes
    # programmatically.  Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                # Unconfigured map: use a default config with every
                # statistic disabled so all of its outputs are removed.
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.outputs.remove(f"{name}_map_min")
            if not prop_config.do_max:
                self.outputs.remove(f"{name}_map_max")
            if not prop_config.do_mean:
                self.outputs.remove(f"{name}_map_mean")
            if not prop_config.do_weighted_mean:
                self.outputs.remove(f"{name}_map_weighted_mean")
            if not prop_config.do_sum:
                self.outputs.remove(f"{name}_map_sum")
class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                  pipelineConnections=HealSparsePropertyMapConnections):
    """Configuration parameters for HealSparsePropertyMapTask"""
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=["exposure_time",
                 "psf_size",
                 "psf_e1",
                 "psf_e2",
                 "psf_maglim",
                 "sky_noise",
                 "sky_background",
                 "dcr_dra",
                 "dcr_ddec",
                 "dcr_e1",
                 "dcr_e2",
                 "epoch"],
        doc="Property map computation objects",
    )

    def setDefaults(self):
        # Enable the default statistic for each configured map type.
        self.property_maps["exposure_time"].do_sum = True
        # All of these default maps are weighted means.
        for weighted_name in ("psf_size", "psf_e1", "psf_e2", "psf_maglim",
                              "sky_noise", "sky_background", "dcr_dra",
                              "dcr_ddec", "dcr_e1", "dcr_e2"):
            self.property_maps[weighted_name].do_weighted_mean = True
        # The epoch map gets the mean plus the min/max range.
        epoch_config = self.property_maps["epoch"]
        epoch_config.do_mean = True
        epoch_config.do_min = True
        epoch_config.do_max = True
class HealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to compute Healsparse property maps.

    This task will compute individual property maps (per tract, per
    map type, per band).  These maps cover the full coadd tract, and
    are not truncated to the inner tract region.
    """
    ConfigClass = HealSparsePropertyMapConfig
    _DefaultName = "healSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Instantiate one property-map object per configured map type.
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        tract = butlerQC.quantum.dataId["tract"]
        band = butlerQC.quantum.dataId["band"]

        # Index the (deferred) patch-level inputs by patch number.
        input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]}
        coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]}

        # Visit summaries are small; load them all up front, keyed by visit.
        visit_summary_dict = {ref.dataId["visit"]: ref.get()
                              for ref in inputs["visit_summaries"]}

        self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)

        # Write the outputs
        for name, property_map in self.property_maps.items():
            if property_map.config.do_min:
                butlerQC.put(property_map.min_map,
                             getattr(outputRefs, f"{name}_map_min"))
            if property_map.config.do_max:
                butlerQC.put(property_map.max_map,
                             getattr(outputRefs, f"{name}_map_max"))
            if property_map.config.do_mean:
                butlerQC.put(property_map.mean_map,
                             getattr(outputRefs, f"{name}_map_mean"))
            if property_map.config.do_weighted_mean:
                butlerQC.put(property_map.weighted_mean_map,
                             getattr(outputRefs, f"{name}_map_weighted_mean"))
            if property_map.config.do_sum:
                butlerQC.put(property_map.sum_map,
                             getattr(outputRefs, f"{name}_map_sum"))

    def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
        """Run the healsparse property task.

        Parameters
        ----------
        sky_map : Sky map object
        tract : `int`
            Tract number.
        band : `str`
            Band name for logging.
        coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of coadd exposure references.  Keys are patch numbers.
        input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of input map references.  Keys are patch numbers.
        visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
            Dictionary of visit summary tables.  Keys are visit numbers.

        Raises
        ------
        RepeatableQuantumError
            If visit_summary_dict is missing any visits or detectors found in an
            input map.  This leads to an inconsistency between what is in the coadd
            (via the input map) and the visit summary tables which contain data
            to compute the maps.
        """
        tract_info = sky_map[tract]

        tract_maps_initialized = False

        for patch in input_map_dict.keys():
            self.log.info("Making maps for band %s, tract %d, patch %d.",
                          band, tract, patch)

            patch_info = tract_info[patch]

            input_map = input_map_dict[patch].get()

            # Extract input map metadata: which visit/detector/weight each
            # wide-mask bit corresponds to.
            input_bit_weight_dict = {}
            for bit in range(input_map.wide_mask_maxbits):
                # Not all bits may be listed because maxbits must be a multiple
                # of 8.
                if f"B{bit:04d}CCD" in input_map.metadata:
                    visit = input_map.metadata[f"B{bit:04d}VIS"]
                    detector = input_map.metadata[f"B{bit:04d}CCD"]
                    weight = input_map.metadata[f"B{bit:04d}WT"]
                    input_bit_weight_dict[(visit, detector)] = (bit, weight)

            # Initialize the tract maps as soon as we have the first input
            # map for getting nside information.
            if not tract_maps_initialized:
                # We use the first input map nside information to initialize
                # the tract maps
                nside_coverage = self._compute_nside_coverage_tract(tract_info)
                nside = input_map.nside_sparse

                do_compute_approx_psf = False
                # Initialize the tract maps
                for property_map in self.property_maps:
                    property_map.initialize_tract_maps(nside_coverage, nside)
                    if property_map.requires_psf:
                        do_compute_approx_psf = True

                tract_maps_initialized = True

            if input_map.valid_pixels.size == 0:
                self.log.warning("No valid pixels for band %s, tract %d, patch %d; skipping.",
                                 band, tract, patch)
                continue

            coadd_photo_calib = coadd_dict[patch].get(component="photoCalib")
            coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())

            # Crop input_map to the inner polygon of the patch
            poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
            patch_radec = self._vertices_to_radec(poly_vertices)
            patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
                                     value=np.arange(input_map.wide_mask_maxbits))
            with warnings.catch_warnings():
                # Healsparse will emit a warning if nside coverage is greater than
                # 128.  In the case of generating patch input maps, and not global
                # maps, high nside coverage works fine, so we can suppress this
                # warning.
                warnings.simplefilter("ignore")
                patch_poly_map = patch_poly.get_map_like(input_map)
                input_map = hsp.and_intersection([input_map, patch_poly_map])

            valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True)

            # Check if there are no valid pixels for the inner (unique) patch region
            if valid_pixels.size == 0:
                continue

            # Initialize the value accumulators
            for property_map in self.property_maps:
                property_map.initialize_values(valid_pixels.size)
                property_map.zeropoint = coadd_zeropoint

            # Initialize the weight and counter accumulators
            total_weights = np.zeros(valid_pixels.size)
            total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)

            for (visit, detector), (bit, weight) in input_bit_weight_dict.items():
                # Which pixels in the map are used by this visit/detector
                inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))

                # Check if there are any valid pixels in the map from this detector.
                if inmap.size == 0:
                    continue

                # Retrieve the correct visitSummary row
                if visit not in visit_summary_dict:
                    msg = f"Visit {visit} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)
                row = visit_summary_dict[visit].find(detector)
                if row is None:
                    msg = f"Visit {visit} / detector {detector} not found in visit_summaries."
                    raise pipeBase.RepeatableQuantumError(msg)

                # Map the valid sky positions back into this detector's
                # pixel frame to evaluate per-position calibrations.
                x, y = row.wcs.skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True)
                scalings = self._compute_calib_scale(row, x, y)

                if do_compute_approx_psf:
                    psf_array = compute_approx_psf_size_and_shape(row, vpix_ra[inmap], vpix_dec[inmap])
                else:
                    psf_array = None

                total_weights[inmap] += weight
                total_inputs[inmap] += 1

                # Accumulate the values
                for property_map in self.property_maps:
                    property_map.accumulate_values(inmap,
                                                   vpix_ra[inmap],
                                                   vpix_dec[inmap],
                                                   weight,
                                                   scalings,
                                                   row,
                                                   psf_array=psf_array)

            # Finalize the mean values and set the tract maps
            for property_map in self.property_maps:
                property_map.finalize_mean_values(total_weights, total_inputs)
                property_map.set_map_values(valid_pixels)

    def _compute_calib_scale(self, ccd_row, x, y):
        """Compute calibration scaling values.

        Parameters
        ----------
        ccd_row : `lsst.afw.table.ExposureRecord`
            Exposure metadata for a given detector exposure.
        x : `np.ndarray`
            Array of x positions.
        y : `np.ndarray`
            Array of y positions.

        Returns
        -------
        calib_scale : `np.ndarray` or `float`
            Array of calibration scale values (or a scalar when the
            calibration is spatially constant).
        """
        photo_calib = ccd_row.getPhotoCalib()
        bf = photo_calib.computeScaledCalibration()
        if bf.getBBox() == ccd_row.getBBox():
            # Track variable calibration over the detector
            calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
        else:
            # Spatially constant calibration
            calib_scale = photo_calib.getCalibrationMean()

        return calib_scale

    def _vertices_to_radec(self, vertices):
        """Convert polygon vertices to ra/dec.

        Parameters
        ----------
        vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
            Vertices for bounding polygon.

        Returns
        -------
        radec : `numpy.ndarray`
            Nx2 array of ra/dec positions (in degrees) associated with vertices.
        """
        # NOTE(review): requires ``lsst.sphgeom`` to be importable at module
        # scope; confirm it is imported at the top of this file.
        lonlats = [lsst.sphgeom.LonLat(x) for x in vertices]
        radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for
                          x in lonlats])
        return radec

    def _compute_nside_coverage_tract(self, tract_info):
        """Compute the optimal coverage nside for a tract.

        Parameters
        ----------
        tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
            Tract information object.

        Returns
        -------
        nside_coverage : `int`
            Optimal coverage nside for a tract map.
        """
        num_patches = tract_info.getNumPatches()

        # Compute approximate patch area from the first patch's inner
        # polygon bounding extents, with a cos(dec) correction.
        patch_info = tract_info.getPatchInfo(0)
        vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
        radec = self._vertices_to_radec(vertices)
        delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
        delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
        patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))

        tract_area = num_patches[0]*num_patches[1]*patch_area
        # Start with a fairly low nside and increase until we find the approximate area.
        nside_coverage_tract = 32
        while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area:
            nside_coverage_tract = 2*nside_coverage_tract
        # Step back one doubling, then clamp to the [32, 128] range
        # recommended by healsparse (don't go to bigger pixels than
        # nside=32 or smaller pixels than nside=128).
        nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))

        return nside_coverage_tract
class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
                                                  dimensions=("band", "skymap",),
                                                  defaultTemplates={"coaddName": "deep"}):
    """Connections for ConsolidateHealSparsePropertyMapTask.

    One per-tract input and one survey-wide output connection is declared
    for every map type (min/max/mean/weighted_mean/sum) of every property
    in the ``BasePropertyMap`` registry; connections for map types that
    are not configured to run are removed in ``__init__``.
    """
    sky_map = pipeBase.connectionTypes.Input(
        doc="Input definition of geometry/bbox and projection/wcs for coadded exposures",
        name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
        storageClass="SkyMap",
        dimensions=("skymap",),
    )

    # Create output connections for all possible maps defined in the
    # registry. The vars() trick used here allows us to set class attributes
    # programmatically. Taken from
    # https://stackoverflow.com/questions/2519807/
    # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class
    for name in BasePropertyMap.registry:
        vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_min",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
            doc=f"Minimum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_min",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_max",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
            doc=f"Maximum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_max",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
            doc=f"Weighted mean-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )
        vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_map_sum",
            storageClass="HealSparseMap",
            dimensions=("tract", "skymap", "band"),
            multiple=True,
            deferLoad=True,
        )
        vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
            doc=f"Sum-value map of {name}",
            name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum",
            storageClass="HealSparseMap",
            dimensions=("skymap", "band"),
        )

    def __init__(self, *, config=None):
        super().__init__(config=config)

        # Not all possible maps in the registry will be configured to run.
        # Here we remove the unused connections.
        for name in BasePropertyMap.registry:
            if name not in config.property_maps:
                # Property not configured at all: synthesize an all-False
                # config so every one of its connections is removed below.
                prop_config = BasePropertyMapConfig()
                prop_config.do_min = False
                prop_config.do_max = False
                prop_config.do_mean = False
                prop_config.do_weighted_mean = False
                prop_config.do_sum = False
            else:
                prop_config = config.property_maps[name]

            if not prop_config.do_min:
                self.inputs.remove(f"{name}_map_min")
                self.outputs.remove(f"{name}_consolidated_map_min")
            if not prop_config.do_max:
                self.inputs.remove(f"{name}_map_max")
                self.outputs.remove(f"{name}_consolidated_map_max")
            if not prop_config.do_mean:
                self.inputs.remove(f"{name}_map_mean")
                self.outputs.remove(f"{name}_consolidated_map_mean")
            if not prop_config.do_weighted_mean:
                self.inputs.remove(f"{name}_map_weighted_mean")
                self.outputs.remove(f"{name}_consolidated_map_weighted_mean")
            if not prop_config.do_sum:
                self.inputs.remove(f"{name}_map_sum")
                self.outputs.remove(f"{name}_consolidated_map_sum")
class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
                                             pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
    """Configuration for ConsolidateHealSparsePropertyMapTask.

    Selects which property maps are consolidated and the coverage
    resolution of the survey-wide output maps.
    """
    property_maps = BasePropertyMap.registry.makeField(
        multi=True,
        default=[
            "exposure_time",
            "psf_size",
            "psf_e1",
            "psf_e2",
            "psf_maglim",
            "sky_noise",
            "sky_background",
            "dcr_dra",
            "dcr_ddec",
            "dcr_e1",
            "dcr_e2",
            "epoch",
        ],
        doc="Property map computation objects",
    )
    nside_coverage = pexConfig.Field(
        doc="Consolidated HealSparse coverage map nside. Must be power of 2.",
        dtype=int,
        default=32,
        check=_is_power_of_two,
    )

    def setDefaults(self):
        # Exposure time accumulates as a straight sum.
        self.property_maps["exposure_time"].do_sum = True

        # These quantities are combined as weighted means.
        for map_name in ("psf_size", "psf_e1", "psf_e2", "psf_maglim",
                         "sky_noise", "sky_background",
                         "dcr_dra", "dcr_ddec", "dcr_e1", "dcr_e2"):
            self.property_maps[map_name].do_weighted_mean = True

        # The epoch map records mean, earliest, and latest values.
        epoch_config = self.property_maps["epoch"]
        epoch_config.do_mean = True
        epoch_config.do_min = True
        epoch_config.do_max = True
class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
    """Task to consolidate HealSparse property maps.

    This task will take all the individual tract-based maps (per map type,
    per band) and consolidate them into one survey-wide map (per map type,
    per band). Each tract map is truncated to its inner region before
    consolidation.
    """
    ConfigClass = ConsolidateHealSparsePropertyMapConfig
    _DefaultName = "consolidateHealSparsePropertyMapTask"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Instantiate one property-map object per configured property;
        # each carries its own per-map configuration.
        self.property_maps = PropertyMapMap()
        for name, config, PropertyMapClass in self.config.property_maps.apply():
            self.property_maps[name] = PropertyMapClass(config, name)

    @timeMethod
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # The tract-map inputs are declared deferLoad=True, so this
        # retrieves handles rather than reading every map up front.
        inputs = butlerQC.get(inputRefs)

        sky_map = inputs.pop("sky_map")

        # These need to be consolidated one at a time to conserve memory.
        for name in self.config.property_maps.names:
            for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']:
                map_type = f"{name}_map_{type_}"
                if map_type in inputs:
                    # Key the deferred handles by tract id so pixels can
                    # later be matched against their owning tract.
                    input_refs = {ref.dataId['tract']: ref
                                  for ref in inputs[map_type]}
                    consolidated_map = self.consolidate_map(sky_map, input_refs)
                    # Put each consolidated map immediately rather than
                    # returning them all, again to conserve memory.
                    butlerQC.put(consolidated_map,
                                 getattr(outputRefs, f"{name}_consolidated_map_{type_}"))

    def consolidate_map(self, sky_map, input_refs):
        """Consolidate the healsparse property maps.

        Parameters
        ----------
        sky_map : Sky map object
            Must support ``findTractIdArray(ra, dec, degrees=True)``.
        input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
            Dictionary of tract_id mapping to dataref.

        Returns
        -------
        consolidated_map : `healsparse.HealSparseMap`
            Consolidated HealSparse map.
        """
        # First, we read in the coverage maps to know how much memory
        # to allocate
        cov_mask = None
        nside_coverage_inputs = None
        for tract_id in input_refs:
            # Read only the small coverage component, not the full map.
            cov = input_refs[tract_id].get(component='coverage')
            if cov_mask is None:
                cov_mask = cov.coverage_mask
                # NOTE(review): only the first map's coverage nside is
                # recorded; presumably all inputs share it -- confirm.
                nside_coverage_inputs = cov.nside_coverage
            else:
                # Union of all tract coverage footprints.
                cov_mask |= cov.coverage_mask

        cov_pix_inputs, = np.where(cov_mask)

        # Compute the coverage pixels for the desired nside_coverage
        if nside_coverage_inputs == self.config.nside_coverage:
            cov_pix = cov_pix_inputs
        elif nside_coverage_inputs > self.config.nside_coverage:
            # Converting from higher resolution coverage to lower
            # resolution coverage.
            bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
                                                    nside_coverage_inputs)
            cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
        else:
            # Converting from lower resolution coverage to higher
            # resolution coverage.
            # NOTE(review): the left shift yields only the first sub-pixel
            # of each input coverage pixel; valid pixels landing in the
            # other sub-pixels are presumably covered by healsparse
            # growing the coverage on assignment -- confirm.
            bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
                                                    self.config.nside_coverage)
            cov_pix = np.left_shift(cov_pix_inputs, bit_shift)

        # Now read in each tract map and build the consolidated map.
        consolidated_map = None
        for tract_id in input_refs:
            input_map = input_refs[tract_id].get()
            if consolidated_map is None:
                # Pre-allocate the output using the coverage pixels computed
                # above, inheriting dtype/sentinel/metadata from the inputs.
                consolidated_map = hsp.HealSparseMap.make_empty(
                    self.config.nside_coverage,
                    input_map.nside_sparse,
                    input_map.dtype,
                    sentinel=input_map._sentinel,
                    cov_pixels=cov_pix,
                    metadata=input_map.metadata,
                )

            # Only use pixels that are properly inside the tract.
            vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True)
            vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True)

            # Keep pixels whose positions resolve to this tract, dropping
            # the overlap region duplicated in neighboring tract maps.
            in_tract = (vpix_tract_ids == tract_id)

            consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]

        return consolidated_map