Coverage for python / lsst / pipe / tasks / healSparseMapping.py: 16%

445 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 09:04 +0000

1# This file is part of pipe_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["HealSparseInputMapTask", "HealSparseInputMapConfig", 

23 "HealSparseMapFormatter", "HealSparsePropertyMapConnections", 

24 "HealSparsePropertyMapConfig", "HealSparsePropertyMapTask", 

25 "ConsolidateHealSparsePropertyMapConnections", 

26 "ConsolidateHealSparsePropertyMapConfig", 

27 "ConsolidateHealSparsePropertyMapTask"] 

28 

29from collections import defaultdict 

30import esutil 

31import warnings 

32import numbers 

33import numpy as np 

34import hpgeom as hpg 

35import healsparse as hsp 

36 

37import lsst.pex.config as pexConfig 

38import lsst.pipe.base as pipeBase 

39import lsst.geom 

40import lsst.afw.geom as afwGeom 

41from lsst.daf.butler import Formatter 

42from lsst.skymap import BaseSkyMap 

43from lsst.utils.timer import timeMethod 

44from .healSparseMappingProperties import (BasePropertyMap, BasePropertyMapConfig, 

45 PropertyMapMap, compute_approx_psf_size_and_shape) 

46 

47 

48class HealSparseMapFormatter(Formatter): 

49 """Interface for reading and writing healsparse.HealSparseMap files.""" 

50 unsupportedParameters = frozenset() 

51 supportedExtensions = frozenset({".hsp", ".fit", ".fits"}) 

52 extension = '.hsp' 

53 

54 def read(self, component=None): 

55 # Docstring inherited from Formatter.read. 

56 path = self.fileDescriptor.location.path 

57 

58 if component == 'coverage': 

59 try: 

60 data = hsp.HealSparseCoverage.read(path) 

61 except (OSError, RuntimeError): 

62 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}") 

63 

64 return data 

65 

66 if self.fileDescriptor.parameters is None: 

67 pixels = None 

68 degrade_nside = None 

69 else: 

70 pixels = self.fileDescriptor.parameters.get('pixels', None) 

71 degrade_nside = self.fileDescriptor.parameters.get('degrade_nside', None) 

72 try: 

73 data = hsp.HealSparseMap.read(path, pixels=pixels, degrade_nside=degrade_nside) 

74 except (OSError, RuntimeError): 

75 raise ValueError(f"Unable to read healsparse map with URI {self.fileDescriptor.location.uri}") 

76 

77 return data 

78 

79 def write(self, inMemoryDataset): 

80 # Docstring inherited from Formatter.write. 

81 # Update the location with the formatter-preferred file extension 

82 self.fileDescriptor.location.updateExtension(self.extension) 

83 inMemoryDataset.write(self.fileDescriptor.location.path, clobber=True) 

84 

85 

86def _is_power_of_two(value): 

87 """Check that value is a power of two. 

88 

89 Parameters 

90 ---------- 

91 value : `int` 

92 Value to check. 

93 

94 Returns 

95 ------- 

96 is_power_of_two : `bool` 

97 True if value is a power of two; False otherwise, or 

98 if value is not an integer. 

99 """ 

100 if not isinstance(value, numbers.Integral): 

101 return False 

102 

103 # See https://stackoverflow.com/questions/57025836 

104 # Every power of 2 has exactly 1 bit set to 1; subtracting 

105 # 1 therefore flips every preceding bit. If you and that 

106 # together with the original value it must be 0. 

107 return (value & (value - 1) == 0) and value != 0 

108 

109 

110class HealSparseInputMapConfig(pexConfig.Config): 

111 """Configuration parameters for HealSparseInputMapTask""" 

112 nside = pexConfig.Field( 

113 doc="Mapping healpix nside. Must be power of 2.", 

114 dtype=int, 

115 default=32768, 

116 check=_is_power_of_two, 

117 ) 

118 nside_coverage = pexConfig.Field( 

119 doc="HealSparse coverage map nside. Must be power of 2.", 

120 dtype=int, 

121 default=256, 

122 check=_is_power_of_two, 

123 ) 

124 bad_mask_min_coverage = pexConfig.Field( 

125 doc=("Minimum area fraction of a map healpixel pixel that must be " 

126 "covered by bad pixels to be removed from the input map. " 

127 "This is approximate."), 

128 dtype=float, 

129 default=0.5, 

130 ) 

131 

132 

133class HealSparseInputMapTask(pipeBase.Task): 

134 """Task for making a HealSparse input map.""" 

135 

136 ConfigClass = HealSparseInputMapConfig 

137 _DefaultName = "healSparseInputMap" 

138 

139 def __init__(self, **kwargs): 

140 pipeBase.Task.__init__(self, **kwargs) 

141 

142 self.ccd_input_map = None 

143 self.cell_input_map = None 

144 

145 def build_ccd_input_map(self, bbox, wcs, ccds): 

146 """Build a map from ccd valid polygons or bounding boxes. 

147 

148 Parameters 

149 ---------- 

150 bbox : `lsst.geom.Box2I` 

151 Bounding box for region to build input map. 

152 wcs : `lsst.afw.geom.SkyWcs` 

153 WCS object for region to build input map. 

154 ccds : `lsst.afw.table.ExposureCatalog` 

155 Exposure catalog with ccd data from coadd inputs. 

156 """ 

157 with warnings.catch_warnings(): 

158 # Healsparse will emit a warning if nside coverage is greater than 

159 # 128. In the case of generating patch input maps, and not global 

160 # maps, high nside coverage works fine, so we can suppress this 

161 # warning. 

162 warnings.simplefilter("ignore") 

163 self.ccd_input_map = hsp.HealSparseMap.make_empty(nside_coverage=self.config.nside_coverage, 

164 nside_sparse=self.config.nside, 

165 dtype=hsp.WIDE_MASK, 

166 wide_mask_maxbits=len(ccds)) 

167 self._wcs = wcs 

168 self._bbox = bbox 

169 self._ccds = ccds 

170 

171 pixel_scale = wcs.getPixelScale(bbox.getCenter()).asArcseconds() 

