Coverage for python / lsst / drp / tasks / make_psf_matched_warp.py: 30%
95 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:32 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "MakePsfMatchedWarpConfig",
26 "MakePsfMatchedWarpConnections",
27 "MakePsfMatchedWarpTask",
28)
30import warnings
31from typing import TYPE_CHECKING
33import numpy as np
35import lsst.geom as geom
36from lsst.afw.geom import Polygon, SinglePolygonException, makeWcsPairTransform
37from lsst.coadd.utils import copyGoodPixels
38from lsst.ip.diffim import ModelPsfMatchTask, PsfComputeShapeError, WarpedPsfTransformTooBigError
39from lsst.meas.algorithms import GaussianPsfFactory, WarpedPsf
40from lsst.pex.config import ConfigurableField
41from lsst.pipe.base import (
42 AnnotatedPartialOutputsError,
43 PipelineTask,
44 PipelineTaskConfig,
45 PipelineTaskConnections,
46 Struct,
47)
48from lsst.pipe.base.connectionTypes import Input, Output
49from lsst.pipe.tasks.coaddBase import growValidPolygons, makeSkyInfo
50from lsst.skymap import BaseSkyMap
51from lsst.utils.timer import timeMethod
53if TYPE_CHECKING:
54 from lsst.afw.image import Exposure
57class MakePsfMatchedWarpConnections(
58 PipelineTaskConnections,
59 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
60 defaultTemplates={
61 "coaddName": "deep",
62 },
63):
64 """Connections for MakePsfMatchedWarpTask"""
66 sky_map = Input(
67 doc="Input definition of geometry/bbox and projection/wcs for warps.",
68 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
69 storageClass="SkyMap",
70 dimensions=("skymap",),
71 )
73 direct_warp = Input(
74 doc="Direct warped exposure produced by resampling calexps onto the skyMap patch geometry",
75 name="{coaddName}Coadd_directWarp",
76 storageClass="ExposureF",
77 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
78 )
80 psf_matched_warp = Output(
81 doc=(
82 "Output PSF-Matched warped exposure, produced by resampling ",
83 "calexps onto the skyMap patch geometry and PSF-matching to a model PSF.",
84 ),
85 name="{coaddName}Coadd_psfMatchedWarp",
86 storageClass="ExposureF",
87 dimensions=("tract", "patch", "skymap", "visit", "instrument"),
88 )
91class MakePsfMatchedWarpConfig(
92 PipelineTaskConfig,
93 pipelineConnections=MakePsfMatchedWarpConnections,
94):
95 """Config for MakePsfMatchedWarpTask."""
97 modelPsf = GaussianPsfFactory.makeField(doc="Model Psf factory")
98 psfMatch = ConfigurableField(
99 target=ModelPsfMatchTask,
100 doc="Task to warp and PSF-match calexp",
101 )
103 def setDefaults(self):
104 super().setDefaults()
105 self.psfMatch.kernel["AL"].alardSigGauss = [1.0, 2.0, 4.5]
106 self.modelPsf.defaultFwhm = 7.7
109class MakePsfMatchedWarpTask(PipelineTask):
110 ConfigClass = MakePsfMatchedWarpConfig
111 _DefaultName = "makePsfMatchedWarp"
113 def __init__(self, **kwargs):
114 super().__init__(**kwargs)
115 self.makeSubtask("psfMatch")
117 def runQuantum(self, butlerQC, inputRefs, outputRefs):
118 # Docstring inherited.
120 # Read in all inputs.
121 inputs = butlerQC.get(inputRefs)
123 sky_map = inputs.pop("sky_map")
125 quantumDataId = butlerQC.quantum.dataId
126 sky_info = makeSkyInfo(
127 sky_map,
128 tractId=quantumDataId["tract"],
129 patchId=quantumDataId["patch"],
130 )
132 results = self.run(inputs["direct_warp"], sky_info.bbox)
133 butlerQC.put(results, outputRefs)
135 @timeMethod
136 def run(self, direct_warp: Exposure, bbox: geom.Box2I):
137 """Make a PSF-matched warp from a direct warp.
139 Each individual detector from the direct warp is isolated, one at a
140 time, and PSF-matched to the same model PSF. The PSF-matched images are
141 then added back together to form the final PSF-matched warp. The bulk
142 of the work is done by the `psfMatchTask`.
144 Notes
145 -----
146 Pixels that receive no inputs are set to NaN, for e.g, chip gaps. This
147 violates LSST algorithms group policy.
149 Parameters
150 ----------
151 direct_warp : `lsst.afw.image.Exposure`
152 Direct warp to be PSF-matched.
154 Returns
155 -------
156 struct : `lsst.pipe.base.Struct`
157 Struct containing the PSF-matched warp under the attribute
158 `psf_matched_warp`.
159 """
160 modelPsf = self.config.modelPsf.apply()
162 bit_mask = direct_warp.mask.getPlaneBitMask("NO_DATA")
163 # Prepare the output exposure. We clone the input image to keep the
164 # metadata, but reset the image, mask and variance planes.
165 exposure_psf_matched = direct_warp[bbox].clone()
166 exposure_psf_matched.image.array[:, :] = np.nan
167 exposure_psf_matched.mask.array[:, :] = bit_mask
168 exposure_psf_matched.variance.array[:, :] = np.inf
169 exposure_psf_matched.setPsf(modelPsf)
171 total_good_pixels = 0 # Total number of pixels copied to output.
173 for row in direct_warp.info.getCoaddInputs().ccds:
174 transform = makeWcsPairTransform(row.wcs, direct_warp.wcs)
175 warp_psf = WarpedPsf(row.getPsf(), transform)
177 if (src_polygon := row.validPolygon) is None:
178 # Calculate the polygon for this detector.
179 src_polygon = Polygon(geom.Box2D(row.getBBox()))
180 self.log.debug("Polygon for detector=%d is calculated as %s", row["ccd"], src_polygon)
181 else:
182 self.log.debug(
183 "Polygon for detector=%d is read from the input calexp as %s", row["ccd"], src_polygon
184 )
186 try:
187 destination_polygon = src_polygon.transform(transform).intersectionSingle(
188 geom.Box2D(direct_warp.getBBox())
189 )
190 except SinglePolygonException:
191 self.log.info(
192 "Skipping CCD %d as its polygon does not intersect the direct warp",
193 row["ccd"],
194 )
195 continue
197 # Compute the minimum possible bounding box that overlaps the CCD.
198 # First find the intersection polygon between the per-detector warp
199 # and the warp bounding box.
200 bbox = geom.Box2I()
201 for corner in destination_polygon.getVertices():
202 bbox.include(geom.Point2I(corner))
203 bbox.clip(direct_warp.getBBox()) # Additional safeguard
205 # Because the direct warps are larger, it is possible that after
206 # clipping, `bbox` lies outside PSF-matched warp's bounding box.
207 if not bbox.overlaps(exposure_psf_matched.getBBox()):
208 self.log.debug(
209 "Skipping CCD %d as its bbox %s does not overlap the PSF-matched warp",
210 row["ccd"],
211 bbox,
212 )
213 continue
215 self.log.debug("PSF-matching CCD %d with bbox %s", row["ccd"], bbox)
217 ccd_mask_array = ~(destination_polygon.createImage(bbox).array <= 0)
219 # Clone the subimage, set the PSF to the model and reset the planes
220 # outside the detector.
221 temp_warp = direct_warp[bbox].clone()
222 temp_warp.setPsf(warp_psf)
223 temp_warp.image.array *= ccd_mask_array
224 temp_warp.mask.array |= (~ccd_mask_array) * bit_mask
225 # We intend to divide by zero outside the detector to set the
226 # per-pixel variance values to infinity. Suppress the warning.
227 with warnings.catch_warnings():
228 warnings.filterwarnings("ignore", message="divide by zero", category=RuntimeWarning)
229 temp_warp.variance.array /= ccd_mask_array
231 try:
232 temp_psf_matched = self.psfMatch.run(temp_warp, modelPsf).psfMatchedExposure
233 except (WarpedPsfTransformTooBigError, PsfComputeShapeError) as e:
234 error = AnnotatedPartialOutputsError.annotate(
235 e,
236 self,
237 temp_warp,
238 log=self.log,
239 )
240 raise error from e
241 del temp_warp
243 # Set pixels outside the intersection polygon to NO_DATA.
244 temp_psf_matched.mask[bbox].array |= (~ccd_mask_array) * bit_mask
246 # Clip the bbox to the PSF-matched warp bounding box.
247 bbox.clip(exposure_psf_matched.getBBox())
249 num_good_pixels = copyGoodPixels(
250 exposure_psf_matched.maskedImage[bbox],
251 temp_psf_matched.maskedImage[bbox],
252 bit_mask,
253 )
255 del temp_psf_matched
257 self.log.info(
258 "Copied %d pixels from CCD %d to exposure_psf_matched",
259 num_good_pixels,
260 row["ccd"],
261 )
262 total_good_pixels += num_good_pixels
264 self.log.info("Total number of good pixels = %d", total_good_pixels)
266 if total_good_pixels > 0:
267 growValidPolygons(
268 exposure_psf_matched.info.getCoaddInputs(),
269 -self.config.psfMatch.kernel.active.kernelSize // 2,
270 )
272 return Struct(psf_matched_warp=exposure_psf_matched)
273 else:
274 return Struct(psf_matched_warp=None)