Coverage for python / lsst / drp / tasks / make_direct_warp.py: 23%
325 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:43 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:43 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24import dataclasses
25from collections.abc import Mapping
26from typing import TYPE_CHECKING, Iterable
28import numpy as np
30from lsst.afw.image import ExposureF, Image, Mask, PhotoCalib
31from lsst.afw.math import BackgroundList, Warper
32from lsst.coadd.utils import copyGoodPixels
33from lsst.daf.butler import DataCoordinate, DeferredDatasetHandle
34from lsst.geom import Box2D, Box2I
35from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig, backgroundFlatContext
36from lsst.meas.algorithms.cloughTocher2DInterpolator import CloughTocher2DInterpolateTask
37from lsst.meas.base import DetectorVisitIdGeneratorConfig
38from lsst.pex.config import ConfigField, ConfigurableField, Field, RangeField
39from lsst.pipe.base import (
40 InMemoryDatasetHandle,
41 NoWorkFound,
42 PipelineTask,
43 PipelineTaskConfig,
44 PipelineTaskConnections,
45 Struct,
46)
47from lsst.pipe.base.connectionTypes import Input, Output
48from lsst.pipe.tasks.coaddBase import makeSkyInfo
49from lsst.pipe.tasks.coaddInputRecorder import CoaddInputRecorderTask
50from lsst.pipe.tasks.selectImages import PsfWcsSelectImagesTask
51from lsst.skymap import BaseSkyMap
53if TYPE_CHECKING:
54 from lsst.afw.image import MaskedImage
55 from lsst.afw.table import ExposureCatalog
58__all__ = (
59 "MakeDirectWarpConfig",
60 "MakeDirectWarpTask",
61)
64@dataclasses.dataclass
65class WarpDetectorInputs:
66 """Inputs passed to `MakeDirectWarpTask.run` for a single detector."""
68 exposure_or_handle: ExposureF | DeferredDatasetHandle | InMemoryDatasetHandle
69 """Detector image with initial calibration objects, or a deferred-load
70 handle for one.
71 """
73 data_id: DataCoordinate
74 """Butler data ID for this detector."""
76 background_revert: BackgroundList | None = None
77 """Background model to restore in (i.e. add to) the image."""
79 background_apply: BackgroundList | None = None
80 """Background model to apply to (i.e. subtract from) the image."""
82 background_ratio_or_handle: Image | DeferredDatasetHandle | InMemoryDatasetHandle | None = None
83 """Ratio of background-flattened image to photometric-flattened image."""
85 @property
86 def exposure(self) -> ExposureF:
87 """Get the exposure object, loading it if necessary."""
88 if not isinstance(self.exposure_or_handle, ExposureF):
89 self.exposure_or_handle = self.exposure_or_handle.get()
91 return self.exposure_or_handle
93 @property
94 def background_to_photometric_ratio(self) -> Image:
95 """Get the background_to_photometric object, loading if necessary."""
96 if isinstance(self.background_ratio_or_handle, (DeferredDatasetHandle, InMemoryDatasetHandle)):
97 self.background_ratio_or_handle = self.background_ratio_or_handle.get()
99 return self.background_ratio_or_handle
101 def apply_background(self) -> None:
102 """Apply (subtract) the `background_apply` to the exposure in-place.
104 Raises
105 ------
106 RuntimeError
107 Raised if `background_apply` is None.
108 """
109 if self.background_apply is None:
110 raise RuntimeError("No background to apply")
112 if self.background_apply:
113 with backgroundFlatContext(
114 self.exposure.maskedImage,
115 self.background_to_photometric_ratio is not None,
116 backgroundToPhotometricRatio=self.background_to_photometric_ratio,
117 ):
118 self.exposure.maskedImage -= self.background_apply.getImage()
120 def revert_background(self) -> None:
121 """Revert (add) the `background_revert` from the exposure in-place.
123 Raises
124 ------
125 RuntimeError
126 Raised if `background_revert` is None.
127 """
128 if self.background_revert is None:
129 raise RuntimeError("No background to revert")
131 if self.background_revert:
132 with backgroundFlatContext(
133 self.exposure.maskedImage,
134 self.background_to_photometric_ratio is not None,
135 backgroundToPhotometricRatio=self.background_to_photometric_ratio,
136 ):
137 # Add only if `background_revert` is not a trivial background.
138 self.exposure.maskedImage += self.background_revert.getImage()
141class MakeDirectWarpConnections(
142 PipelineTaskConnections,
143 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
144 defaultTemplates={
145 "coaddName": "deep",
146 },
147):
148 """Connections for MakeWarpTask"""
150 calexp_list = Input(
151 doc="Input exposures to be interpolated and resampled onto a SkyMap projection/patch.",
152 name="calexp",
153 storageClass="ExposureF",
154 dimensions=("instrument", "visit", "detector"),
155 multiple=True,
156 deferLoad=True,
157 )
158 background_revert_list = Input(
159 doc="Background to be reverted (i.e., added back to the calexp). "
160 "This connection is used only if doRevertOldBackground=True.",
161 name="calexpBackground",
162 storageClass="Background",
163 dimensions=("instrument", "visit", "detector"),
164 multiple=True,
165 )
166 background_apply_list = Input(
167 doc="Background to be applied (subtracted from the calexp). "
168 "This is used only if doApplyNewBackground=True.",
169 name="skyCorr",
170 storageClass="Background",
171 dimensions=("instrument", "visit", "detector"),
172 multiple=True,
173 )
174 background_to_photometric_ratio_list = Input(
175 doc="Ratio of a background-flattened image to a photometric-flattened image. "
176 "This is used only if doRevertOldBackground=True or doApplyNewBackground=True "
177 "and doApplyFlatBackgroundRatio=True.",
178 name="background_to_photometric_ratio",
179 storageClass="Image",
180 dimensions=("instrument", "visit", "detector"),
181 multiple=True,
182 deferLoad=True,
183 )
184 visit_summary = Input(
185 doc="Input visit-summary catalog with updated calibration objects.",
186 name="finalVisitSummary",
187 storageClass="ExposureCatalog",
188 dimensions=("instrument", "visit"),
189 )
190 sky_map = Input(
191 doc="Input definition of geometry/bbox and projection/wcs for warps.",
192 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
193 storageClass="SkyMap",
194 dimensions=("skymap",),
195 )
196 # Declare all possible outputs (except noise, which is configurable)
197 warp = Output(
198 doc="Output direct warped exposure produced by resampling calexps " "onto the skyMap patch geometry.",
199 name="{coaddName}Coadd_directWarp",
200 storageClass="ExposureF",
201 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
202 )
203 masked_fraction_warp = Output(
204 doc="Output masked fraction warped exposure.",
205 name="{coaddName}Coadd_directWarp_maskedFraction",
206 storageClass="ImageF",
207 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
208 )
210 def __init__(self, *, config=None):
211 super().__init__(config=config)
212 if not config:
213 return
215 if not config.doRevertOldBackground:
216 del self.background_revert_list
217 if not config.doApplyNewBackground:
218 del self.background_apply_list
219 if not config.doApplyFlatBackgroundRatio or (
220 not config.doRevertOldBackground and not config.doApplyNewBackground
221 ):
222 del self.background_to_photometric_ratio_list
224 if not config.doWarpMaskedFraction:
225 del self.masked_fraction_warp
227 # Dynamically set output connections for noise images, depending on the
228 # number of noise realization specified in the config.
229 for n in range(config.numberOfNoiseRealizations):
230 noise_warp = Output(
231 doc=f"Output direct warped noise exposure ({n})",
232 name=f"direct_warp_noise{n}",
233 # Store it as a MaskedImage to preserve the variance plane.
234 storageClass="MaskedImageF",
235 dimensions=("tract", "patch", "skymap", "instrument", "visit"),
236 )
237 setattr(self, f"noise_warp{n}", noise_warp)
240class MakeDirectWarpConfig(
241 PipelineTaskConfig,
242 pipelineConnections=MakeDirectWarpConnections,
243):
244 """Configuration for the MakeDirectWarpTask.
246 The config fields are as similar as possible to the corresponding fields in
247 MakeWarpConfig.
249 Notes
250 -----
251 The config fields are in camelCase to match the fields in the earlier
252 version of the makeWarp task as closely as possible.
253 """
255 MAX_NUMBER_OF_NOISE_REALIZATIONS = 3
256 """
257 numberOfNoiseRealizations is defined as a RangeField to prevent from
258 making multiple output connections and blowing up the memory usage by
259 accident. An upper bound of 3 is based on the best guess of the maximum
260 number of noise realizations that will be used for metadetection.
261 """
263 numberOfNoiseRealizations = RangeField[int](
264 doc="Number of noise realizations to simulate and persist.",
265 default=0,
266 min=0,
267 max=MAX_NUMBER_OF_NOISE_REALIZATIONS,
268 inclusiveMax=True,
269 )
270 seedOffset = Field[int](
271 doc="Offset to the seed used for the noise realization. This can be "
272 "used to create a different noise realization if the default ones "
273 "are catastrophic, or for testing sensitivity to the noise.",
274 default=0,
275 )
276 useMedianVariance = Field[bool](
277 doc="Use the median of variance plane in the input calexp to generate "
278 "noise realizations? If False, per-pixel variance will be used.",
279 default=True,
280 )
281 doRevertOldBackground = Field[bool](
282 doc="Revert the old backgrounds from the `background_revert_list` " "connection?",
283 default=False,
284 )
285 doApplyNewBackground = Field[bool](
286 doc="Apply the new backgrounds from the `background_apply_list` " "connection?",
287 default=False,
288 )
289 doApplyFlatBackgroundRatio = Field[bool](
290 doc="Apply flat background ratio prior to background adjustments? Should be True "
291 "if processing was done with an illumination correction.",
292 default=False,
293 )
294 useVisitSummaryPsf = Field[bool](
295 doc="If True, use the PSF model and aperture corrections from the "
296 "'visit_summary' connection to make the warp. If False, use the "
297 "PSF model and aperture corrections from the 'calexp' connection.",
298 default=True,
299 )
300 useVisitSummaryWcs = Field[bool](
301 doc="If True, use the WCS from the "
302 "'visit_summary' connection to make the warp. If False, use the "
303 "WCS from the 'calexp' connection.",
304 default=True,
305 )
306 useVisitSummaryPhotoCalib = Field[bool](
307 doc="If True, use the photometric calibration from the "
308 "'visit_summary' connection to make the warp. If False, use the "
309 "photometric calibration from the 'calexp' connection.",
310 default=True,
311 )
312 doSelectPreWarp = Field[bool](
313 doc="Select ccds before warping?",
314 default=True,
315 )
316 select = ConfigurableField(
317 doc="Image selection subtask.",
318 target=PsfWcsSelectImagesTask,
319 )
320 doPreWarpInterpolation = Field[bool](
321 doc="Interpolate over bad pixels before warping?",
322 default=False,
323 )
324 preWarpInterpolation = ConfigurableField(
325 doc="Interpolation task to use for pre-warping interpolation",
326 target=CloughTocher2DInterpolateTask,
327 )
328 inputRecorder = ConfigurableField(
329 doc="Subtask that helps fill CoaddInputs catalogs added to the final " "coadd",
330 target=CoaddInputRecorderTask,
331 )
332 includeCalibVar = Field[bool](
333 doc="Add photometric calibration variance to warp variance plane?",
334 default=False,
335 deprecated="Deprecated and disabled. Will be removed after v29.",
336 )
337 border = Field[int](
338 doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later",
339 default=256,
340 )
341 warper = ConfigField(
342 doc="Configuration for the warper that warps the image and noise",
343 dtype=Warper.ConfigClass,
344 )
345 doWarpMaskedFraction = Field[bool](
346 doc="Warp the masked fraction image?",
347 default=True,
348 )
349 maskedFractionWarper = ConfigField(
350 doc="Configuration for the warp that warps the mask fraction image",
351 dtype=Warper.ConfigClass,
352 )
353 coaddPsf = ConfigField(
354 doc="Configuration for CoaddPsf",
355 dtype=CoaddPsfConfig,
356 )
357 idGenerator = DetectorVisitIdGeneratorConfig.make_field()
359 # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig,
360 # but as properties instead of config fields.
361 @property
362 def bgSubtracted(self) -> bool:
363 return not self.doRevertOldBackground
365 @bgSubtracted.setter
366 def bgSubtracted(self, value: bool) -> None:
367 self.doRevertOldBackground = not value
369 @property
370 def doApplySkyCorr(self) -> bool:
371 return self.doApplyNewBackground
373 @doApplySkyCorr.setter
374 def doApplySkyCorr(self, value: bool) -> None:
375 self.doApplyNewBackground = value
377 def setDefaults(self) -> None:
378 super().setDefaults()
379 self.warper.warpingKernelName = "lanczos3"
380 self.warper.cacheSize = 0
381 self.maskedFractionWarper.warpingKernelName = "bilinear"
384class MakeDirectWarpTask(PipelineTask):
385 """Warp single-detector images onto a common projection.
387 This task iterates over multiple images (corresponding to different
388 detectors) from a single visit that overlap the target patch. Pixels that
389 receive no input from any detector are set to NaN in the output image, and
390 NO_DATA bit is set in the mask plane.
391 """
393 ConfigClass = MakeDirectWarpConfig
394 _DefaultName = "makeDirectWarp"
396 def __init__(self, **kwargs):
397 super().__init__(**kwargs)
398 self.makeSubtask("inputRecorder")
399 self.makeSubtask("preWarpInterpolation")
400 if self.config.doSelectPreWarp:
401 self.makeSubtask("select")
403 self.warper = Warper.fromConfig(self.config.warper)
404 if self.config.doWarpMaskedFraction:
405 self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper)
407 def runQuantum(self, butlerQC, inputRefs, outputRefs):
408 # Docstring inherited.
410 inputs: Mapping[int, WarpDetectorInputs] = {} # Detector ID -> WarpDetectorInputs
411 for handle in butlerQC.get(inputRefs.calexp_list):
412 inputs[handle.dataId["detector"]] = WarpDetectorInputs(
413 exposure_or_handle=handle,
414 data_id=handle.dataId,
415 )
417 if not inputs:
418 raise NoWorkFound("No input warps provided for co-addition")
420 # Add backgrounds to the inputs struct, being careful not to assume
421 # they're present for the same detectors we got image handles for, in
422 # case of upstream errors.
423 for ref in getattr(inputRefs, "background_revert_list", []):
424 detector_id = ref.dataId["detector"]
425 if detector_id in inputs:
426 inputs[detector_id].background_revert = butlerQC.get(ref)
427 else:
428 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id)
429 for ref in getattr(inputRefs, "background_apply_list", []):
430 detector_id = ref.dataId["detector"]
431 if detector_id in inputs:
432 inputs[detector_id].background_apply = butlerQC.get(ref)
433 else:
434 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id)
435 for ref in getattr(inputRefs, "background_to_photometric_ratio_list", []):
436 detector_id = ref.dataId["detector"]
437 if detector_id in inputs:
438 inputs[detector_id].background_ratio_or_handle = butlerQC.get(ref)
439 else:
440 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id)
442 visit_summary = butlerQC.get(inputRefs.visit_summary)
443 sky_map = butlerQC.get(inputRefs.sky_map)
445 quantumDataId = butlerQC.quantum.dataId
446 sky_info = makeSkyInfo(
447 sky_map,
448 tractId=quantumDataId["tract"],
449 patchId=quantumDataId["patch"],
450 )
452 results = self.run(inputs, sky_info, visit_summary=visit_summary)
453 butlerQC.put(results, outputRefs)
455 def _preselect_inputs(
456 self,
457 inputs: Mapping[int, WarpDetectorInputs],
458 sky_info: Struct,
459 visit_summary: ExposureCatalog,
460 ) -> dict[int, WarpDetectorInputs]:
461 """Filter the list of inputs via a 'select' subtask.
463 Parameters
464 ----------
465 inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ]
466 Per-detector input structs.
467 sky_info : `lsst.pipe.base.Struct`
468 Structure with information about the tract and patch.
469 visit_summary : `~lsst.afw.table.ExposureCatalog`
470 Record with updated calibration information for the full visit.
472 Returns
473 -------
474 filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ]
475 Like ``inputs``, with rejected detectors dropped.
476 """
477 data_id_list, bbox_list, wcs_list = [], [], []
478 for detector_id, detector_inputs in inputs.items():
479 row = visit_summary.find(detector_id)
480 if row is None:
481 self.log.info(
482 "Input calexp is not listed in visit_summary: %d; assuming bad.",
483 detector_id,
484 )
485 continue
486 data_id_list.append(detector_inputs.data_id)
487 bbox_list.append(row.getBBox())
488 wcs_list.append(row.getWcs())
490 cornerPosList = Box2D(sky_info.bbox).getCorners()
491 coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList]
493 good_indices = self.select.run(
494 bboxList=bbox_list,
495 wcsList=wcs_list,
496 visitSummary=visit_summary,
497 coordList=coordList,
498 dataIds=data_id_list,
499 )
500 detector_ids = list(inputs.keys())
501 good_detector_ids = [detector_ids[i] for i in good_indices]
502 return {detector_id: inputs[detector_id] for detector_id in good_detector_ids}
504 def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct:
505 """Create a Warp dataset from inputs.
507 Parameters
508 ----------
509 inputs : `Mapping` [ `int`, `WarpDetectorInputs` ]
510 Dictionary of input datasets, with the detector id being the key.
511 sky_info : `~lsst.pipe.base.Struct`
512 A Struct object containing wcs, bounding box, and other information
513 about the patches within the tract.
514 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
515 Table of visit summary information. If provided, the visit summary
516 information will be used to update the calibration of the input
517 exposures. If None, the input exposures will be used as-is.
519 Returns
520 -------
521 results : `~lsst.pipe.base.Struct`
522 A Struct object containing the warped exposure, noise exposure(s),
523 and masked fraction image.
524 """
525 if self.config.doSelectPreWarp:
526 inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary)
527 if not inputs:
528 raise NoWorkFound("No input warps remain after selection for co-addition")
530 target_bbox, target_wcs = Box2I(sky_info.bbox), sky_info.wcs
531 target_bbox.grow(self.config.border)
532 del sky_info # Make sure we don't use the original bbox by accident
534 # Initialize the objects that will hold the warp.
535 final_warp = ExposureF(target_bbox, target_wcs)
537 for _, warp_detector_input in inputs.items():
538 visit_id = warp_detector_input.data_id["visit"]
539 break # Just need the visit id from any one of the inputs.
541 # The warpExposure routine is expensive, and we do not want to call
542 # it twice (i.e., a second time for PSF-matched warps). We do not
543 # want to hold all the warped exposures in memory at once either.
544 # So we create empty exposure(s) to accumulate the warps of each type,
545 # and then process each detector serially.
546 final_warp = self._prepareEmptyExposure(target_bbox, target_wcs)
547 final_masked_fraction_warp = self._prepareEmptyExposure(target_bbox, target_wcs)
548 final_noise_warps = {
549 n_noise: self._prepareEmptyExposure(target_bbox, target_wcs)
550 for n_noise in range(self.config.numberOfNoiseRealizations)
551 }
553 # We need a few bookkeeping variables only for the science coadd.
554 totalGoodPixels = 0
555 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder(
556 visit_id,
557 len(inputs),
558 )
560 for index, detector_inputs in enumerate(inputs.values()):
561 self.log.debug(
562 "Warping exposure %d/%d for id=%s",
563 index + 1,
564 len(inputs),
565 detector_inputs.data_id,
566 )
568 input_exposure = detector_inputs.exposure
569 # Generate noise image(s) in-situ.
570 seed = self.get_seed_from_data_id(detector_inputs.data_id)
571 # Limit to last 32 bits to avoid overflow in numpy.
572 np_seed = (seed + self.config.seedOffset) & 0xFFFFFFFF
573 self.log.debug("Setting numpy random seed to %d for noise realization", np_seed)
574 rng = np.random.RandomState(np_seed)
576 # Generate noise images in-situ.
577 noise_calexps = self.make_noise_exposures(input_exposure, rng)
579 warpedExposure = self.process(
580 detector_inputs,
581 target_wcs,
582 self.warper,
583 visit_summary,
584 destBBox=target_bbox,
585 )
587 if warpedExposure is None:
588 self.log.debug(
589 "Skipping exposure %s because it could not be warped.", detector_inputs.data_id
590 )
591 continue
593 # Accumulate the partial warps in an online fashion.
594 nGood = copyGoodPixels(
595 final_warp.maskedImage,
596 warpedExposure.maskedImage,
597 final_warp.mask.getPlaneBitMask(["NO_DATA"]),
598 )
600 ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id
601 inputRecorder.addCalExp(input_exposure, ccdId, nGood)
602 totalGoodPixels += nGood
604 if self.config.doWarpMaskedFraction:
605 # Obtain the masked fraction exposure and warp it.
606 if self.config.doPreWarpInterpolation:
607 badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes
608 else:
609 badMaskPlanes = []
610 masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes)
612 masked_fraction_warp = self.maskedFractionWarper.warpExposure(
613 target_wcs, masked_fraction_exp, destBBox=target_bbox
614 )
616 copyGoodPixels(
617 final_masked_fraction_warp.maskedImage,
618 masked_fraction_warp.maskedImage,
619 final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]),
620 )
622 # Process and accumulate noise images.
623 for n_noise in range(self.config.numberOfNoiseRealizations):
624 noise_calexp = noise_calexps[n_noise]
625 noise_pseudo_inputs = dataclasses.replace(
626 detector_inputs,
627 exposure_or_handle=noise_calexp,
628 background_revert=BackgroundList(),
629 background_apply=BackgroundList(),
630 )
631 warpedNoise = self.process(
632 noise_pseudo_inputs,
633 target_wcs,
634 self.warper,
635 visit_summary,
636 destBBox=target_bbox,
637 )
639 copyGoodPixels(
640 final_noise_warps[n_noise].maskedImage,
641 warpedNoise.maskedImage,
642 final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]),
643 )
645 # If there are no good pixels, return a Struct filled with None.
646 if totalGoodPixels == 0:
647 results = Struct(
648 warp=None,
649 masked_fraction_warp=None,
650 )
651 for noise_index in range(self.config.numberOfNoiseRealizations):
652 setattr(results, f"noise_warp{noise_index}", None)
654 return results
656 # Finish the inputRecorder and add the coaddPsf to the final warp.
657 inputRecorder.finish(final_warp, totalGoodPixels)
659 coaddPsf = CoaddPsf(
660 inputRecorder.coaddInputs.ccds,
661 target_wcs,
662 self.config.coaddPsf.makeControl(),
663 )
665 final_warp.setPsf(coaddPsf)
666 for _, warp_detector_input in inputs.items():
667 final_warp.setFilter(warp_detector_input.exposure.getFilter())
668 final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo())
669 break # Just need the filter and visit info from any one of the inputs.
671 results = Struct(
672 warp=final_warp,
673 )
675 if self.config.doWarpMaskedFraction:
676 results.masked_fraction_warp = final_masked_fraction_warp.image
678 for noise_index, noise_exposure in final_noise_warps.items():
679 setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage)
681 return results
683 def process(
684 self,
685 detector_inputs: WarpDetectorInputs,
686 target_wcs,
687 warper,
688 visit_summary=None,
689 maxBBox=None,
690 destBBox=None,
691 ) -> ExposureF | None:
692 """Process an exposure.
694 There are three processing steps that are applied to the input:
696 1. Interpolate over bad pixels before warping.
697 2. Apply all calibrations from visit_summary to the exposure.
698 3. Warp the exposure to the target coordinate system.
700 Parameters
701 ----------
702 detector_inputs : `WarpDetectorInputs`
703 The input exposure to be processed, along with any other
704 per-detector modifications.
705 target_wcs : `~lsst.afw.geom.SkyWcs`
706 The WCS of the target patch.
707 warper : `~lsst.afw.math.Warper`
708 The warper to use for warping the input exposure.
709 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
710 Table of visit summary information. If not None, the visit_summary
711 information will be used to update the calibration of the input
712 exposures. Otherwise, the input exposures will be used as-is.
713 maxBBox : `~lsst.geom.Box2I` | None
714 Maximum bounding box of the warped exposure. If None, this is
715 determined automatically.
716 destBBox : `~lsst.geom.Box2I` | None
717 Exact bounding box of the warped exposure. If None, this is
718 determined automatically.
720 Returns
721 -------
722 warped_exposure : `~lsst.afw.image.Exposure` | None
723 The processed and warped exposure, if all the calibrations could
724 be applied successfully. Otherwise, None.
725 """
727 input_exposure = detector_inputs.exposure
728 if self.config.doPreWarpInterpolation:
729 self.preWarpInterpolation.run(input_exposure.maskedImage)
731 success = self._apply_all_calibrations(detector_inputs, visit_summary=visit_summary)
733 if not success:
734 return None
736 with self.timer("warp"):
737 warped_exposure = warper.warpExposure(
738 target_wcs,
739 input_exposure,
740 maxBBox=maxBBox,
741 destBBox=destBBox,
742 )
744 # Potentially a post-warp interpolation here? Relies on DM-38630.
746 return warped_exposure
748 def _apply_all_calibrations(
749 self,
750 detector_inputs: WarpDetectorInputs,
751 *,
752 visit_summary: ExposureCatalog | None = None,
753 ) -> bool:
754 """Apply all of the calibrations from visit_summary to the exposure.
756 Specifically, this method updates the following (if available) to the
757 input exposure in place from ``visit_summary``:
759 - Aperture correction map
760 - Photometric calibration
761 - PSF
762 - WCS
764 It also reverts and applies backgrounds in ``detector_inputs``.
766 Parameters
767 ----------
768 detector_inputs : `WarpDetectorInputs`
769 The input exposure to be processed, along with any other
770 per-detector modifications.
771 visit_summary : `~lsst.afw.table.ExposureCatalog` | None
772 Table of visit summary information. If not None, the visit summary
773 information will be used to update the calibration of the input
774 exposures. Otherwise, the input exposures will be used as-is.
776 Returns
777 -------
778 success : `bool`
779 True if all calibrations were successfully applied,
780 False otherwise.
782 Raises
783 ------
784 RuntimeError
785 Raised if ``visit_summary`` is provided but does not contain a
786 record corresponding to ``detector_inputs``, or if one of the
787 background fields in ``detector_inputs`` is inconsistent with the
788 task configuration.
789 """
790 if self.config.doRevertOldBackground:
791 detector_inputs.revert_background()
792 elif detector_inputs.background_revert:
793 # This could get trigged only if runQuantum is skipped and run is
794 # called directly.
795 raise RuntimeError(
796 f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert."
797 )
799 input_exposure = detector_inputs.exposure
800 if visit_summary is not None:
801 detector = input_exposure.info.getDetector().getId()
802 row = visit_summary.find(detector)
804 if row is None:
805 self.log.info(
806 "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.",
807 detector,
808 )
809 return False
811 if self.config.useVisitSummaryPhotoCalib:
812 photo_calib = row.getPhotoCalib()
813 if photo_calib is None:
814 self.log.info(
815 "No photometric calibration found in visit summary for detector = %s. Skipping it.",
816 detector,
817 )
818 return False
819 elif not np.isfinite(photo_calib.getInstFluxAtZeroMagnitude()):
820 self.log.info(
821 f"Invalid PhotoCalib found in visit summary for detector {detector}. Skipping it.",
822 )
823 return False
824 else:
825 input_exposure.setPhotoCalib(photo_calib)
827 if self.config.useVisitSummaryWcs:
828 if wcs := row.getWcs():
829 input_exposure.setWcs(wcs)
830 else:
831 self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector)
832 return False
834 if self.config.useVisitSummaryPsf:
835 if psf := row.getPsf():
836 input_exposure.setPsf(psf)
837 else:
838 self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector)
839 return False
841 if apcorr_map := row.getApCorrMap():
842 input_exposure.setApCorrMap(apcorr_map)
843 else:
844 self.log.info(
845 "No aperture correction map found in visit summary for detector = %s. Skipping it",
846 detector,
847 )
848 return False
850 if self.config.doApplyNewBackground:
851 detector_inputs.apply_background()
852 elif detector_inputs.background_apply:
853 # This could get trigged only if runQuantum is skipped and run is
854 # called directly.
855 raise RuntimeError(
856 f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply."
857 )
859 # Calibrate the (masked) image.
860 # This should likely happen even if visit_summary is None.
861 photo_calib = input_exposure.photoCalib
862 input_exposure.maskedImage = photo_calib.calibrateImage(input_exposure.maskedImage)
863 # This new PhotoCalib shouldn't need to be used, but setting it here
864 # to reflect the fact that the image now has calibrated pixels might
865 # help avoid future bugs.
866 input_exposure.setPhotoCalib(PhotoCalib(1.0))
868 return True
870 @classmethod
871 def _prepareEmptyExposure(cls, bbox, wcs):
872 """Produce an empty exposure for a given patch.
874 Parameters
875 ----------
876 bbox : `lsst.geom.Box2I`
877 Bounding box for the exposure.
878 wcs : `lsst.afw.geom.SkyWcs`
879 Mapping from pixels to sky.
881 Returns
882 -------
883 exp : `lsst.afw.image.exposure.ExposureF`
884 An empty exposure for a given patch.
885 """
886 exp = ExposureF(bbox, wcs)
887 exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf)
888 # Set the PhotoCalib to 1 to mean that pixels are nJy, since we will
889 # calibrate them before we warp them.
890 exp.setPhotoCalib(PhotoCalib(1.0))
891 exp.metadata["BUNIT"] = "nJy"
892 return exp
894 @staticmethod
895 def compute_median_variance(mi: MaskedImage) -> float:
896 """Compute the median variance across the good pixels of a MaskedImage.
898 Parameters
899 ----------
900 mi : `~lsst.afw.image.MaskedImage`
901 The input image on which to compute the median variance.
903 Returns
904 -------
905 median_variance : `float`
906 Median variance of the input calexp.
907 """
908 # Shouldn't this exclude pixels that are masked, to be safe?
909 # This is implemented as it was in descwl_coadd.
910 return np.median(mi.variance.array[np.isfinite(mi.variance.array) & np.isfinite(mi.image.array)])
912 def get_seed_from_data_id(self, data_id) -> int:
913 """Get a seed value given a data_id.
915 This method generates a unique, reproducible pseudo-random number for
916 a data id. This is not affected by ordering of the input, or what
917 set of visits, ccds etc. are given.
919 This is implemented as a public method, so that simulations that
920 don't necessary deal with the middleware can mock up a ``data_id``
921 instance, or override this method with a different one to obtain a
922 seed value consistent with the pipeline task.
924 Parameters
925 ----------
926 data_id : `~lsst.daf.butler.DataCoordinate`
927 Data identifier dictionary.
929 Returns
930 -------
931 seed : `int`
932 A unique seed for this data_id to seed a random number generator.
933 """
934 return self.config.idGenerator.apply(data_id).catalog_id
936 def make_noise_exposures(self, calexp: ExposureF, rng) -> dict[int, ExposureF]:
937 """Make pure noise realizations based on ``calexp``.
939 Parameters
940 ----------
941 calexp : `~lsst.afw.image.ExposureF`
942 The input exposure on which to base the noise realizations.
943 rng : `np.random.RandomState`
944 Random number generator to use for the noise realizations.
946 Returns
947 -------
948 noise_calexps : `dict` [`int`, `~lsst.afw.image.ExposureF`]
949 A mapping of integers ranging from 0 up to
950 config.numberOfNoiseRealizations to the corresponding
951 noise realization exposures.
952 """
953 noise_calexps = {}
955 # If no noise exposures are requested, return the empty dictionary
956 # without any further computations.
957 if self.config.numberOfNoiseRealizations == 0:
958 return noise_calexps
960 if self.config.useMedianVariance:
961 variance = self.compute_median_variance(calexp.maskedImage)
962 else:
963 variance = calexp.variance.array
965 for n_noise in range(self.config.numberOfNoiseRealizations):
966 noise_calexp = calexp.clone()
967 noise_calexp.image.array[:, :] = rng.normal(
968 scale=np.sqrt(variance),
969 size=noise_calexp.image.array.shape,
970 )
971 noise_calexp.variance.array[:, :] = variance
972 noise_calexps[n_noise] = noise_calexp
974 return noise_calexps
976 @classmethod
977 def _get_bad_mask(cls, exp: ExposureF, badMaskPlanes: Iterable[str]) -> ExposureF:
978 """Get an Exposure of bad mask
980 Parameters
981 ----------
982 exp: `lsst.afw.image.Exposure`
983 The exposure data.
984 badMaskPlanes: `list` [`str`]
985 List of mask planes to be considered as bad.
987 Returns
988 -------
989 bad_mask: `~lsst.afw.image.Exposure`
990 An Exposure with boolean array with True if inverse variance <= 0
991 or if any of the badMaskPlanes bits are set, and False otherwise.
992 """
994 bad_mask = exp.clone()
996 var = exp.variance.array
997 mask = exp.mask.array
999 bitMask = exp.mask.getPlaneBitMask(badMaskPlanes)
1001 bad_mask.image.array[:, :] = (var < 0) | np.isinf(var) | ((mask & bitMask) != 0)
1003 bad_mask.variance.array *= 0.0
1005 return bad_mask