172 hpix_area_arcsec2 = hpg.nside_to_pixel_area(self.config.nside, degrees=True)*(3600.**2.) 

173 self._min_bad = self.config.bad_mask_min_coverage*hpix_area_arcsec2/(pixel_scale**2.) 

174 

175 metadata = {} 

176 self._bits_per_visit_ccd = {} 

177 self._bits_per_visit = defaultdict(list) 

178 for bit, ccd_row in enumerate(ccds): 

179 metadata[f"B{bit:04d}CCD"] = ccd_row["ccd"] 

180 metadata[f"B{bit:04d}VIS"] = ccd_row["visit"] 

181 metadata[f"B{bit:04d}WT"] = ccd_row["weight"] 

182 

183 self._bits_per_visit_ccd[(ccd_row["visit"], ccd_row["ccd"])] = bit 

184 self._bits_per_visit[ccd_row["visit"]].append(bit) 

185 

186 ccd_poly = ccd_row.getValidPolygon() 

187 if ccd_poly is None: 

188 ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox())) 

189 # Detectors need to be rendered with their own wcs. 

190 ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices()) 

191 

192 # Create a ccd healsparse polygon 

193 poly = hsp.Polygon(ra=ccd_poly_radec[: -1, 0], 

194 dec=ccd_poly_radec[: -1, 1], 

195 value=[bit]) 

196 self.ccd_input_map.set_bits_pix(poly.get_pixels(nside=self.ccd_input_map.nside_sparse), 

197 [bit]) 

198 

199 # Cut down to the overall bounding box with associated wcs. 

200 bbox_afw_poly = afwGeom.Polygon(lsst.geom.Box2D(bbox)) 

201 bbox_poly_radec = self._pixels_to_radec(self._wcs, 

202 bbox_afw_poly.convexHull().getVertices()) 

203 bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1], 

204 value=np.arange(self.ccd_input_map.wide_mask_maxbits)) 

205 with warnings.catch_warnings(): 

206 warnings.simplefilter("ignore") 

207 bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map) 

208 self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map]) 

209 self.ccd_input_map.metadata = metadata 

210 

211 # Create a temporary map to hold the count of bad pixels in each healpix pixel 

212 dtype = [(f"v{visit}", np.int64) for visit in self._bits_per_visit.keys()] 

213 

214 with warnings.catch_warnings(): 

215 # Healsparse will emit a warning if nside coverage is greater than 

216 # 128. In the case of generating patch input maps, and not global 

217 # maps, high nside coverage works fine, so we can suppress this 

218 # warning. 

219 warnings.simplefilter("ignore") 

220 self._ccd_input_bad_count_map = hsp.HealSparseMap.make_empty( 

221 nside_coverage=self.config.nside_coverage, 

222 nside_sparse=self.config.nside, 

223 dtype=dtype, 

224 primary=dtype[0][0]) 

225 

226 self._ccd_input_pixels = self.ccd_input_map.valid_pixels 

227 

228 # Don't set input bad map if there are no ccds which overlap the bbox. 

229 if len(self._ccd_input_pixels) > 0: 

230 # Ensure these are sorted. 

231 self._ccd_input_pixels = np.sort(self._ccd_input_pixels) 

232 

233 self._ccd_input_bad_count_map[self._ccd_input_pixels] = np.zeros(1, dtype=dtype) 

234 

235 def mask_warp_bbox(self, bbox, visit, mask, bit_mask_value): 

236 """Mask a subregion from a visit. 

237 This must be run after build_ccd_input_map initializes 

238 the overall map. 

239 

240 Parameters 

241 ---------- 

242 bbox : `lsst.geom.Box2I` 

243 Bounding box from region to mask. 

244 visit : `int` 

245 Visit number corresponding to warp with mask. 

246 mask : `lsst.afw.image.MaskX` 

247 Mask plane from warp exposure. 

248 bit_mask_value : `int` 

249 Bit mask to check for bad pixels. 

250 

251 Raises 

252 ------ 

253 RuntimeError : Raised if build_ccd_input_map was not run first. 

254 """ 

255 if self.ccd_input_map is None: 

256 raise RuntimeError("Must run build_ccd_input_map before mask_warp_bbox") 

257 

258 if len(self._ccd_input_pixels) == 0: 

259 # This tract has no coverage, so there is nothing to do. 

260 return 

261 

262 # Find the bad pixels and convert to healpix 

263 bad_pixels = np.where(mask.array & bit_mask_value) 

264 if len(bad_pixels[0]) == 0: 

265 # No bad pixels 

266 return 

267 

268 # Bad pixels come from warps which use the overall wcs. 

269 bad_ra, bad_dec = self._wcs.pixelToSkyArray( 

270 bad_pixels[1].astype(np.float64) + bbox.getMinX(), 

271 bad_pixels[0].astype(np.float64) + bbox.getMinY(), 

272 degrees=True, 

273 ) 

274 bad_hpix = hpg.angle_to_pixel(self.config.nside, bad_ra, bad_dec) 

275 

276 # Check if any of these "bad" pixels are in the valid footprint. 

277 match_input, match_bad = esutil.numpy_util.match(self._ccd_input_pixels, bad_hpix, presorted=True) 

278 if len(match_bad) == 0: 

279 return 

280 

281 bad_hpix = bad_hpix[match_bad] 

282 

283 # Create a view of the column we need to add to. 

284 count_map_visit = self._ccd_input_bad_count_map[f"v{visit}"] 

285 # Add the bad pixels to the accumulator. Note that the view 

286 # cannot append pixels, but the match above ensures we are 

287 # only adding to pixels that are already in the coverage 

288 # map and initialized. 

289 count_map_visit.update_values_pix(bad_hpix, 1, operation="add") 

290 

291 def finalize_ccd_input_map_mask(self): 

292 """Use accumulated mask information to finalize the masking of 

293 ccd_input_map. 

294 

295 Raises 

296 ------ 

297 RuntimeError : Raised if build_ccd_input_map was not run first. 

298 """ 

299 if self.ccd_input_map is None: 

300 raise RuntimeError("Must run build_ccd_input_map before finalize_ccd_input_map_mask.") 

301 

