Coverage for python / lsst / drp / tasks / make_psf_matched_warp.py: 30%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:31 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "MakePsfMatchedWarpConfig", 

26 "MakePsfMatchedWarpConnections", 

27 "MakePsfMatchedWarpTask", 

28) 

29 

30import warnings 

31from typing import TYPE_CHECKING 

32 

33import numpy as np 

34 

35import lsst.geom as geom 

36from lsst.afw.geom import Polygon, SinglePolygonException, makeWcsPairTransform 

37from lsst.coadd.utils import copyGoodPixels 

38from lsst.ip.diffim import ModelPsfMatchTask, PsfComputeShapeError, WarpedPsfTransformTooBigError 

39from lsst.meas.algorithms import GaussianPsfFactory, WarpedPsf 

40from lsst.pex.config import ConfigurableField 

41from lsst.pipe.base import ( 

42 AnnotatedPartialOutputsError, 

43 PipelineTask, 

44 PipelineTaskConfig, 

45 PipelineTaskConnections, 

46 Struct, 

47) 

48from lsst.pipe.base.connectionTypes import Input, Output 

49from lsst.pipe.tasks.coaddBase import growValidPolygons, makeSkyInfo 

50from lsst.skymap import BaseSkyMap 

51from lsst.utils.timer import timeMethod 

52 

53if TYPE_CHECKING: 

54 from lsst.afw.image import Exposure 

55 

56 

57class MakePsfMatchedWarpConnections( 

58 PipelineTaskConnections, 

59 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

60 defaultTemplates={ 

61 "coaddName": "deep", 

62 }, 

63): 

64 """Connections for MakePsfMatchedWarpTask""" 

65 

66 sky_map = Input( 

67 doc="Input definition of geometry/bbox and projection/wcs for warps.", 

68 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

69 storageClass="SkyMap", 

70 dimensions=("skymap",), 

71 ) 

72 

73 direct_warp = Input( 

74 doc="Direct warped exposure produced by resampling calexps onto the skyMap patch geometry", 

75 name="{coaddName}Coadd_directWarp", 

76 storageClass="ExposureF", 

77 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

78 ) 

79 

80 psf_matched_warp = Output( 

81 doc=( 

82 "Output PSF-Matched warped exposure, produced by resampling ", 

83 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF.", 

84 ), 

85 name="{coaddName}Coadd_psfMatchedWarp", 

86 storageClass="ExposureF", 

87 dimensions=("tract", "patch", "skymap", "visit", "instrument"), 

88 ) 

89 

90 

91class MakePsfMatchedWarpConfig( 

92 PipelineTaskConfig, 

93 pipelineConnections=MakePsfMatchedWarpConnections, 

94): 

95 """Config for MakePsfMatchedWarpTask.""" 

96 

97 modelPsf = GaussianPsfFactory.makeField(doc="Model Psf factory") 

98 psfMatch = ConfigurableField( 

99 target=ModelPsfMatchTask, 

100 doc="Task to warp and PSF-match calexp", 

101 ) 

102 

103 def setDefaults(self): 

104 super().setDefaults() 

105 self.psfMatch.kernel["AL"].alardSigGauss = [1.0, 2.0, 4.5] 

106 self.modelPsf.defaultFwhm = 7.7 

107 

108 

109class MakePsfMatchedWarpTask(PipelineTask): 

110 ConfigClass = MakePsfMatchedWarpConfig 

111 _DefaultName = "makePsfMatchedWarp" 

112 

113 def __init__(self, **kwargs): 

114 super().__init__(**kwargs) 

115 self.makeSubtask("psfMatch") 

116 

117 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

118 # Docstring inherited. 

119 

120 # Read in all inputs. 

121 inputs = butlerQC.get(inputRefs) 

122 

123 sky_map = inputs.pop("sky_map") 

124 

125 quantumDataId = butlerQC.quantum.dataId 

126 sky_info = makeSkyInfo( 

127 sky_map, 

128 tractId=quantumDataId["tract"], 

129 patchId=quantumDataId["patch"], 

130 ) 

131 

132 results = self.run(inputs["direct_warp"], sky_info.bbox) 

133 butlerQC.put(results, outputRefs) 

134 

135 @timeMethod 

136 def run(self, direct_warp: Exposure, bbox: geom.Box2I): 

137 """Make a PSF-matched warp from a direct warp. 

138 

139 Each individual detector from the direct warp is isolated, one at a 

140 time, and PSF-matched to the same model PSF. The PSF-matched images are 

141 then added back together to form the final PSF-matched warp. The bulk 

142 of the work is done by the `psfMatchTask`. 

143 

144 Notes 

145 ----- 

146 Pixels that receive no inputs are set to NaN, for e.g, chip gaps. This 

147 violates LSST algorithms group policy. 

148 

149 Parameters 

150 ---------- 

151 direct_warp : `lsst.afw.image.Exposure` 

152 Direct warp to be PSF-matched. 

153 

154 Returns 

155 ------- 

156 struct : `lsst.pipe.base.Struct` 

157 Struct containing the PSF-matched warp under the attribute 

158 `psf_matched_warp`. 

159 """ 

