Coverage for python / lsst / drp / tasks / make_direct_warp.py: 23%
324 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:49 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import dataclasses
25from collections.abc import Mapping
26from typing import TYPE_CHECKING, Iterable
28import numpy as np
30from lsst.afw.image import ExposureF, Image, Mask, PhotoCalib
31from lsst.afw.math import BackgroundList, Warper
32from lsst.coadd.utils import copyGoodPixels
33from lsst.daf.butler import DataCoordinate, DeferredDatasetHandle
34from lsst.geom import Box2D
35from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig, backgroundFlatContext
36from lsst.meas.algorithms.cloughTocher2DInterpolator import CloughTocher2DInterpolateTask
37from lsst.meas.base import DetectorVisitIdGeneratorConfig
38from lsst.pex.config import ConfigField, ConfigurableField, Field, RangeField
39from lsst.pipe.base import (
40 InMemoryDatasetHandle,
41 NoWorkFound,
42 PipelineTask,
43 PipelineTaskConfig,
44 PipelineTaskConnections,
45 Struct,
46)
47from lsst.pipe.base.connectionTypes import Input, Output
48from lsst.pipe.tasks.coaddBase import makeSkyInfo
49from lsst.pipe.tasks.coaddInputRecorder import CoaddInputRecorderTask
50from lsst.pipe.tasks.selectImages import PsfWcsSelectImagesTask
51from lsst.skymap import BaseSkyMap
53if TYPE_CHECKING:
54 from lsst.afw.image import MaskedImage
55 from lsst.afw.table import ExposureCatalog
58__all__ = (
59 "MakeDirectWarpConfig",
60 "MakeDirectWarpTask",
61)
64@dataclasses.dataclass
65class WarpDetectorInputs:
66 """Inputs passed to `MakeDirectWarpTask.run` for a single detector."""
68 exposure_or_handle: ExposureF | DeferredDatasetHandle | InMemoryDatasetHandle
69 """Detector image with initial calibration objects, or a deferred-load
70 handle for one.
71 """
73 data_id: DataCoordinate
74 """Butler data ID for this detector."""
76 background_revert: BackgroundList | None = None
77 """Background model to restore in (i.e. add to) the image."""
79 background_apply: BackgroundList | None = None
80 """Background model to apply to (i.e. subtract from) the image."""
82 background_ratio_or_handle: Image | DeferredDatasetHandle | InMemoryDatasetHandle | None = None
83 """Ratio of background-flattened image to photometric-flattened image."""
85 @property
86 def exposure(self) -> ExposureF:
87 """Get the exposure object, loading it if necessary."""
88 if not isinstance(self.exposure_or_handle, ExposureF):
89 self.exposure_or_handle = self.exposure_or_handle.get()
91 return self.exposure_or_handle
93 @property
94 def background_to_photometric_ratio(self) -> Image:
95 """Get the background_to_photometric object, loading if necessary."""
96 if isinstance(self.background_ratio_or_handle, (DeferredDatasetHandle, InMemoryDatasetHandle)):
97 self.background_ratio_or_handle = self.background_ratio_or_handle.get()
99 return self.background_ratio_or_handle
101 def apply_background(self) -> None:
102 """Apply (subtract) the `background_apply` to the exposure in-place.
104 Raises
105 ------
106 RuntimeError
107 Raised if `background_apply` is None.
108 """
109 if self.background_apply is None:
110 raise RuntimeError("No background to apply")
112 if self.background_apply:
113 with backgroundFlatContext(
114 self.exposure.maskedImage,
115 self.background_to_photometric_ratio is not None,
116 backgroundToPhotometricRatio=self.background_to_photometric_ratio,
117 ):
118 self.exposure.maskedImage -= self.background_apply.getImage()
120 def revert_background(self) -> None:
121 """Revert (add) the `background_revert` from the exposure in-place.
123 Raises
124 ------
125 RuntimeError
126 Raised if `background_revert` is None.
127 """
128 if self.background_revert is None:
129 raise RuntimeError("No background to revert")
131 if self.background_revert:
132 with backgroundFlatContext(
133 self.exposure.maskedImage,
134 self.background_to_photometric_ratio is not None,
135 backgroundToPhotometricRatio=self.background_to_photometric_ratio,
136 ):
137 # Add only if `background_revert` is not a trivial background.
138 self.exposure.maskedImage += self.background_revert.getImage()
141class MakeDirectWarpConnections(
142 PipelineTaskConnections,
143 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
144 defaultTemplates={
145 "coaddName": "deep",
146 },
147):
148 """Connections for MakeWarpTask"""
150 calexp_list = Input(
151 doc="Input exposures to be interpolated and resampled onto a SkyMap " "projection/patch.",
152 name="calexp",
153 storageClass="ExposureF",
154 dimensions=("instrument", "visit", "detector"),
155 multiple=True,
156 deferLoad=True,
157 )
158 background_revert_list = Input(
159 doc="Background to be reverted (i.e., added back to the calexp). "
160 "This connection is used only if doRevertOldBackground=True.",
161 name="calexpBackground",
162 storageClass="Background",
163 dimensions=("instrument", "visit", "detector"),
164 multiple=True,
165 )
166 background_apply_list = Input(
167 doc="Background to be applied (subtracted from the calexp). "
168 "This is used only if doApplyNewBackground=True.",
169 name="skyCorr",
170 storageClass="Background",
171 dimensions=("instrument", "visit", "detector"),
172 multiple=True,
173 )
174 background_to_photometric_ratio_list = Input(
175 doc="Ratio of a background-flattened image to a photometric-flattened image. "
176 "This is used only if doRevertOldBackground=True or doApplyNewBackground=True "
177 "and doApplyFlatBackgroundRatio=True.",
178 name="background_to_photometric_ratio",
179 storageClass="Image",
180 dimensions=("instrument", "visit", "detector"),
181 multiple=True,
182 deferLoad=True,
183 )
184 visit_summary = Input(
185 doc="Input visit-summary catalog with updated calibration objects.",
186 name="finalVisitSummary",
187 storageClass="ExposureCatalog",
188 dimensions=("instrument", "visit"),
189 )
190 sky_map = Input(
191 doc="Input definition of geometry/bbox and projection/wcs for warps.",
192 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
193 storageClass="SkyMap",
194 dimensions=("skymap",),
195 )
196 # Declare all possible outputs (except noise, which is configurable)
197 warp = Output(
198 doc="Output direct warped exposure produced by resampling calexps " "onto the skyMap patch geometry.",
199 name="{coaddName}Coadd_directWarp",
200 storageClass="ExposureF",
201 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
202 )
203 masked_fraction_warp = Output(
204 doc="Output masked fraction warped exposure.",
205 name="{coaddName}Coadd_directWarp_maskedFraction",
206 storageClass="ImageF",
207 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
208 )
210 def __init__(self, *, config=None):
211 super().__init__(config=config)
212 if not config:
213 return
215 if not config.doRevertOldBackground:
216 del self.background_revert_list
217 if not config.doApplyNewBackground:
218 del self.background_apply_list
219 if not config.doApplyFlatBackgroundRatio or (
220 not config.doRevertOldBackground and not config.doApplyNewBackground
221 ):
222 del self.background_to_photometric_ratio_list
224 if not config.doWarpMaskedFraction:
225 del self.masked_fraction_warp
227 # Dynamically set output connections for noise images, depending on the
228 # number of noise realization specified in the config.
229 for n in range(config.numberOfNoiseRealizations):
230 noise_warp = Output(
231 doc=f"Output direct warped noise exposure ({n})",
232 name=f"direct_warp_noise{n}",
233 # Store it as a MaskedImage to preserve the variance plane.
234 storageClass="MaskedImageF",
235 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
236 )
237 setattr(self, f"noise_warp{n}", noise_warp)
240class MakeDirectWarpConfig(
241 PipelineTaskConfig,
242 pipelineConnections=MakeDirectWarpConnections,
243):
244 """Configuration for the MakeDirectWarpTask.
246 The config fields are as similar as possible to the corresponding fields in
247 MakeWarpConfig.
249 Notes
250 -----
251 The config fields are in camelCase to match the fields in the earlier
252 version of the makeWarp task as closely as possible.
253 """
255 MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
256 """
257 numberOfNoiseRealizations is defined as a RangeField to prevent from
258 making multiple output connections and blowing up the memory usage by
259 accident. An upper bound of 3 is based on the best guess of the maximum
260 number of noise realizations that will be used for metadetection.
261 """
263 numberOfNoiseRealizations = RangeField[int](
264 doc="Number of noise realizations to simulate and persist.",
265 default=0,
266 min=0,
267 max=MAX_NUMBER_OF_NOISE_REALIZATIONS,
268 inclusiveMax=True,
269 )
270 seedOffset = Field[int](
271 doc="Offset to the seed used for the noise realization. This can be "
272 "used to create a different noise realization if the default ones "
273 "are catastrophic, or for testing sensitivity to the noise.",
274 default=0,
275 )
276 useMedianVariance = Field[bool](
277 doc="Use the median of variance plane in the input calexp to generate "
278 "noise realizations? If False, per-pixel variance will be used.",
279 default=True,
280 )
281 doRevertOldBackground = Field[bool](
282 doc="Revert the old backgrounds from the `background_revert_list` " "connection?",
283 default=False,
284 )
285 doApplyNewBackground = Field[bool](
286 doc="Apply the new backgrounds from the `background_apply_list` " "connection?",
287 default=False,
288 )
289 doApplyFlatBackgroundRatio = Field[bool](
290 doc="Apply flat background ratio prior to background adjustments? Should be True "
291 "if processing was done with an illumination correction.",
292 default=False,
293 )
294 useVisitSummaryPsf = Field[bool](
295 doc="If True, use the PSF model and aperture corrections from the "
296 "'visit_summary' connection to make the warp. If False, use the "
297 "PSF model and aperture corrections from the 'calexp' connection.",
298 default=True,
299 )
300 useVisitSummaryWcs = Field[bool](
301 doc="If True, use the WCS from the "
302 "'visit_summary' connection to make the warp. If False, use the "
303 "WCS from the 'calexp' connection.",
304 default=True,
305 )
306 useVisitSummaryPhotoCalib = Field[bool](
307 doc="If True, use the photometric calibration from the "
308 "'visit_summary' connection to make the warp. If False, use the "
309 "photometric calibration from the 'calexp' connection.",
310 default=True,
311 )
312 doSelectPreWarp = Field[bool](
313 doc="Select ccds before warping?",
314 default=True,
315 )
316 select = ConfigurableField(
317 doc="Image selection subtask.",
318 target=PsfWcsSelectImagesTask,
319 )
320 doPreWarpInterpolation = Field[bool](
321 doc="Interpolate over bad pixels before warping?",
322 default=False,
323 )
324 preWarpInterpolation = ConfigurableField(
325 doc="Interpolation task to use for pre-warping interpolation",
326 target=CloughTocher2DInterpolateTask,
327 )
328 inputRecorder = ConfigurableField(
329 doc="Subtask that helps fill CoaddInputs catalogs added to the final " "coadd",
330 target=CoaddInputRecorderTask,
331 )
332 includeCalibVar = Field[bool](
333 doc="Add photometric calibration variance to warp variance plane?",
334 default=False,
335 deprecated="Deprecated and disabled. Will be removed after v29.",
336 )
337 border = Field[int](
338 doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later",
339 default=256,
340 )
341 warper = ConfigField(
342 doc="Configuration for the warper that warps the image and noise",
343 dtype=Warper.ConfigClass,
344 )
345 doWarpMaskedFraction = Field[bool](
346 doc="Warp the masked fraction image?",
347 default=True,
348 )
349 maskedFractionWarper = ConfigField(
350 doc="Configuration for the warp that warps the mask fraction image",
351 dtype=Warper.ConfigClass,
352 )
353 coaddPsf = ConfigField(
354 doc="Configuration for CoaddPsf",
355 dtype=CoaddPsfConfig,
356 )
357 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
359 # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig,
360 # but as properties instead of config fields.
361 @property
362 def bgSubtracted(self) -> bool:
363 return not self.doRevertOldBackground
365 @bgSubtracted.setter
366 def bgSubtracted(self, value: bool) -> None:
367 self.doRevertOldBackground = not value
369 @property
370 def doApplySkyCorr(self) -> bool:
371 return self.doApplyNewBackground
373 @doApplySkyCorr.setter
374 def doApplySkyCorr(self, value: bool) -> None:
375 self.doApplyNewBackground = value
377 def setDefaults(self) -> None:
378 super().setDefaults()
379 self.warper.warpingKernelName = "lanczos3"
380 self.warper.cacheSize = 0
381 self.maskedFractionWarper.warpingKernelName = "bilinear"
384class MakeDirectWarpTask(PipelineTask):
385 """Warp single-detector images onto a common projection.
387 This task iterates over multiple images (corresponding to different
388 detectors) from a single visit that overlap the target patch. Pixels that
389 receive no input from any detector are set to NaN in the output image, and
390 NO_DATA bit is set in the mask plane.
392 This differs from the standard `MakeWarp` Task in the following
393 ways:
395 1. No selection on ccds at the time of warping. This is done later during
396 the coaddition stage.
397 2. Interpolate over a set of masked pixels before warping.
398 3. Generate an image where each pixel denotes how much of the pixel is
399 masked.
400 4. Generate multiple noise warps with the same interpolation applied.
401 5. No option to produce a PSF-matched warp.
402 """
404 ConfigClass = MakeDirectWarpConfig
405 _DefaultName = "makeDirectWarp"
407 def __init__(self, **kwargs):
408 super().__init__(**kwargs)
409 self.makeSubtask("inputRecorder")
410 self.makeSubtask("preWarpInterpolation")
411 if self.config.doSelectPreWarp:
412 self.makeSubtask("select")
414 self.warper = Warper.fromConfig(self.config.warper)
415 if self.config.doWarpMaskedFraction:
416 self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper)
418 def runQuantum(self, butlerQC, inputRefs, outputRefs):
419 # Docstring inherited.
421 inputs: Mapping[int, WarpDetectorInputs] = {} # Detector ID -> WarpDetectorInputs
422 for handle in butlerQC.get(inputRefs.calexp_list):
423 inputs[handle.dataId["detector"]] = WarpDetectorInputs(
424 exposure_or_handle=handle,
425 data_id=handle.dataId,
426 )
428 if not inputs:
429 raise NoWorkFound("No input warps provided for co-addition")
431 # Add backgrounds to the inputs struct, being careful not to assume
432 # they're present for the same detectors we got image handles for, in
433 # case of upstream errors.
434 for ref in getattr(inputRefs, "background_revert_list", []):
435 detector_id = ref.dataId["detector"]
436 if detector_id in inputs:
437 inputs[detector_id].background_revert = butlerQC.get(ref)
438 else:
439 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id)
440 for ref in getattr(inputRefs, "background_apply_list", []):
441 detector_id = ref.dataId["detector"]
442 if detector_id in inputs:
443 inputs[detector_id].background_apply = butlerQC.get(ref)
444 else:
445 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id)
446 for ref in getattr(inputRefs, "background_to_photometric_ratio_list", []):
447 detector_id = ref.dataId["detector"]
448 if detector_id in inputs:
449 inputs[detector_id].background_ratio_or_handle = butlerQC.get(ref)
450 else:
451 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id)
453 visit_summary = butlerQC.get(inputRefs.visit_summary)
454 sky_map = butlerQC.get(inputRefs.sky_map)
456 quantumDataId = butlerQC.quantum.dataId
457 sky_info = makeSkyInfo(
458 sky_map,
459 tractId=quantumDataId["tract"],
460 patchId=quantumDataId["patch"],
461 )
463 results = self.run(inputs, sky_info, visit_summary=visit_summary)
464 butlerQC.put(results, outputRefs)
466 def _preselect_inputs(
467 self,
468 inputs: Mapping[int, WarpDetectorInputs],
469 sky_info: Struct,
470 visit_summary: ExposureCatalog,
471 ) -> dict[int, WarpDetectorInputs]:
472 """Filter the list of inputs via a 'select' subtask.
474 Parameters
475 ----------
476 inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ]
477 Per-detector input structs.
478 sky_info : `lsst.pipe.base.Struct`
479 Structure with information about the tract and patch.
480 visit_summary : `~lsst.afw.table.ExposureCatalog`
481 Record with updated calibration information for the full visit.
483 Returns
484 -------
485 filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ]
486 Like ``inputs``, with rejected detectors dropped.
487 """
488 data_id_list, bbox_list, wcs_list = [], [], []
489 for detector_id, detector_inputs in inputs.items():
490 row = visit_summary.find(detector_id)
491 if row is None:
492 self.log.info(
493 "Input calexp is not listed in visit_summary: %d; assuming bad.",
494 detector_id,
495 )
496 continue
497 data_id_list.append(detector_inputs.data_id)
498 bbox_list.append(row.getBBox())
499 wcs_list.append(row.getWcs())
501 cornerPosList = Box2D(sky_info.bbox).getCorners()
502 coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList]
504 good_indices = self.select.run(
505 bboxList=bbox_list,
506 wcsList=wcs_list,
507 visitSummary=visit_summary,
508 coordList=coordList,
509 dataIds=data_id_list,
510 )
511 detector_ids = list(inputs.keys())
512 good_detector_ids = [detector_ids[i] for i in good_indices]
513 return {detector_id: inputs[detector_id] for detector_id in good_detector_ids}
515 def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct:
516 """Create a Warp dataset from inputs.
518 Parameters
519 ----------
520 inputs : `Mapping` [ `int`, `WarpDetectorInputs` ]
521 Dictionary of input datasets, with the detector id being the key.
522 sky_info : `~lsst.pipe.base.Struct`
523 A Struct object containing wcs, bounding box, and other information
524 about the patches within the tract.
525 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
526 Table of visit summary information. If provided, the visit summary
527 information will be used to update the calibration of the input
528 exposures. If None, the input exposures will be used as-is.
530 Returns
531 -------
532 results : `~lsst.pipe.base.Struct`
533 A Struct object containing the warped exposure, noise exposure(s),
534 and masked fraction image.
535 """
536 if self.config.doSelectPreWarp:
537 inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary)
538 if not inputs:
539 raise NoWorkFound("No input warps remain after selection for co-addition")
541 sky_info.bbox.grow(self.config.border)
542 target_bbox, target_wcs = sky_info.bbox, sky_info.wcs
544 # Initialize the objects that will hold the warp.
545 final_warp = ExposureF(target_bbox, target_wcs)
547 for _, warp_detector_input in inputs.items():
548 visit_id = warp_detector_input.data_id["visit"]
549 break # Just need the visit id from any one of the inputs.
551 # The warpExposure routine is expensive, and we do not want to call
552 # it twice (i.e., a second time for PSF-matched warps). We do not
553 # want to hold all the warped exposures in memory at once either.
554 # So we create empty exposure(s) to accumulate the warps of each type,
555 # and then process each detector serially.
556 final_warp = self._prepareEmptyExposure(sky_info)
557 final_masked_fraction_warp = self._prepareEmptyExposure(sky_info)
558 final_noise_warps = {
559 n_noise: self._prepareEmptyExposure(sky_info)
560 for n_noise in range(self.config.numberOfNoiseRealizations)
561 }
563 # We need a few bookkeeping variables only for the science coadd.
564 totalGoodPixels = 0
565 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder(
566 visit_id,
567 len(inputs),
568 )
570 for index, detector_inputs in enumerate(inputs.values()):
571 self.log.debug(
572 "Warping exposure %d/%d for id=%s",
573 index + 1,
574 len(inputs),
575 detector_inputs.data_id,
576 )
578 input_exposure = detector_inputs.exposure
579 # Generate noise image(s) in-situ.
580 seed = self.get_seed_from_data_id(detector_inputs.data_id)
581 # Limit to last 32 bits to avoid overflow in numpy.
582 np_seed = (seed + self.config.seedOffset) & 0xFFFFFFFF
583 self.log.debug("Setting numpy random seed to %d for noise realization", np_seed)
584 rng = np.random.RandomState(np_seed)
586 # Generate noise images in-situ.
587 noise_calexps = self.make_noise_exposures(input_exposure, rng)
589 warpedExposure = self.process(
590 detector_inputs,
591 target_wcs,
592 self.warper,
593 visit_summary,
594 destBBox=target_bbox,
595 )
597 if warpedExposure is None:
598 self.log.debug(
599 "Skipping exposure %s because it could not be warped.", detector_inputs.data_id
600 )
601 continue
603 # Accumulate the partial warps in an online fashion.
604 nGood = copyGoodPixels(
605 final_warp.maskedImage,
606 warpedExposure.maskedImage,
607 final_warp.mask.getPlaneBitMask(["NO_DATA"]),
608 )
610 ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id
611 inputRecorder.addCalExp(input_exposure, ccdId, nGood)
612 totalGoodPixels += nGood
614 if self.config.doWarpMaskedFraction:
615 # Obtain the masked fraction exposure and warp it.
616 if self.config.doPreWarpInterpolation:
617 badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes
618 else:
619 badMaskPlanes = []
620 masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes)
622 masked_fraction_warp = self.maskedFractionWarper.warpExposure(
623 target_wcs, masked_fraction_exp, destBBox=target_bbox
624 )
626 copyGoodPixels(
627 final_masked_fraction_warp.maskedImage,
628 masked_fraction_warp.maskedImage,
629 final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]),
630 )
632 # Process and accumulate noise images.
633 for n_noise in range(self.config.numberOfNoiseRealizations):
634 noise_calexp = noise_calexps[n_noise]
635 noise_pseudo_inputs = dataclasses.replace(
636 detector_inputs,
637 exposure_or_handle=noise_calexp,
638 background_revert=BackgroundList(),
639 background_apply=BackgroundList(),
640 )
641 warpedNoise = self.process(
642 noise_pseudo_inputs,
643 target_wcs,
644 self.warper,
645 visit_summary,
646 destBBox=target_bbox,
647 )
649 copyGoodPixels(
650 final_noise_warps[n_noise].maskedImage,
651 warpedNoise.maskedImage,
652 final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]),
653 )
655 # If there are no good pixels, return a Struct filled with None.
656 if totalGoodPixels == 0:
657 results = Struct(
658 warp=None,
659 masked_fraction_warp=None,
660 )
661 for noise_index in range(self.config.numberOfNoiseRealizations):
662 setattr(results, f"noise_warp{noise_index}", None)
664 return results
666 # Finish the inputRecorder and add the coaddPsf to the final warp.
667 inputRecorder.finish(final_warp, totalGoodPixels)
669 coaddPsf = CoaddPsf(
670 inputRecorder.coaddInputs.ccds,
671 sky_info.wcs,
672 self.config.coaddPsf.makeControl(),
673 )
675 final_warp.setPsf(coaddPsf)
676 for _, warp_detector_input in inputs.items():
677 final_warp.setFilter(warp_detector_input.exposure.getFilter())
678 final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo())
679 break # Just need the filter and visit info from any one of the inputs.
681 results = Struct(
682 warp=final_warp,
683 )
685 if self.config.doWarpMaskedFraction:
686 results.masked_fraction_warp = final_masked_fraction_warp.image
688 for noise_index, noise_exposure in final_noise_warps.items():
689 setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage)
691 return results
693 def process(
694 self,
695 detector_inputs: WarpDetectorInputs,
696 target_wcs,
697 warper,
698 visit_summary=None,
699 maxBBox=None,
700 destBBox=None,
701 ) -> ExposureF | None:
702 """Process an exposure.
704 There are three processing steps that are applied to the input:
706 1. Interpolate over bad pixels before warping.
707 2. Apply all calibrations from visit_summary to the exposure.
708 3. Warp the exposure to the target coordinate system.
710 Parameters
711 ----------
712 detector_inputs : `WarpDetectorInputs`
713 The input exposure to be processed, along with any other
714 per-detector modifications.
715 target_wcs : `~lsst.afw.geom.SkyWcs`
716 The WCS of the target patch.
717 warper : `~lsst.afw.math.Warper`
718 The warper to use for warping the input exposure.
719 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
720 Table of visit summary information. If not None, the visit_summary
721 information will be used to update the calibration of the input
722 exposures. Otherwise, the input exposures will be used as-is.
723 maxBBox : `~lsst.geom.Box2I` | None
724 Maximum bounding box of the warped exposure. If None, this is
725 determined automatically.
726 destBBox : `~lsst.geom.Box2I` | None
727 Exact bounding box of the warped exposure. If None, this is
728 determined automatically.
730 Returns
731 -------
732 warped_exposure : `~lsst.afw.image.Exposure` | None
733 The processed and warped exposure, if all the calibrations could
734 be applied successfully. Otherwise, None.
735 """
737 input_exposure = detector_inputs.exposure
738 if self.config.doPreWarpInterpolation:
739 self.preWarpInterpolation.run(input_exposure.maskedImage)
741 success = self._apply_all_calibrations(detector_inputs, visit_summary=visit_summary)
743 if not success:
744 return None
746 with self.timer("warp"):
747 warped_exposure = warper.warpExposure(
748 target_wcs,
749 input_exposure,
750 maxBBox=maxBBox,
751 destBBox=destBBox,
752 )
754 # Potentially a post-warp interpolation here? Relies on DM-38630.
756 return warped_exposure
758 def _apply_all_calibrations(
759 self,
760 detector_inputs: WarpDetectorInputs,
761 *,
762 visit_summary: ExposureCatalog | None = None,
763 ) -> bool:
764 """Apply all of the calibrations from visit_summary to the exposure.
766 Specifically, this method updates the following (if available) to the
767 input exposure in place from ``visit_summary``:
769 - Aperture correction map
770 - Photometric calibration
771 - PSF
772 - WCS
774 It also reverts and applies backgrounds in ``detector_inputs``.
776 Parameters
777 ----------
778 detector_inputs : `WarpDetectorInputs`
779 The input exposure to be processed, along with any other
780 per-detector modifications.
781 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
782 Table of visit summary information. If not None, the visit summary
783 information will be used to update the calibration of the input
784 exposures. Otherwise, the input exposures will be used as-is.
786 Returns
787 -------
788 success : `bool`
789 True if all calibrations were successfully applied,
790 False otherwise.
792 Raises
793 ------
794 RuntimeError
795 Raised if ``visit_summary`` is provided but does not contain a
796 record corresponding to ``detector_inputs``, or if one of the
797 background fields in ``detector_inputs`` is inconsistent with the
798 task configuration.
799 """
800 if self.config.doRevertOldBackground:
801 detector_inputs.revert_background()
802 elif detector_inputs.background_revert:
803 # This could get trigged only if runQuantum is skipped and run is
804 # called directly.
805 raise RuntimeError(
806 f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert."
807 )
809 input_exposure = detector_inputs.exposure
810 if visit_summary is not None:
811 detector = input_exposure.info.getDetector().getId()
812 row = visit_summary.find(detector)
814 if row is None:
815 self.log.info(
816 "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.",
817 detector,
818 )
819 return False
821 if self.config.useVisitSummaryPhotoCalib:
822 photo_calib = row.getPhotoCalib()
823 if photo_calib is None:
824 self.log.info(
825 "No photometric calibration found in visit summary for detector = %s. Skipping it.",
826 detector,
827 )
828 return False
829 elif not np.isfinite(photo_calib.getInstFluxAtZeroMagnitude()):
830 self.log.info(
831 f"Invalid PhotoCalib found in visit summary for detector {detector}. Skipping it.",
832 )
833 return False
834 else:
835 input_exposure.setPhotoCalib(photo_calib)
837 if self.config.useVisitSummaryWcs:
838 if wcs := row.getWcs():
839 input_exposure.setWcs(wcs)
840 else:
841 self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector)
842 return False
844 if self.config.useVisitSummaryPsf:
845 if psf := row.getPsf():
846 input_exposure.setPsf(psf)
847 else:
848 self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector)
849 return False
851 if apcorr_map := row.getApCorrMap():
852 input_exposure.setApCorrMap(apcorr_map)
853 else:
854 self.log.info(
855 "No aperture correction map found in visit summary for detector = %s. Skipping it",
856 detector,
857 )
858 return False
860 if self.config.doApplyNewBackground:
861 detector_inputs.apply_background()
862 elif detector_inputs.background_apply:
863 # This could get trigged only if runQuantum is skipped and run is
864 # called directly.
865 raise RuntimeError(
866 f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply."
867 )
869 # Calibrate the (masked) image.
870 # This should likely happen even if visit_summary is None.
871 photo_calib = input_exposure.photoCalib
872 input_exposure.maskedImage = photo_calib.calibrateImage(input_exposure.maskedImage)
873 # This new PhotoCalib shouldn't need to be used, but setting it here
874 # to reflect the fact that the image now has calibrated pixels might
875 # help avoid future bugs.
876 input_exposure.setPhotoCalib(PhotoCalib(1.0))
878 return True
880 # This method is copied from makeWarp.py
881 @classmethod
882 def _prepareEmptyExposure(cls, sky_info):
883 """Produce an empty exposure for a given patch.
885 Parameters
886 ----------
887 sky_info : `lsst.pipe.base.Struct`
888 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo` with
889 geometric information about the patch.
891 Returns
892 -------
893 exp : `lsst.afw.image.exposure.ExposureF`
894 An empty exposure for a given patch.
895 """
896 exp = ExposureF(sky_info.bbox, sky_info.wcs)
897 exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf)
898 # Set the PhotoCalib to 1 to mean that pixels are nJy, since we will
899 # calibrate them before we warp them.
900 exp.setPhotoCalib(PhotoCalib(1.0))
901 exp.metadata["BUNIT"] = "nJy"
902 return exp
904 @staticmethod
905 def compute_median_variance(mi: MaskedImage) -> float:
906 """Compute the median variance across the good pixels of a MaskedImage.
908 Parameters
909 ----------
910 mi : `~lsst.afw.image.MaskedImage`
911 The input image on which to compute the median variance.
913 Returns
914 -------
915 median_variance : `float`
916 Median variance of the input calexp.
917 """
918 # Shouldn't this exclude pixels that are masked, to be safe?
919 # This is implemented as it was in descwl_coadd.
920 return np.median(mi.variance.array[np.isfinite(mi.variance.array) & np.isfinite(mi.image.array)])
922 def get_seed_from_data_id(self, data_id) -> int:
923 """Get a seed value given a data_id.
925 This method generates a unique, reproducible pseudo-random number for
926 a data id. This is not affected by ordering of the input, or what
927 set of visits, ccds etc. are given.
929 This is implemented as a public method, so that simulations that
930 don't necessary deal with the middleware can mock up a ``data_id``
931 instance, or override this method with a different one to obtain a
932 seed value consistent with the pipeline task.
934 Parameters
935 ----------
936 data_id : `~lsst.daf.butler.DataCoordinate`
937 Data identifier dictionary.
939 Returns
940 -------
941 seed : `int`
942 A unique seed for this data_id to seed a random number generator.
943 """
944 return self.config.idGenerator.apply(data_id).catalog_id
946 def make_noise_exposures(self, calexp: ExposureF, rng) -> dict[int, ExposureF]:
947 """Make pure noise realizations based on ``calexp``.
949 Parameters
950 ----------
951 calexp : `~lsst.afw.image.ExposureF`
952 The input exposure on which to base the noise realizations.
953 rng : `np.random.RandomState`
954 Random number generator to use for the noise realizations.
956 Returns
957 -------
958 noise_calexps : `dict` [`int`, `~lsst.afw.image.ExposureF`]
959 A mapping of integers ranging from 0 up to
960 config.numberOfNoiseRealizations to the corresponding
961 noise realization exposures.
962 """
963 noise_calexps = {}
965 # If no noise exposures are requested, return the empty dictionary
966 # without any further computations.
967 if self.config.numberOfNoiseRealizations == 0:
968 return noise_calexps
970 if self.config.useMedianVariance:
971 variance = self.compute_median_variance(calexp.maskedImage)
972 else:
973 variance = calexp.variance.array
975 for n_noise in range(self.config.numberOfNoiseRealizations):
976 noise_calexp = calexp.clone()
977 noise_calexp.image.array[:, :] = rng.normal(
978 scale=np.sqrt(variance),
979 size=noise_calexp.image.array.shape,
980 )
981 noise_calexp.variance.array[:, :] = variance
982 noise_calexps[n_noise] = noise_calexp
984 return noise_calexps
986 @classmethod
987 def _get_bad_mask(cls, exp: ExposureF, badMaskPlanes: Iterable[str]) -> ExposureF:
988 """Get an Exposure of bad mask
990 Parameters
991 ----------
992 exp: `lsst.afw.image.Exposure`
993 The exposure data.
994 badMaskPlanes: `list` [`str`]
995 List of mask planes to be considered as bad.
997 Returns
998 -------
999 bad_mask: `~lsst.afw.image.Exposure`
1000 An Exposure with boolean array with True if inverse variance <= 0
1001 or if any of the badMaskPlanes bits are set, and False otherwise.
1002 """
1004 bad_mask = exp.clone()
1006 var = exp.variance.array
1007 mask = exp.mask.array
1009 bitMask = exp.mask.getPlaneBitMask(badMaskPlanes)
1011 bad_mask.image.array[:, :] = (var < 0) | np.isinf(var) | ((mask & bitMask) != 0)
1013 bad_mask.variance.array *= 0.0
1015 return bad_mask