302 count_map_arr = self._ccd_input_bad_count_map[self._ccd_input_pixels] 

303 for visit in self._bits_per_visit: 

304 to_mask, = np.where(count_map_arr[f"v{visit}"] > self._min_bad) 

305 if to_mask.size == 0: 

306 continue 

307 self.ccd_input_map.clear_bits_pix(self._ccd_input_pixels[to_mask], 

308 self._bits_per_visit[visit]) 

309 

310 # Clear memory 

311 self._ccd_input_bad_count_map = None 

312 

313 def initialize_cell_input_map(self, bbox, wcs, visit_detectors): 

314 """Initialize the cell input map. 

315 

316 Parameters 

317 ---------- 

318 bbox : `lsst.geom.Box2I` 

319 Bounding box for region to build input map. 

320 wcs : `lsst.afw.geom.SkyWcs` 

321 WCS object for region to build input map. 

322 visit_detectors : `list` [`tuple`] 

323 List of visit/detector tuples. 

324 """ 

325 with warnings.catch_warnings(): 

326 # Healsparse will emit a warning if nside coverage is greater than 

327 # 128. In the case of generating patch input maps, and not global 

328 # maps, high nside coverage works fine, so we can suppress this 

329 # warning. 

330 warnings.simplefilter("ignore") 

331 self.cell_input_map = hsp.HealSparseMap.make_empty( 

332 nside_coverage=self.config.nside_coverage, 

333 nside_sparse=self.config.nside, 

334 dtype=hsp.WIDE_MASK, 

335 wide_mask_maxbits=len(visit_detectors), 

336 ) 

337 

338 self._wcs = wcs 

339 self._bbox = bbox 

340 self._visit_detectors = visit_detectors 

341 

342 metadata = {} 

343 self._bits_per_visit_detector = {} 

344 self._bits_per_visit = defaultdict(list) 

345 for bit, (visit, detector) in enumerate(visit_detectors): 

346 metadata[f"B{bit:04d}CCD"] = detector 

347 metadata[f"B{bit:04d}VIS"] = visit 

348 # Weight will be filled later. 

349 metadata[f"B{bit:04d}WT"] = 0.0 

350 

351 self._bits_per_visit_detector[(visit, detector)] = bit 

352 self._bits_per_visit[visit].append(bit) 

353 

354 self.cell_input_map.metadata = metadata 

355 

356 self._cell_pixels = {} 

357 self._visit_detector_cache = None 

358 self._detector_map_cache = None 

359 

360 def build_cell_input_map(self, cell): 

361 """Add a cell to the input map. 

362 

363 Parameters 

364 ---------- 

365 cell : `lsst.skymap.cellInfo.CellInfo` 

366 Cell to initialize. 

367 """ 

368 if self.cell_input_map is None: 

369 raise RuntimeError("Must run initialize_cell_input_map() before build_cell_input_map()") 

370 

371 # The input map only needs the *inner* sky polygon. 

372 cell_poly = cell.getInnerSkyPolygon() 

373 vertices = np.asarray([[v.x(), v.y(), v.z()] for v in cell_poly.getVertices()]) 

374 pixels = hpg.query_polygon_vec(self.config.nside, vertices) 

375 

376 self._cell_pixels[cell.sequential_index] = pixels 

377 

378 def add_warp_to_cell_input_map(self, ccd_row, weight, cell): 

379 """Add a warp to the input map for a given cell. 

380 

381 Parameters 

382 ---------- 

383 ccd_row : `lsst.afw.table.ExposureRecord` 

384 Row from the ccd table. 

385 weight : `float` 

386 Weight to use for this detector. 

387 cell : `lsst.skymap.cellInfo.CellInfo` 

388 Cell that overlaps the ccd_table_row. 

389 """ 

390 visit = int(ccd_row["visit"]) 

391 detector = int(ccd_row["ccd"]) 

392 

393 if (bit := self._bits_per_visit_detector.get((visit, detector), None)) is None: 

394 raise RuntimeError(f"Visit {visit} / detector {detector} not expected in map.") 

395 

396 if (cell_pixels := self._cell_pixels.get(cell.sequential_index, None)) is None: 

397 raise RuntimeError(f"Cell {cell.sequential_index} not expected in map.") 

398 

399 if (visit, detector) != self._visit_detector_cache: 

400 self._visit_detector_cache = (visit, detector) 

401 

402 ccd_poly = ccd_row.validPolygon 

403 if ccd_poly is None: 

404 ccd_poly = afwGeom.Polygon(lsst.geom.Box2D(ccd_row.getBBox())) 

405 # Detectors need to be rendered with their own wcs. 

406 ccd_poly_radec = self._pixels_to_radec(ccd_row.getWcs(), ccd_poly.convexHull().getVertices()) 

407 

408 poly = hsp.Polygon( 

409 ra=ccd_poly_radec[: -1, 0], 

410 dec=ccd_poly_radec[: -1, 1], 

411 value=True, 

412 ) 

413 with warnings.catch_warnings(): 

414 # Healsparse will emit a warning if nside coverage is greater 

415 # than 128. In the case of generating patch input maps, and 

416 # not global maps, high nside coverage works fine, so we can 

417 # suppress this warning. 

418 warnings.simplefilter("ignore") 

419 

420 self._detector_map_cache = poly.get_map( 

421 nside_coverage=self.config.nside_coverage, 

422 nside_sparse=self.config.nside, 

423 dtype=np.bool_, 

424 ) 

425 

426 self.cell_input_map.metadata[f"B{bit:04d}WT"] = weight 

427 

428 overlap = self._detector_map_cache[cell_pixels] 

429 self.cell_input_map.set_bits_pix(cell_pixels[overlap], bit) 

430 

431 def _pixels_to_radec(self, wcs, pixels): 

432 """Convert pixels to ra/dec positions using a wcs. 

433 

434 Parameters 

435 ---------- 

436 wcs : `lsst.afw.geom.SkyWcs` 

437 WCS object. 

438 pixels : `list` [`lsst.geom.Point2D`] 

439 List of pixels to convert. 

440 

441 Returns 

442 ------- 

443 radec : `numpy.ndarray` 

444 Nx2 array of ra/dec positions associated with pixels. 

445 """ 

