22__all__ = [
"HealSparseInputMapTask",
"HealSparseInputMapConfig",
23 "HealSparseMapFormatter",
"HealSparsePropertyMapConnections",
24 "HealSparsePropertyMapConfig",
"HealSparsePropertyMapTask",
25 "ConsolidateHealSparsePropertyMapConnections",
26 "ConsolidateHealSparsePropertyMapConfig",
27 "ConsolidateHealSparsePropertyMapTask"]
29from collections
import defaultdict
35import healsparse
as hsp
38import lsst.pipe.base
as pipeBase
41from lsst.daf.butler
import Formatter
43from lsst.utils.timer
import timeMethod
44from .healSparseMappingProperties
import (BasePropertyMap, BasePropertyMapConfig,
45 PropertyMapMap, compute_approx_psf_size_and_shape)
49 """Interface for reading and writing healsparse.HealSparseMap files."""
50 unsupportedParameters = frozenset()
51 supportedExtensions = frozenset({
".hsp",
".fit",
".fits"})
54 def read(self, component=None):
56 path = self.fileDescriptor.location.path
58 if component ==
'coverage':
60 data = hsp.HealSparseCoverage.read(path)
61 except (OSError, RuntimeError):
62 raise ValueError(f
"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
66 if self.fileDescriptor.parameters
is None:
70 pixels = self.fileDescriptor.parameters.get(
'pixels',
None)
71 degrade_nside = self.fileDescriptor.parameters.get(
'degrade_nside',
None)
73 data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside)
74 except (OSError, RuntimeError):
75 raise ValueError(f
"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}")
79 def write(self, inMemoryDataset):
82 self.fileDescriptor.location.updateExtension(self.
extension)
83 inMemoryDataset.write(self.fileDescriptor.location.path, clobber=
True)
87 """Check that value is a power of two.
96 is_power_of_two : `bool`
97 True if value is a power of two; False otherwise, or
98 if value is not an integer.
100 if not isinstance(value, numbers.Integral):
107 return (value & (value - 1) == 0)
and value != 0
111 """Configuration parameters for HealSparseInputMapTask"""
112 nside = pexConfig.Field(
113 doc=
"Mapping healpix nside. Must be power of 2.",
116 check=_is_power_of_two,
118 nside_coverage = pexConfig.Field(
119 doc=
"HealSparse coverage map nside. Must be power of 2.",
122 check=_is_power_of_two,
124 bad_mask_min_coverage = pexConfig.Field(
125 doc=(
"Minimum area fraction of a map healpixel pixel that must be "
126 "covered by bad pixels to be removed from the input map. "
127 "This is approximate."),
134 """Task for making a HealSparse input map."""
136 ConfigClass = HealSparseInputMapConfig
137 _DefaultName =
"healSparseInputMap"
140 pipeBase.Task.__init__(self, **kwargs)
146 """Build a map from ccd valid polygons or bounding boxes.
150 bbox : `lsst.geom.Box2I`
151 Bounding box for region to build input map.
152 wcs : `lsst.afw.geom.SkyWcs`
153 WCS object for region to build input map.
154 ccds : `lsst.afw.table.ExposureCatalog`
155 Exposure catalog with ccd data from coadd inputs.
157 with warnings.catch_warnings():
162 warnings.simplefilter(
"ignore")
163 self.
ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage,
164 nside_sparse=self.config.nside,
166 wide_mask_maxbits=len(ccds))
171 pixel_scale = wcs.getPixelScale(bbox.getCenter()).asArcseconds()
172 hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=
True)*(3600.**2.)
173 self.
_min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.)
178 for bit, ccd_row
in enumerate(ccds):
179 metadata[f
"B{bit:04d}CCD"] = ccd_row[
"ccd"]
180 metadata[f
"B{bit:04d}VIS"] = ccd_row[
"visit"]
181 metadata[f
"B{bit:04d}WT"] = ccd_row[
"weight"]
186 ccd_poly = ccd_row.getValidPolygon()
190 ccd_poly_radec = self.
_pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())
193 poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0],
194 dec=ccd_poly_radec[: -1, 1],
202 bbox_afw_poly.convexHull().getVertices())
203 bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
205 with warnings.catch_warnings():
206 warnings.simplefilter(
"ignore")
212 dtype = [(f
"v{visit}", np.int64)
for visit
in self.
_bits_per_visit.keys()]
214 with warnings.catch_warnings():
219 warnings.simplefilter(
"ignore")
221 nside_coverage=self.config.nside_coverage,
222 nside_sparse=self.config.nside,
236 """Mask a subregion from a visit.
237 This must be run after build_ccd_input_map initializes
242 bbox : `lsst.geom.Box2I`
243 Bounding box from region to mask.
245 Visit number corresponding to warp with mask.
246 mask : `lsst.afw.image.MaskX`
247 Mask plane from warp exposure.
248 bit_mask_value : `int`
249 Bit mask to check for bad pixels.
253 RuntimeError : Raised if build_ccd_input_map was not run first.
256 raise RuntimeError(
"Must run build_ccd_input_map before mask_warp_bbox")
263 bad_pixels = np.where(mask.array & bit_mask_value)
264 if len(bad_pixels[0]) == 0:
269 bad_ra, bad_dec = self.
_wcs.pixelToSkyArray(
270 bad_pixels[1].astype(np.float64) + bbox.getMinX(),
271 bad_pixels[0].astype(np.float64) + bbox.getMinY(),
274 bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec)
277 match_input, match_bad = esutil.numpy_util.match(self.
_ccd_input_pixels, bad_hpix, presorted=
True)
278 if len(match_bad) == 0:
281 bad_hpix = bad_hpix[match_bad]
289 count_map_visit.update_values_pix(bad_hpix, 1, operation=
"add")
292 """Use accumulated mask information to finalize the masking of
297 RuntimeError : Raised if build_ccd_input_map was not run first.
300 raise RuntimeError(
"Must run build_ccd_input_map before finalize_ccd_input_map_mask.")
304 to_mask, = np.where(count_map_arr[f
"v{visit}"] > self.
_min_bad)
305 if to_mask.size == 0:
314 """Initialize the cell input map.
318 bbox : `lsst.geom.Box2I`
319 Bounding box for region to build input map.
320 wcs : `lsst.afw.geom.SkyWcs`
321 WCS object for region to build input map.
322 visit_detectors : `list` [`tuple`]
323 List of visit/detector tuples.
325 with warnings.catch_warnings():
330 warnings.simplefilter(
"ignore")
332 nside_coverage=self.config.nside_coverage,
333 nside_sparse=self.config.nside,
335 wide_mask_maxbits=len(visit_detectors),
345 for bit, (visit, detector)
in enumerate(visit_detectors):
346 metadata[f
"B{bit:04d}CCD"] = detector
347 metadata[f
"B{bit:04d}VIS"] = visit
349 metadata[f
"B{bit:04d}WT"] = 0.0
361 """Add a cell to the input map.
365 cell : `lsst.skymap.cellInfo.CellInfo`
369 raise RuntimeError(
"Must run initialize_cell_input_map() before build_cell_input_map()")
372 cell_poly = cell.getInnerSkyPolygon()
373 vertices = np.asarray([[v.x(), v.y(), v.z()]
for v
in cell_poly.getVertices()])
374 pixels = hpg.query_polygon_vec(self.config.nside, vertices)
379 """Add a warp to the input map for a given cell.
383 ccd_row : `lsst.afw.table.ExposureRecord`
384 Row from the ccd table.
386 Weight to use for this detector.
387 cell : `lsst.skymap.cellInfo.CellInfo`
388 Cell that overlaps the ccd_table_row.
390 visit = int(ccd_row[
"visit"])
391 detector = int(ccd_row[
"ccd"])
394 raise RuntimeError(f
"Visit {visit} / detector {detector} not expected in map.")
396 if (cell_pixels := self.
_cell_pixels.get(cell.sequential_index,
None))
is None:
397 raise RuntimeError(f
"Cell {cell.sequential_index} not expected in map.")
402 ccd_poly = ccd_row.validPolygon
406 ccd_poly_radec = self.
_pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices())
409 ra=ccd_poly_radec[: -1, 0],
410 dec=ccd_poly_radec[: -1, 1],
413 with warnings.catch_warnings():
418 warnings.simplefilter(
"ignore")
421 nside_coverage=self.config.nside_coverage,
422 nside_sparse=self.config.nside,
432 """Convert pixels to ra/dec positions using a wcs.
436 wcs : `lsst.afw.geom.SkyWcs`
438 pixels : `list` [`lsst.geom.Point2D`]
439 List of pixels to convert.
443 radec : `numpy.ndarray`
444 Nx2 array of ra/dec positions associated with pixels.
446 sph_pts = wcs.pixelToSky(pixels)
447 return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees())
452 dimensions=(
"tract",
"band",
"skymap",),
453 defaultTemplates={
"coaddName":
"deep",
455 input_maps = pipeBase.connectionTypes.Input(
456 doc=
"Healsparse bit-wise coadd input maps",
457 name=
"{coaddName}Coadd_inputMap",
458 storageClass=
"HealSparseMap",
459 dimensions=(
"tract",
"patch",
"skymap",
"band"),
463 coadd_exposures = pipeBase.connectionTypes.Input(
464 doc=
"Coadded exposures associated with input_maps",
465 name=
"{coaddName}Coadd_calexp",
466 storageClass=
"ExposureF",
467 dimensions=(
"tract",
"patch",
"skymap",
"band"),
471 visit_summaries = pipeBase.connectionTypes.Input(
472 doc=
"Visit summary tables with aggregated statistics",
473 name=
"finalVisitSummary",
474 storageClass=
"ExposureCatalog",
475 dimensions=(
"instrument",
"visit"),
479 sky_map = pipeBase.connectionTypes.Input(
480 doc=
"Input definition of geometry/bbox and projection/wcs for coadded exposures",
481 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
482 storageClass=
"SkyMap",
483 dimensions=(
"skymap",),
491 for name
in BasePropertyMap.registry:
492 vars()[f
"{name}_map_min"] = pipeBase.connectionTypes.Output(
493 doc=f
"Minimum-value map of {name}",
494 name=f
"{{coaddName}}Coadd_{name}_map_min",
495 storageClass=
"HealSparseMap",
496 dimensions=(
"tract",
"skymap",
"band"),
498 vars()[f
"{name}_map_max"] = pipeBase.connectionTypes.Output(
499 doc=f
"Maximum-value map of {name}",
500 name=f
"{{coaddName}}Coadd_{name}_map_max",
501 storageClass=
"HealSparseMap",
502 dimensions=(
"tract",
"skymap",
"band"),
504 vars()[f
"{name}_map_mean"] = pipeBase.connectionTypes.Output(
505 doc=f
"Mean-value map of {name}",
506 name=f
"{{coaddName}}Coadd_{name}_map_mean",
507 storageClass=
"HealSparseMap",
508 dimensions=(
"tract",
"skymap",
"band"),
510 vars()[f
"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output(
511 doc=f
"Weighted mean-value map of {name}",
512 name=f
"{{coaddName}}Coadd_{name}_map_weighted_mean",
513 storageClass=
"HealSparseMap",
514 dimensions=(
"tract",
"skymap",
"band"),
516 vars()[f
"{name}_map_sum"] = pipeBase.connectionTypes.Output(
517 doc=f
"Sum-value map of {name}",
518 name=f
"{{coaddName}}Coadd_{name}_map_sum",
519 storageClass=
"HealSparseMap",
520 dimensions=(
"tract",
"skymap",
"band"),
523 def __init__(self, *, config=None):
524 super().__init__(config=config)
528 for name
in BasePropertyMap.registry:
529 if name
not in config.property_maps:
531 prop_config.do_min =
False
532 prop_config.do_max =
False
533 prop_config.do_mean =
False
534 prop_config.do_weighted_mean =
False
535 prop_config.do_sum =
False
537 prop_config = config.property_maps[name]
539 if not prop_config.do_min:
540 self.outputs.remove(f
"{name}_map_min")
541 if not prop_config.do_max:
542 self.outputs.remove(f
"{name}_map_max")
543 if not prop_config.do_mean:
544 self.outputs.remove(f
"{name}_map_mean")
545 if not prop_config.do_weighted_mean:
546 self.outputs.remove(f
"{name}_map_weighted_mean")
547 if not prop_config.do_sum:
548 self.outputs.remove(f
"{name}_map_sum")
551class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
552 pipelineConnections=HealSparsePropertyMapConnections):
553 """Configuration parameters for HealSparsePropertyMapTask"""
554 property_maps = BasePropertyMap.registry.makeField(
556 default=[
"exposure_time",
568 doc=
"Property map computation objects",
571 def setDefaults(self):
572 self.property_maps[
"exposure_time"].do_sum =
True
573 self.property_maps[
"psf_size"].do_weighted_mean =
True
574 self.property_maps[
"psf_e1"].do_weighted_mean =
True
575 self.property_maps[
"psf_e2"].do_weighted_mean =
True
576 self.property_maps[
"psf_maglim"].do_weighted_mean =
True
577 self.property_maps[
"sky_noise"].do_weighted_mean =
True
578 self.property_maps[
"sky_background"].do_weighted_mean =
True
579 self.property_maps[
"dcr_dra"].do_weighted_mean =
True
580 self.property_maps[
"dcr_ddec"].do_weighted_mean =
True
581 self.property_maps[
"dcr_e1"].do_weighted_mean =
True
582 self.property_maps[
"dcr_e2"].do_weighted_mean =
True
583 self.property_maps[
"epoch"].do_mean =
True
584 self.property_maps[
"epoch"].do_min =
True
585 self.property_maps[
"epoch"].do_max =
True
588class HealSparsePropertyMapTask(pipeBase.PipelineTask):
589 """Task to compute Healsparse property maps.
591 This task will compute individual property maps (per tract, per
592 map type, per band). These maps cover the full coadd tract, and
593 are not truncated to the inner tract region.
595 ConfigClass = HealSparsePropertyMapConfig
596 _DefaultName =
"healSparsePropertyMapTask"
598 def __init__(self, **kwargs):
599 super().__init__(**kwargs)
601 for name, config, PropertyMapClass
in self.config.property_maps.apply():
602 self.property_maps[name] = PropertyMapClass(config, name)
605 def runQuantum(self, butlerQC, inputRefs, outputRefs):
606 inputs = butlerQC.get(inputRefs)
608 sky_map = inputs.pop(
"sky_map")
610 tract = butlerQC.quantum.dataId[
"tract"]
611 band = butlerQC.quantum.dataId[
"band"]
613 input_map_dict = {ref.dataId[
"patch"]: ref
for ref
in inputs[
"input_maps"]}
614 coadd_dict = {ref.dataId[
"patch"]: ref
for ref
in inputs[
"coadd_exposures"]}
616 visit_summary_dict = {ref.dataId[
"visit"]: ref.get()
617 for ref
in inputs[
"visit_summaries"]}
619 self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict)
622 for name, property_map
in self.property_maps.items():
623 if property_map.config.do_min:
624 butlerQC.put(property_map.min_map,
625 getattr(outputRefs, f
"{name}_map_min"))
626 if property_map.config.do_max:
627 butlerQC.put(property_map.max_map,
628 getattr(outputRefs, f
"{name}_map_max"))
629 if property_map.config.do_mean:
630 butlerQC.put(property_map.mean_map,
631 getattr(outputRefs, f
"{name}_map_mean"))
632 if property_map.config.do_weighted_mean:
633 butlerQC.put(property_map.weighted_mean_map,
634 getattr(outputRefs, f
"{name}_map_weighted_mean"))
635 if property_map.config.do_sum:
636 butlerQC.put(property_map.sum_map,
637 getattr(outputRefs, f
"{name}_map_sum"))
639 def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict):
640 """Run the healsparse property task.
644 sky_map : Sky map object
648 Band name for logging.
649 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
650 Dictionary of coadd exposure references. Keys are patch numbers.
651 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
652 Dictionary of input map references. Keys are patch numbers.
653 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`]
654 Dictionary of visit summary tables. Keys are visit numbers.
658 RepeatableQuantumError
659 If visit_summary_dict is missing any visits or detectors found in an
660 input map. This leads to an inconsistency between what is in the coadd
661 (via the input map) and the visit summary tables which contain data
664 tract_info = sky_map[tract]
666 tract_maps_initialized =
False
668 for patch
in input_map_dict.keys():
669 self.log.info(
"Making maps for band %s, tract %d, patch %d.",
672 patch_info = tract_info[patch]
674 input_map = input_map_dict[patch].get()
677 input_bit_weight_dict = {}
678 for bit
in range(input_map.wide_mask_maxbits):
681 if f
"B{bit:04d}CCD" in input_map.metadata:
682 visit = input_map.metadata[f
"B{bit:04d}VIS"]
683 detector = input_map.metadata[f
"B{bit:04d}CCD"]
684 weight = input_map.metadata[f
"B{bit:04d}WT"]
685 input_bit_weight_dict[(visit, detector)] = (bit, weight)
689 if not tract_maps_initialized:
692 nside_coverage = self._compute_nside_coverage_tract(tract_info)
693 nside = input_map.nside_sparse
695 do_compute_approx_psf =
False
697 for property_map
in self.property_maps:
698 property_map.initialize_tract_maps(nside_coverage, nside)
699 if property_map.requires_psf:
700 do_compute_approx_psf =
True
702 tract_maps_initialized =
True
704 if input_map.valid_pixels.size == 0:
705 self.log.warning(
"No valid pixels for band %s, tract %d, patch %d; skipping.",
709 coadd_photo_calib = coadd_dict[patch].get(component=
"photoCalib")
710 coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude())
713 poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
714 patch_radec = self._vertices_to_radec(poly_vertices)
715 patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1],
716 value=np.arange(input_map.wide_mask_maxbits))
717 with warnings.catch_warnings():
722 warnings.simplefilter(
"ignore")
723 patch_poly_map = patch_poly.get_map_like(input_map)
724 input_map = hsp.and_intersection([input_map, patch_poly_map])
726 valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=
True)
729 if valid_pixels.size == 0:
733 for property_map
in self.property_maps:
734 property_map.initialize_values(valid_pixels.size)
735 property_map.zeropoint = coadd_zeropoint
738 total_weights = np.zeros(valid_pixels.size)
739 total_inputs = np.zeros(valid_pixels.size, dtype=np.int32)
741 for (visit, detector), (bit, weight)
in input_bit_weight_dict.items():
743 inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit]))
750 if visit
not in visit_summary_dict:
751 msg = f
"Visit {visit} not found in visit_summaries."
752 raise pipeBase.RepeatableQuantumError(msg)
753 row = visit_summary_dict[visit].find(detector)
755 msg = f
"Visit {visit} / detector {detector} not found in visit_summaries."
756 raise pipeBase.RepeatableQuantumError(msg)
758 x, y = row.wcs.skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=
True)
759 scalings = self._compute_calib_scale(row, x, y)
761 if do_compute_approx_psf:
762 psf_array = compute_approx_psf_size_and_shape(row, vpix_ra[inmap], vpix_dec[inmap])
766 total_weights[inmap] += weight
767 total_inputs[inmap] += 1
770 for property_map
in self.property_maps:
771 property_map.accumulate_values(inmap,
780 for property_map
in self.property_maps:
781 property_map.finalize_mean_values(total_weights, total_inputs)
782 property_map.set_map_values(valid_pixels)
784 def _compute_calib_scale(self, ccd_row, x, y):
785 """Compute calibration scaling values.
789 ccd_row : `lsst.afw.table.ExposureRecord`
790 Exposure metadata for a given detector exposure.
792 Array of x positions.
794 Array of y positions.
798 calib_scale : `np.ndarray`
799 Array of calibration scale values.
801 photo_calib = ccd_row.getPhotoCalib()
802 bf = photo_calib.computeScaledCalibration()
803 if bf.getBBox() == ccd_row.getBBox():
805 calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y)
808 calib_scale = photo_calib.getCalibrationMean()
812 def _vertices_to_radec(self, vertices):
813 """Convert polygon vertices to ra/dec.
817 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ]
818 Vertices for bounding polygon.
822 radec : `numpy.ndarray`
823 Nx2 array of ra/dec positions (in degrees) associated with vertices.
826 radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees())
for
830 def _compute_nside_coverage_tract(self, tract_info):
831 """Compute the optimal coverage nside for a tract.
835 tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo`
836 Tract information object.
840 nside_coverage : `int`
841 Optimal coverage nside for a tract map.
843 num_patches = tract_info.getNumPatches()
846 patch_info = tract_info.getPatchInfo(0)
847 vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices()
848 radec = self._vertices_to_radec(vertices)
849 delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0])
850 delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1])
851 patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1])))
853 tract_area = num_patches[0]*num_patches[1]*patch_area
855 nside_coverage_tract = 32
856 while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=
True) > tract_area:
857 nside_coverage_tract = 2*nside_coverage_tract
860 nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128))
862 return nside_coverage_tract
865class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections,
866 dimensions=(
"band",
"skymap",),
867 defaultTemplates={
"coaddName":
"deep"}):
868 sky_map = pipeBase.connectionTypes.Input(
869 doc=
"Input definition of geometry/bbox and projection/wcs for coadded exposures",
870 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
871 storageClass=
"SkyMap",
872 dimensions=(
"skymap",),
880 for name
in BasePropertyMap.registry:
881 vars()[f
"{name}_map_min"] = pipeBase.connectionTypes.Input(
882 doc=f
"Minimum-value map of {name}",
883 name=f
"{{coaddName}}Coadd_{name}_map_min",
884 storageClass=
"HealSparseMap",
885 dimensions=(
"tract",
"skymap",
"band"),
889 vars()[f
"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output(
890 doc=f
"Minumum-value map of {name}",
891 name=f
"{{coaddName}}Coadd_{name}_consolidated_map_min",
892 storageClass=
"HealSparseMap",
893 dimensions=(
"skymap",
"band"),
895 vars()[f
"{name}_map_max"] = pipeBase.connectionTypes.Input(
896 doc=f
"Maximum-value map of {name}",
897 name=f
"{{coaddName}}Coadd_{name}_map_max",
898 storageClass=
"HealSparseMap",
899 dimensions=(
"tract",
"skymap",
"band"),
903 vars()[f
"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output(
904 doc=f
"Minumum-value map of {name}",
905 name=f
"{{coaddName}}Coadd_{name}_consolidated_map_max",
906 storageClass=
"HealSparseMap",
907 dimensions=(
"skymap",
"band"),
909 vars()[f
"{name}_map_mean"] = pipeBase.connectionTypes.Input(
910 doc=f
"Mean-value map of {name}",
911 name=f
"{{coaddName}}Coadd_{name}_map_mean",
912 storageClass=
"HealSparseMap",
913 dimensions=(
"tract",
"skymap",
"band"),
917 vars()[f
"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output(
918 doc=f
"Minumum-value map of {name}",
919 name=f
"{{coaddName}}Coadd_{name}_consolidated_map_mean",
920 storageClass=
"HealSparseMap",
921 dimensions=(
"skymap",
"band"),
923 vars()[f
"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input(
924 doc=f
"Weighted mean-value map of {name}",
925 name=f
"{{coaddName}}Coadd_{name}_map_weighted_mean",
926 storageClass=
"HealSparseMap",
927 dimensions=(
"tract",
"skymap",
"band"),
931 vars()[f
"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output(
932 doc=f
"Minumum-value map of {name}",
933 name=f
"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean",
934 storageClass=
"HealSparseMap",
935 dimensions=(
"skymap",
"band"),
937 vars()[f
"{name}_map_sum"] = pipeBase.connectionTypes.Input(
938 doc=f
"Sum-value map of {name}",
939 name=f
"{{coaddName}}Coadd_{name}_map_sum",
940 storageClass=
"HealSparseMap",
941 dimensions=(
"tract",
"skymap",
"band"),
945 vars()[f
"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output(
946 doc=f
"Minumum-value map of {name}",
947 name=f
"{{coaddName}}Coadd_{name}_consolidated_map_sum",
948 storageClass=
"HealSparseMap",
949 dimensions=(
"skymap",
"band"),
952 def __init__(self, *, config=None):
953 super().__init__(config=config)
957 for name
in BasePropertyMap.registry:
958 if name
not in config.property_maps:
960 prop_config.do_min =
False
961 prop_config.do_max =
False
962 prop_config.do_mean =
False
963 prop_config.do_weighted_mean =
False
964 prop_config.do_sum =
False
966 prop_config = config.property_maps[name]
968 if not prop_config.do_min:
969 self.inputs.remove(f
"{name}_map_min")
970 self.outputs.remove(f
"{name}_consolidated_map_min")
971 if not prop_config.do_max:
972 self.inputs.remove(f
"{name}_map_max")
973 self.outputs.remove(f
"{name}_consolidated_map_max")
974 if not prop_config.do_mean:
975 self.inputs.remove(f
"{name}_map_mean")
976 self.outputs.remove(f
"{name}_consolidated_map_mean")
977 if not prop_config.do_weighted_mean:
978 self.inputs.remove(f
"{name}_map_weighted_mean")
979 self.outputs.remove(f
"{name}_consolidated_map_weighted_mean")
980 if not prop_config.do_sum:
981 self.inputs.remove(f
"{name}_map_sum")
982 self.outputs.remove(f
"{name}_consolidated_map_sum")
985class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig,
986 pipelineConnections=ConsolidateHealSparsePropertyMapConnections):
987 """Configuration parameters for ConsolidateHealSparsePropertyMapTask"""
988 property_maps = BasePropertyMap.registry.makeField(
990 default=[
"exposure_time",
1002 doc=
"Property map computation objects",
1004 nside_coverage = pexConfig.Field(
1005 doc=
"Consolidated HealSparse coverage map nside. Must be power of 2.",
1008 check=_is_power_of_two,
1011 def setDefaults(self):
1012 self.property_maps[
"exposure_time"].do_sum =
True
1013 self.property_maps[
"psf_size"].do_weighted_mean =
True
1014 self.property_maps[
"psf_e1"].do_weighted_mean =
True
1015 self.property_maps[
"psf_e2"].do_weighted_mean =
True
1016 self.property_maps[
"psf_maglim"].do_weighted_mean =
True
1017 self.property_maps[
"sky_noise"].do_weighted_mean =
True
1018 self.property_maps[
"sky_background"].do_weighted_mean =
True
1019 self.property_maps[
"dcr_dra"].do_weighted_mean =
True
1020 self.property_maps[
"dcr_ddec"].do_weighted_mean =
True
1021 self.property_maps[
"dcr_e1"].do_weighted_mean =
True
1022 self.property_maps[
"dcr_e2"].do_weighted_mean =
True
1023 self.property_maps[
"epoch"].do_mean =
True
1024 self.property_maps[
"epoch"].do_min =
True
1025 self.property_maps[
"epoch"].do_max =
True
1028class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask):
1029 """Task to consolidate HealSparse property maps.
1031 This task will take all the individual tract-based maps (per map type,
1032 per band) and consolidate them into one survey-wide map (per map type,
1033 per band). Each tract map is truncated to its inner region before
1036 ConfigClass = ConsolidateHealSparsePropertyMapConfig
1037 _DefaultName =
"consolidateHealSparsePropertyMapTask"
1039 def __init__(self, **kwargs):
1040 super().__init__(**kwargs)
1042 for name, config, PropertyMapClass
in self.config.property_maps.apply():
1043 self.property_maps[name] = PropertyMapClass(config, name)
1046 def runQuantum(self, butlerQC, inputRefs, outputRefs):
1047 inputs = butlerQC.get(inputRefs)
1049 sky_map = inputs.pop(
"sky_map")
1052 for name
in self.config.property_maps.names:
1053 for type_
in [
'min',
'max',
'mean',
'weighted_mean',
'sum']:
1054 map_type = f
"{name}_map_{type_}"
1055 if map_type
in inputs:
1056 input_refs = {ref.dataId[
'tract']: ref
1057 for ref
in inputs[map_type]}
1058 consolidated_map = self.consolidate_map(sky_map, input_refs)
1059 butlerQC.put(consolidated_map,
1060 getattr(outputRefs, f
"{name}_consolidated_map_{type_}"))
1062 def consolidate_map(self, sky_map, input_refs):
1063 """Consolidate the healsparse property maps.
1067 sky_map : Sky map object
1068 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`]
1069 Dictionary of tract_id mapping to dataref.
1073 consolidated_map : `healsparse.HealSparseMap`
1074 Consolidated HealSparse map.
1079 nside_coverage_inputs =
None
1080 for tract_id
in input_refs:
1081 cov = input_refs[tract_id].get(component=
'coverage')
1082 if cov_mask
is None:
1083 cov_mask = cov.coverage_mask
1084 nside_coverage_inputs = cov.nside_coverage
1086 cov_mask |= cov.coverage_mask
1088 cov_pix_inputs, = np.where(cov_mask)
1091 if nside_coverage_inputs == self.config.nside_coverage:
1092 cov_pix = cov_pix_inputs
1093 elif nside_coverage_inputs > self.config.nside_coverage:
1096 bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage,
1097 nside_coverage_inputs)
1098 cov_pix = np.right_shift(cov_pix_inputs, bit_shift)
1102 bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs,
1103 self.config.nside_coverage)
1104 cov_pix = np.left_shift(cov_pix_inputs, bit_shift)
1107 consolidated_map =
None
1108 for tract_id
in input_refs:
1109 input_map = input_refs[tract_id].get()
1110 if consolidated_map
is None:
1111 consolidated_map = hsp.HealSparseMap.make_empty(
1112 self.config.nside_coverage,
1113 input_map.nside_sparse,
1115 sentinel=input_map._sentinel,
1117 metadata=input_map.metadata,
1121 vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=
True)
1122 vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=
True)
1124 in_tract = (vpix_tract_ids == tract_id)
1126 consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]]
1128 return consolidated_map