160 modelPsf = self.config.modelPsf.apply() 

161 

162 bit_mask = direct_warp.mask.getPlaneBitMask("NO_DATA") 

163 # Prepare the output exposure. We clone the input image to keep the 

164 # metadata, but reset the image, mask and variance planes. 

165 exposure_psf_matched = direct_warp[bbox].clone() 

166 exposure_psf_matched.image.array[:, :] = np.nan 

167 exposure_psf_matched.mask.array[:, :] = bit_mask 

168 exposure_psf_matched.variance.array[:, :] = np.inf 

169 exposure_psf_matched.setPsf(modelPsf) 

170 

171 total_good_pixels = 0 # Total number of pixels copied to output. 

172 

173 for row in direct_warp.info.getCoaddInputs().ccds: 

174 transform = makeWcsPairTransform(row.wcs, direct_warp.wcs) 

175 warp_psf = WarpedPsf(row.getPsf(), transform) 

176 

177 if (src_polygon := row.validPolygon) is None: 

178 # Calculate the polygon for this detector. 

179 src_polygon = Polygon(geom.Box2D(row.getBBox())) 

180 self.log.debug("Polygon for detector=%d is calculated as %s", row["ccd"], src_polygon) 

181 else: 

182 self.log.debug( 

183 "Polygon for detector=%d is read from the input calexp as %s", row["ccd"], src_polygon 

184 ) 

185 

186 try: 

187 destination_polygon = src_polygon.transform(transform).intersectionSingle( 

188 geom.Box2D(direct_warp.getBBox()) 

189 ) 

190 except SinglePolygonException: 

191 self.log.info( 

192 "Skipping CCD %d as its polygon does not intersect the direct warp", 

193 row["ccd"], 

194 ) 

195 continue 

196 

197 # Compute the minimum possible bounding box that overlaps the CCD. 

198 # First find the intersection polygon between the per-detector warp 

199 # and the warp bounding box. 

200 bbox = geom.Box2I() 

201 for corner in destination_polygon.getVertices(): 

202 bbox.include(geom.Point2I(corner)) 

203 bbox.clip(direct_warp.getBBox()) # Additional safeguard 

204 

205 # Because the direct warps are larger, it is possible that after 

206 # clipping, `bbox` lies outside PSF-matched warp's bounding box. 

207 if not bbox.overlaps(exposure_psf_matched.getBBox()): 

208 self.log.debug( 

209 "Skipping CCD %d as its bbox %s does not overlap the PSF-matched warp", 

210 row["ccd"], 

211 bbox, 

212 ) 

213 continue 

214 

215 self.log.debug("PSF-matching CCD %d with bbox %s", row["ccd"], bbox) 

216 

217 ccd_mask_array = ~(destination_polygon.createImage(bbox).array <= 0) 

218 

219 # Clone the subimage, set the PSF to the model and reset the planes 

220 # outside the detector. 

221 temp_warp = direct_warp[bbox].clone() 

222 temp_warp.setPsf(warp_psf) 

223 temp_warp.image.array *= ccd_mask_array 

224 temp_warp.mask.array |= (~ccd_mask_array) * bit_mask 

225 # We intend to divide by zero outside the detector to set the 

226 # per-pixel variance values to infinity. Suppress the warning. 

227 with warnings.catch_warnings(): 

228 warnings.filterwarnings("ignore", message="divide by zero", category=RuntimeWarning) 

229 temp_warp.variance.array /= ccd_mask_array 

230 

231 try: 

232 temp_psf_matched = self.psfMatch.run(temp_warp, modelPsf).psfMatchedExposure 

233 except (WarpedPsfTransformTooBigError, PsfComputeShapeError) as e: 

234 error = AnnotatedPartialOutputsError.annotate( 

235 e, 

236 self, 

237 temp_warp, 

238 log=self.log, 

239 ) 

240 raise error from e 

241 del temp_warp 

242 

243 # Set pixels outside the intersection polygon to NO_DATA. 

244 temp_psf_matched.mask[bbox].array |= (~ccd_mask_array) * bit_mask 

245 

246 # Clip the bbox to the PSF-matched warp bounding box. 

247 bbox.clip(exposure_psf_matched.getBBox()) 

248 

249 num_good_pixels = copyGoodPixels( 

250 exposure_psf_matched.maskedImage[bbox], 

251 temp_psf_matched.maskedImage[bbox], 

252 bit_mask, 

253 ) 

254 

255 del temp_psf_matched 

256 

257 self.log.info( 

258 "Copied %d pixels from CCD %d to exposure_psf_matched", 

259 num_good_pixels, 

260 row["ccd"], 

261 ) 

262 total_good_pixels += num_good_pixels 

263 

264 self.log.info("Total number of good pixels = %d", total_good_pixels) 

265 

266 if total_good_pixels > 0: 

267 growValidPolygons( 

268 exposure_psf_matched.info.getCoaddInputs(), 

269 -self.config.psfMatch.kernel.active.kernelSize // 2, 

270 ) 

271 

272 return Struct(psf_matched_warp=exposure_psf_matched) 

273 else: 

274 return Struct(psf_matched_warp=None)