446 sph_pts = wcs.pixelToSky(pixels) 

447 return np.array([(sph.getRa().asDegrees(), sph.getDec().asDegrees()) 

448 for sph in sph_pts]) 

449 

450 

451class HealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections, 

452 dimensions=("tract", "band", "skymap",), 

453 defaultTemplates={"coaddName": "deep", 

454 "calexpType": ""}): 

455 input_maps = pipeBase.connectionTypes.Input( 

456 doc="Healsparse bit-wise coadd input maps", 

457 name="{coaddName}Coadd_inputMap", 

458 storageClass="HealSparseMap", 

459 dimensions=("tract", "patch", "skymap", "band"), 

460 multiple=True, 

461 deferLoad=True, 

462 ) 

463 coadd_exposures = pipeBase.connectionTypes.Input( 

464 doc="Coadded exposures associated with input_maps", 

465 name="{coaddName}Coadd_calexp", 

466 storageClass="ExposureF", 

467 dimensions=("tract", "patch", "skymap", "band"), 

468 multiple=True, 

469 deferLoad=True, 

470 ) 

471 visit_summaries = pipeBase.connectionTypes.Input( 

472 doc="Visit summary tables with aggregated statistics", 

473 name="finalVisitSummary", 

474 storageClass="ExposureCatalog", 

475 dimensions=("instrument", "visit"), 

476 multiple=True, 

477 deferLoad=True, 

478 ) 

479 sky_map = pipeBase.connectionTypes.Input( 

480 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures", 

481 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

482 storageClass="SkyMap", 

483 dimensions=("skymap",), 

484 ) 

485 

486 # Create output connections for all possible maps defined in the 

487 # registry. The vars() trick used here allows us to set class attributes 

488 # programatically. Taken from 

489 # https://stackoverflow.com/questions/2519807/ 

490 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class 

491 for name in BasePropertyMap.registry: 

492 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Output( 

493 doc=f"Minimum-value map of {name}", 

494 name=f"{{coaddName}}Coadd_{name}_map_min", 

495 storageClass="HealSparseMap", 

496 dimensions=("tract", "skymap", "band"), 

497 ) 

498 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Output( 

499 doc=f"Maximum-value map of {name}", 

500 name=f"{{coaddName}}Coadd_{name}_map_max", 

501 storageClass="HealSparseMap", 

502 dimensions=("tract", "skymap", "band"), 

503 ) 

504 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Output( 

505 doc=f"Mean-value map of {name}", 

506 name=f"{{coaddName}}Coadd_{name}_map_mean", 

507 storageClass="HealSparseMap", 

508 dimensions=("tract", "skymap", "band"), 

509 ) 

510 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Output( 

511 doc=f"Weighted mean-value map of {name}", 

512 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean", 

513 storageClass="HealSparseMap", 

514 dimensions=("tract", "skymap", "band"), 

515 ) 

516 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Output( 

517 doc=f"Sum-value map of {name}", 

518 name=f"{{coaddName}}Coadd_{name}_map_sum", 

519 storageClass="HealSparseMap", 

520 dimensions=("tract", "skymap", "band"), 

521 ) 

522 

523 def __init__(self, *, config=None): 

524 super().__init__(config=config) 

525 

526 # Not all possible maps in the registry will be configured to run. 

527 # Here we remove the unused connections. 

528 for name in BasePropertyMap.registry: 

529 if name not in config.property_maps: 

530 prop_config = BasePropertyMapConfig() 

531 prop_config.do_min = False 

532 prop_config.do_max = False 

533 prop_config.do_mean = False 

534 prop_config.do_weighted_mean = False 

535 prop_config.do_sum = False 

536 else: 

537 prop_config = config.property_maps[name] 

538 

539 if not prop_config.do_min: 

540 self.outputs.remove(f"{name}_map_min") 

541 if not prop_config.do_max: 

542 self.outputs.remove(f"{name}_map_max") 

543 if not prop_config.do_mean: 

544 self.outputs.remove(f"{name}_map_mean") 

545 if not prop_config.do_weighted_mean: 

546 self.outputs.remove(f"{name}_map_weighted_mean") 

547 if not prop_config.do_sum: 

548 self.outputs.remove(f"{name}_map_sum") 

549 

550 

551class HealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig, 

552 pipelineConnections=HealSparsePropertyMapConnections): 

553 """Configuration parameters for HealSparsePropertyMapTask""" 

554 property_maps = BasePropertyMap.registry.makeField( 

555 multi=True, 

556 default=["exposure_time", 

557 "psf_size", 

558 "psf_e1", 

559 "psf_e2", 

560 "psf_maglim", 

561 "sky_noise", 

562 "sky_background", 

563 "dcr_dra", 

564 "dcr_ddec", 

565 "dcr_e1", 

566 "dcr_e2", 

567 "epoch"], 

568 doc="Property map computation objects", 

569 ) 

570 

571 def setDefaults(self): 

572 self.property_maps["exposure_time"].do_sum = True 

573 self.property_maps["psf_size"].do_weighted_mean = True 

574 self.property_maps["psf_e1"].do_weighted_mean = True 

575 self.property_maps["psf_e2"].do_weighted_mean = True 

576 self.property_maps["psf_maglim"].do_weighted_mean = True 

577 self.property_maps["sky_noise"].do_weighted_mean = True 

578 self.property_maps["sky_background"].do_weighted_mean = True 

579 self.property_maps["dcr_dra"].do_weighted_mean = True 

580 self.property_maps["dcr_ddec"].do_weighted_mean = True 

581 self.property_maps["dcr_e1"].do_weighted_mean = True 

582 self.property_maps["dcr_e2"].do_weighted_mean = True 

583 self.property_maps["epoch"].do_mean = True 

584 self.property_maps["epoch"].do_min = True 

585 self.property_maps["epoch"].do_max = True 

586 

587 

588class HealSparsePropertyMapTask(pipeBase.PipelineTask): 

589 """Task to compute Healsparse property maps. 

590 

591 This task will compute individual property maps (per tract, per 

592 map type, per band). These maps cover the full coadd tract, and 

593 are not truncated to the inner tract region. 

594 """ 

595 ConfigClass = HealSparsePropertyMapConfig 

596 _DefaultName = "healSparsePropertyMapTask" 

597 

598 def __init__(self, **kwargs): 

599 super().__init__(**kwargs) 

600 self.property_maps = PropertyMapMap() 

601 for name, config, PropertyMapClass in self.config.property_maps.apply(): 

602 self.property_maps[name] = PropertyMapClass(config, name) 

603 

604 @timeMethod 

605 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

606 inputs = butlerQC.get(inputRefs) 

607 

608 sky_map = inputs.pop("sky_map") 

609 

610 tract = butlerQC.quantum.dataId["tract"] 

611 band = butlerQC.quantum.dataId["band"] 

612 

613 input_map_dict = {ref.dataId["patch"]: ref for ref in inputs["input_maps"]} 

614 coadd_dict = {ref.dataId["patch"]: ref for ref in inputs["coadd_exposures"]} 

615 

616 visit_summary_dict = {ref.dataId["visit"]: ref.get() 

617 for ref in inputs["visit_summaries"]} 

618 

619 self.run(sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict) 

620 

621 # Write the outputs 

622 for name, property_map in self.property_maps.items(): 

623 if property_map.config.do_min: 

624 butlerQC.put(property_map.min_map, 

625 getattr(outputRefs, f"{name}_map_min")) 

626 if property_map.config.do_max: 

627 butlerQC.put(property_map.max_map, 

628 getattr(outputRefs, f"{name}_map_max")) 

629 if property_map.config.do_mean: 

630 butlerQC.put(property_map.mean_map, 

631 getattr(outputRefs, f"{name}_map_mean")) 

632 if property_map.config.do_weighted_mean: 

633 butlerQC.put(property_map.weighted_mean_map, 

634 getattr(outputRefs, f"{name}_map_weighted_mean")) 

635 if property_map.config.do_sum: 

636 butlerQC.put(property_map.sum_map, 

637 getattr(outputRefs, f"{name}_map_sum")) 

638 

639 def run(self, sky_map, tract, band, coadd_dict, input_map_dict, visit_summary_dict): 

640 """Run the healsparse property task. 

641 

642 Parameters 

643 ---------- 

644 sky_map : Sky map object 

645 tract : `int` 

646 Tract number. 

647 band : `str` 

648 Band name for logging. 

649 coadd_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`] 

650 Dictionary of coadd exposure references. Keys are patch numbers. 

651 input_map_dict : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`] 

652 Dictionary of input map references. Keys are patch numbers. 

653 visit_summary_dict : `dict` [`int`: `lsst.afw.table.ExposureCatalog`] 

654 Dictionary of visit summary tables. Keys are visit numbers. 

655 

656 Raises 

657 ------ 

658 RepeatableQuantumError 

659 If visit_summary_dict is missing any visits or detectors found in an 

660 input map. This leads to an inconsistency between what is in the coadd 

661 (via the input map) and the visit summary tables which contain data 

662 to compute the maps. 

663 """ 

664 tract_info = sky_map[tract] 

665 

666 tract_maps_initialized = False 

667 

668 for patch in input_map_dict.keys(): 

669 self.log.info("Making maps for band %s, tract %d, patch %d.", 

670 band, tract, patch) 

671 

672 patch_info = tract_info[patch] 

673 

674 input_map = input_map_dict[patch].get() 

675 

676 # Extract input map metadata. 

677 input_bit_weight_dict = {} 

678 for bit in range(input_map.wide_mask_maxbits): 

679 # Not all bits may be listed because maxbits must be a multiple 

680 # of 8. 

681 if f"B{bit:04d}CCD" in input_map.metadata: 

682 visit = input_map.metadata[f"B{bit:04d}VIS"] 

683 detector = input_map.metadata[f"B{bit:04d}CCD"] 

684 weight = input_map.metadata[f"B{bit:04d}WT"] 

685 input_bit_weight_dict[(visit, detector)] = (bit, weight) 

686 

687 # Initialize the tract maps as soon as we have the first input 

688 # map for getting nside information. 

689 if not tract_maps_initialized: 

690 # We use the first input map nside information to initialize 

691 # the tract maps 

692 nside_coverage = self._compute_nside_coverage_tract(tract_info) 

693 nside = input_map.nside_sparse 

694 

695 do_compute_approx_psf = False 

696 # Initialize the tract maps 

697 for property_map in self.property_maps: 

698 property_map.initialize_tract_maps(nside_coverage, nside) 

699 if property_map.requires_psf: 

700 do_compute_approx_psf = True 

701 

702 tract_maps_initialized = True 

703 

704 if input_map.valid_pixels.size == 0: 

705 self.log.warning("No valid pixels for band %s, tract %d, patch %d; skipping.", 

706 band, tract, patch) 

707 continue 

708 

709 coadd_photo_calib = coadd_dict[patch].get(component="photoCalib") 

710 coadd_zeropoint = 2.5*np.log10(coadd_photo_calib.getInstFluxAtZeroMagnitude()) 

711 

712 # Crop input_map to the inner polygon of the patch 

713 poly_vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices() 

714 patch_radec = self._vertices_to_radec(poly_vertices) 

715 patch_poly = hsp.Polygon(ra=patch_radec[:, 0], dec=patch_radec[:, 1], 

716 value=np.arange(input_map.wide_mask_maxbits)) 

717 with warnings.catch_warnings(): 

718 # Healsparse will emit a warning if nside coverage is greater than 

719 # 128. In the case of generating patch input maps, and not global 

720 # maps, high nside coverage works fine, so we can suppress this 

721 # warning. 

722 warnings.simplefilter("ignore") 

723 patch_poly_map = patch_poly.get_map_like(input_map) 

724 input_map = hsp.and_intersection([input_map, patch_poly_map]) 

725 

726 valid_pixels, vpix_ra, vpix_dec = input_map.valid_pixels_pos(return_pixels=True) 

727 

728 # Check if there are no valid pixels for the inner (unique) patch region 

729 if valid_pixels.size == 0: 

730 continue 

731 

732 # Initialize the value accumulators 

733 for property_map in self.property_maps: 

734 property_map.initialize_values(valid_pixels.size) 

735 property_map.zeropoint = coadd_zeropoint 

736 

737 # Initialize the weight and counter accumulators 

738 total_weights = np.zeros(valid_pixels.size) 

739 total_inputs = np.zeros(valid_pixels.size, dtype=np.int32) 

740 

741 for (visit, detector), (bit, weight) in input_bit_weight_dict.items(): 

742 # Which pixels in the map are used by this visit/detector 

743 inmap, = np.where(input_map.check_bits_pix(valid_pixels, [bit])) 

744 

745 # Check if there are any valid pixels in the map from this deteector. 

746 if inmap.size == 0: 

747 continue 

748 

749 # Retrieve the correct visitSummary row 

750 if visit not in visit_summary_dict: 

751 msg = f"Visit {visit} not found in visit_summaries." 

752 raise pipeBase.RepeatableQuantumError(msg) 

753 row = visit_summary_dict[visit].find(detector) 

754 if row is None: 

755 msg = f"Visit {visit} / detector {detector} not found in visit_summaries." 

756 raise pipeBase.RepeatableQuantumError(msg) 

757 

758 x, y = row.wcs.skyToPixelArray(vpix_ra[inmap], vpix_dec[inmap], degrees=True) 

759 scalings = self._compute_calib_scale(row, x, y) 

760 

761 if do_compute_approx_psf: 

762 psf_array = compute_approx_psf_size_and_shape(row, vpix_ra[inmap], vpix_dec[inmap]) 

763 else: 

764 psf_array = None 

765 

766 total_weights[inmap] += weight 

767 total_inputs[inmap] += 1 

768 

769 # Accumulate the values 

770 for property_map in self.property_maps: 

771 property_map.accumulate_values(inmap, 

772 vpix_ra[inmap], 

773 vpix_dec[inmap], 

774 weight, 

775 scalings, 

776 row, 

777 psf_array=psf_array) 

778 

779 # Finalize the mean values and set the tract maps 

780 for property_map in self.property_maps: 

781 property_map.finalize_mean_values(total_weights, total_inputs) 

782 property_map.set_map_values(valid_pixels) 

783 

784 def _compute_calib_scale(self, ccd_row, x, y): 

785 """Compute calibration scaling values. 

786 

787 Parameters 

788 ---------- 

789 ccd_row : `lsst.afw.table.ExposureRecord` 

790 Exposure metadata for a given detector exposure. 

791 x : `np.ndarray` 

792 Array of x positions. 

793 y : `np.ndarray` 

794 Array of y positions. 

795 

796 Returns 

797 ------- 

798 calib_scale : `np.ndarray` 

799 Array of calibration scale values. 

800 """ 

801 photo_calib = ccd_row.getPhotoCalib() 

802 bf = photo_calib.computeScaledCalibration() 

803 if bf.getBBox() == ccd_row.getBBox(): 

804 # Track variable calibration over the detector 

805 calib_scale = photo_calib.getCalibrationMean()*bf.evaluate(x, y) 

806 else: 

807 # Spatially constant calibration 

808 calib_scale = photo_calib.getCalibrationMean() 

809 

810 return calib_scale 

811 

812 def _vertices_to_radec(self, vertices): 

813 """Convert polygon vertices to ra/dec. 

814 

815 Parameters 

816 ---------- 

817 vertices : `list` [ `lsst.sphgeom.UnitVector3d` ] 

818 Vertices for bounding polygon. 

819 

820 Returns 

821 ------- 

822 radec : `numpy.ndarray` 

823 Nx2 array of ra/dec positions (in degrees) associated with vertices. 

824 """ 

825 lonlats = [lsst.sphgeom.LonLat(x) for x in vertices] 

826 radec = np.array([(x.getLon().asDegrees(), x.getLat().asDegrees()) for 

827 x in lonlats]) 

828 return radec 

829 

830 def _compute_nside_coverage_tract(self, tract_info): 

831 """Compute the optimal coverage nside for a tract. 

832 

833 Parameters 

834 ---------- 

835 tract_info : `lsst.skymap.tractInfo.ExplicitTractInfo` 

836 Tract information object. 

837 

838 Returns 

839 ------- 

840 nside_coverage : `int` 

841 Optimal coverage nside for a tract map. 

842 """ 

843 num_patches = tract_info.getNumPatches() 

844 

845 # Compute approximate patch area 

846 patch_info = tract_info.getPatchInfo(0) 

847 vertices = patch_info.getInnerSkyPolygon(tract_info.getWcs()).getVertices() 

848 radec = self._vertices_to_radec(vertices) 

849 delta_ra = np.max(radec[:, 0]) - np.min(radec[:, 0]) 

850 delta_dec = np.max(radec[:, 1]) - np.min(radec[:, 1]) 

851 patch_area = delta_ra*delta_dec*np.cos(np.deg2rad(np.mean(radec[:, 1]))) 

852 

853 tract_area = num_patches[0]*num_patches[1]*patch_area 

854 # Start with a fairly low nside and increase until we find the approximate area. 

855 nside_coverage_tract = 32 

856 while hpg.nside_to_pixel_area(nside_coverage_tract, degrees=True) > tract_area: 

857 nside_coverage_tract = 2*nside_coverage_tract 

858 # Step back one, but don't go bigger pixels than nside=32 or smaller 

859 # than 128 (recommended by healsparse). 

860 nside_coverage_tract = int(np.clip(nside_coverage_tract/2, 32, 128)) 

861 

862 return nside_coverage_tract 

863 

864 

865class ConsolidateHealSparsePropertyMapConnections(pipeBase.PipelineTaskConnections, 

866 dimensions=("band", "skymap",), 

867 defaultTemplates={"coaddName": "deep"}): 

868 sky_map = pipeBase.connectionTypes.Input( 

869 doc="Input definition of geometry/bbox and projection/wcs for coadded exposures", 

870 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

871 storageClass="SkyMap", 

872 dimensions=("skymap",), 

873 ) 

874 

875 # Create output connections for all possible maps defined in the 

876 # registry. The vars() trick used here allows us to set class attributes 

877 # programatically. Taken from 

878 # https://stackoverflow.com/questions/2519807/ 

879 # setting-a-class-attribute-with-a-given-name-in-python-while-defining-the-class 

880 for name in BasePropertyMap.registry: 

881 vars()[f"{name}_map_min"] = pipeBase.connectionTypes.Input( 

882 doc=f"Minimum-value map of {name}", 

883 name=f"{{coaddName}}Coadd_{name}_map_min", 

884 storageClass="HealSparseMap", 

885 dimensions=("tract", "skymap", "band"), 

886 multiple=True, 

887 deferLoad=True, 

888 ) 

889 vars()[f"{name}_consolidated_map_min"] = pipeBase.connectionTypes.Output( 

890 doc=f"Minumum-value map of {name}", 

891 name=f"{{coaddName}}Coadd_{name}_consolidated_map_min", 

892 storageClass="HealSparseMap", 

893 dimensions=("skymap", "band"), 

894 ) 

895 vars()[f"{name}_map_max"] = pipeBase.connectionTypes.Input( 

896 doc=f"Maximum-value map of {name}", 

897 name=f"{{coaddName}}Coadd_{name}_map_max", 

898 storageClass="HealSparseMap", 

899 dimensions=("tract", "skymap", "band"), 

900 multiple=True, 

901 deferLoad=True, 

902 ) 

903 vars()[f"{name}_consolidated_map_max"] = pipeBase.connectionTypes.Output( 

904 doc=f"Minumum-value map of {name}", 

905 name=f"{{coaddName}}Coadd_{name}_consolidated_map_max", 

906 storageClass="HealSparseMap", 

907 dimensions=("skymap", "band"), 

908 ) 

909 vars()[f"{name}_map_mean"] = pipeBase.connectionTypes.Input( 

910 doc=f"Mean-value map of {name}", 

911 name=f"{{coaddName}}Coadd_{name}_map_mean", 

912 storageClass="HealSparseMap", 

913 dimensions=("tract", "skymap", "band"), 

914 multiple=True, 

915 deferLoad=True, 

916 ) 

917 vars()[f"{name}_consolidated_map_mean"] = pipeBase.connectionTypes.Output( 

918 doc=f"Minumum-value map of {name}", 

919 name=f"{{coaddName}}Coadd_{name}_consolidated_map_mean", 

920 storageClass="HealSparseMap", 

921 dimensions=("skymap", "band"), 

922 ) 

923 vars()[f"{name}_map_weighted_mean"] = pipeBase.connectionTypes.Input( 

924 doc=f"Weighted mean-value map of {name}", 

925 name=f"{{coaddName}}Coadd_{name}_map_weighted_mean", 

926 storageClass="HealSparseMap", 

927 dimensions=("tract", "skymap", "band"), 

928 multiple=True, 

929 deferLoad=True, 

930 ) 

931 vars()[f"{name}_consolidated_map_weighted_mean"] = pipeBase.connectionTypes.Output( 

932 doc=f"Minumum-value map of {name}", 

933 name=f"{{coaddName}}Coadd_{name}_consolidated_map_weighted_mean", 

934 storageClass="HealSparseMap", 

935 dimensions=("skymap", "band"), 

936 ) 

937 vars()[f"{name}_map_sum"] = pipeBase.connectionTypes.Input( 

938 doc=f"Sum-value map of {name}", 

939 name=f"{{coaddName}}Coadd_{name}_map_sum", 

940 storageClass="HealSparseMap", 

941 dimensions=("tract", "skymap", "band"), 

942 multiple=True, 

943 deferLoad=True, 

944 ) 

945 vars()[f"{name}_consolidated_map_sum"] = pipeBase.connectionTypes.Output( 

946 doc=f"Minumum-value map of {name}", 

947 name=f"{{coaddName}}Coadd_{name}_consolidated_map_sum", 

948 storageClass="HealSparseMap", 

949 dimensions=("skymap", "band"), 

950 ) 

951 

952 def __init__(self, *, config=None): 

953 super().__init__(config=config) 

954 

955 # Not all possible maps in the registry will be configured to run. 

956 # Here we remove the unused connections. 

957 for name in BasePropertyMap.registry: 

958 if name not in config.property_maps: 

959 prop_config = BasePropertyMapConfig() 

960 prop_config.do_min = False 

961 prop_config.do_max = False 

962 prop_config.do_mean = False 

963 prop_config.do_weighted_mean = False 

964 prop_config.do_sum = False 

965 else: 

966 prop_config = config.property_maps[name] 

967 

968 if not prop_config.do_min: 

969 self.inputs.remove(f"{name}_map_min") 

970 self.outputs.remove(f"{name}_consolidated_map_min") 

971 if not prop_config.do_max: 

972 self.inputs.remove(f"{name}_map_max") 

973 self.outputs.remove(f"{name}_consolidated_map_max") 

974 if not prop_config.do_mean: 

975 self.inputs.remove(f"{name}_map_mean") 

976 self.outputs.remove(f"{name}_consolidated_map_mean") 

977 if not prop_config.do_weighted_mean: 

978 self.inputs.remove(f"{name}_map_weighted_mean") 

979 self.outputs.remove(f"{name}_consolidated_map_weighted_mean") 

980 if not prop_config.do_sum: 

981 self.inputs.remove(f"{name}_map_sum") 

982 self.outputs.remove(f"{name}_consolidated_map_sum") 

983 

984 

985class ConsolidateHealSparsePropertyMapConfig(pipeBase.PipelineTaskConfig, 

986 pipelineConnections=ConsolidateHealSparsePropertyMapConnections): 

987 """Configuration parameters for ConsolidateHealSparsePropertyMapTask""" 

988 property_maps = BasePropertyMap.registry.makeField( 

989 multi=True, 

990 default=["exposure_time", 

991 "psf_size", 

992 "psf_e1", 

993 "psf_e2", 

994 "psf_maglim", 

995 "sky_noise", 

996 "sky_background", 

997 "dcr_dra", 

998 "dcr_ddec", 

999 "dcr_e1", 

1000 "dcr_e2", 

1001 "epoch"], 

1002 doc="Property map computation objects", 

1003 ) 

1004 nside_coverage = pexConfig.Field( 

1005 doc="Consolidated HealSparse coverage map nside. Must be power of 2.", 

1006 dtype=int, 

1007 default=32, 

1008 check=_is_power_of_two, 

1009 ) 

1010 

1011 def setDefaults(self): 

1012 self.property_maps["exposure_time"].do_sum = True 

1013 self.property_maps["psf_size"].do_weighted_mean = True 

1014 self.property_maps["psf_e1"].do_weighted_mean = True 

1015 self.property_maps["psf_e2"].do_weighted_mean = True 

1016 self.property_maps["psf_maglim"].do_weighted_mean = True 

1017 self.property_maps["sky_noise"].do_weighted_mean = True 

1018 self.property_maps["sky_background"].do_weighted_mean = True 

1019 self.property_maps["dcr_dra"].do_weighted_mean = True 

1020 self.property_maps["dcr_ddec"].do_weighted_mean = True 

1021 self.property_maps["dcr_e1"].do_weighted_mean = True 

1022 self.property_maps["dcr_e2"].do_weighted_mean = True 

1023 self.property_maps["epoch"].do_mean = True 

1024 self.property_maps["epoch"].do_min = True 

1025 self.property_maps["epoch"].do_max = True 

1026 

1027 

1028class ConsolidateHealSparsePropertyMapTask(pipeBase.PipelineTask): 

1029 """Task to consolidate HealSparse property maps. 

1030 

1031 This task will take all the individual tract-based maps (per map type, 

1032 per band) and consolidate them into one survey-wide map (per map type, 

1033 per band). Each tract map is truncated to its inner region before 

1034 consolidation. 

1035 """ 

1036 ConfigClass = ConsolidateHealSparsePropertyMapConfig 

1037 _DefaultName = "consolidateHealSparsePropertyMapTask" 

1038 

1039 def __init__(self, **kwargs): 

1040 super().__init__(**kwargs) 

1041 self.property_maps = PropertyMapMap() 

1042 for name, config, PropertyMapClass in self.config.property_maps.apply(): 

1043 self.property_maps[name] = PropertyMapClass(config, name) 

1044 

1045 @timeMethod 

1046 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

1047 inputs = butlerQC.get(inputRefs) 

1048 

1049 sky_map = inputs.pop("sky_map") 

1050 

1051 # These need to be consolidated one at a time to conserve memory. 

1052 for name in self.config.property_maps.names: 

1053 for type_ in ['min', 'max', 'mean', 'weighted_mean', 'sum']: 

1054 map_type = f"{name}_map_{type_}" 

1055 if map_type in inputs: 

1056 input_refs = {ref.dataId['tract']: ref 

1057 for ref in inputs[map_type]} 

1058 consolidated_map = self.consolidate_map(sky_map, input_refs) 

1059 butlerQC.put(consolidated_map, 

1060 getattr(outputRefs, f"{name}_consolidated_map_{type_}")) 

1061 

1062 def consolidate_map(self, sky_map, input_refs): 

1063 """Consolidate the healsparse property maps. 

1064 

1065 Parameters 

1066 ---------- 

1067 sky_map : Sky map object 

1068 input_refs : `dict` [`int`: `lsst.daf.butler.DeferredDatasetHandle`] 

1069 Dictionary of tract_id mapping to dataref. 

1070 

1071 Returns 

1072 ------- 

1073 consolidated_map : `healsparse.HealSparseMap` 

1074 Consolidated HealSparse map. 

1075 """ 

1076 # First, we read in the coverage maps to know how much memory 

1077 # to allocate 

1078 cov_mask = None 

1079 nside_coverage_inputs = None 

1080 for tract_id in input_refs: 

1081 cov = input_refs[tract_id].get(component='coverage') 

1082 if cov_mask is None: 

1083 cov_mask = cov.coverage_mask 

1084 nside_coverage_inputs = cov.nside_coverage 

1085 else: 

1086 cov_mask |= cov.coverage_mask 

1087 

1088 cov_pix_inputs, = np.where(cov_mask) 

1089 

1090 # Compute the coverage pixels for the desired nside_coverage 

1091 if nside_coverage_inputs == self.config.nside_coverage: 

1092 cov_pix = cov_pix_inputs 

1093 elif nside_coverage_inputs > self.config.nside_coverage: 

1094 # Converting from higher resolution coverage to lower 

1095 # resolution coverage. 

1096 bit_shift = hsp.utils._compute_bitshift(self.config.nside_coverage, 

1097 nside_coverage_inputs) 

1098 cov_pix = np.right_shift(cov_pix_inputs, bit_shift) 

1099 else: 

1100 # Converting from lower resolution coverage to higher 

1101 # resolution coverage. 

1102 bit_shift = hsp.utils._compute_bitshift(nside_coverage_inputs, 

1103 self.config.nside_coverage) 

1104 cov_pix = np.left_shift(cov_pix_inputs, bit_shift) 

1105 

1106 # Now read in each tract map and build the consolidated map. 

1107 consolidated_map = None 

1108 for tract_id in input_refs: 

1109 input_map = input_refs[tract_id].get() 

1110 if consolidated_map is None: 

1111 consolidated_map = hsp.HealSparseMap.make_empty( 

1112 self.config.nside_coverage, 

1113 input_map.nside_sparse, 

1114 input_map.dtype, 

1115 sentinel=input_map._sentinel, 

1116 cov_pixels=cov_pix, 

1117 metadata=input_map.metadata, 

1118 ) 

1119 

1120 # Only use pixels that are properly inside the tract. 

1121 vpix, ra, dec = input_map.valid_pixels_pos(return_pixels=True) 

1122 vpix_tract_ids = sky_map.findTractIdArray(ra, dec, degrees=True) 

1123 

1124 in_tract = (vpix_tract_ids == tract_id) 

1125 

1126 consolidated_map[vpix[in_tract]] = input_map[vpix[in_tract]] 

1127 

1128 return consolidated_map