Coverage for python / lsst / drp / tasks / make_direct_warp.py: 23%

325 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-07 08:43 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import dataclasses 

25from collections.abc import Mapping 

26from typing import TYPE_CHECKING, Iterable 

27 

28import numpy as np 

29 

30from lsst.afw.image import ExposureF, Image, Mask, PhotoCalib 

31from lsst.afw.math import BackgroundList, Warper 

32from lsst.coadd.utils import copyGoodPixels 

33from lsst.daf.butler import DataCoordinate, DeferredDatasetHandle 

34from lsst.geom import Box2D, Box2I 

35from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig, backgroundFlatContext 

36from lsst.meas.algorithms.cloughTocher2DInterpolator import CloughTocher2DInterpolateTask 

37from lsst.meas.base import DetectorVisitIdGeneratorConfig 

38from lsst.pex.config import ConfigField, ConfigurableField, Field, RangeField 

39from lsst.pipe.base import ( 

40 InMemoryDatasetHandle, 

41 NoWorkFound, 

42 PipelineTask, 

43 PipelineTaskConfig, 

44 PipelineTaskConnections, 

45 Struct, 

46) 

47from lsst.pipe.base.connectionTypes import Input, Output 

48from lsst.pipe.tasks.coaddBase import makeSkyInfo 

49from lsst.pipe.tasks.coaddInputRecorder import CoaddInputRecorderTask 

50from lsst.pipe.tasks.selectImages import PsfWcsSelectImagesTask 

51from lsst.skymap import BaseSkyMap 

52 

53if TYPE_CHECKING: 

54 from lsst.afw.image import MaskedImage 

55 from lsst.afw.table import ExposureCatalog 

56 

57 

58__all__ = ( 

59 "MakeDirectWarpConfig", 

60 "MakeDirectWarpTask", 

61) 

62 

63 

64@dataclasses.dataclass 

65class WarpDetectorInputs: 

66 """Inputs passed to `MakeDirectWarpTask.run` for a single detector.""" 

67 

68 exposure_or_handle: ExposureF | DeferredDatasetHandle | InMemoryDatasetHandle 

69 """Detector image with initial calibration objects, or a deferred-load 

70 handle for one. 

71 """ 

72 

73 data_id: DataCoordinate 

74 """Butler data ID for this detector.""" 

75 

76 background_revert: BackgroundList | None = None 

77 """Background model to restore in (i.e. add to) the image.""" 

78 

79 background_apply: BackgroundList | None = None 

80 """Background model to apply to (i.e. subtract from) the image.""" 

81 

82 background_ratio_or_handle: Image | DeferredDatasetHandle | InMemoryDatasetHandle | None = None 

83 """Ratio of background-flattened image to photometric-flattened image.""" 

84 

85 @property 

86 def exposure(self) -> ExposureF: 

87 """Get the exposure object, loading it if necessary.""" 

88 if not isinstance(self.exposure_or_handle, ExposureF): 

89 self.exposure_or_handle = self.exposure_or_handle.get() 

90 

91 return self.exposure_or_handle 

92 

93 @property 

94 def background_to_photometric_ratio(self) -> Image: 

95 """Get the background_to_photometric object, loading if necessary.""" 

96 if isinstance(self.background_ratio_or_handle, (DeferredDatasetHandle, InMemoryDatasetHandle)): 

97 self.background_ratio_or_handle = self.background_ratio_or_handle.get() 

98 

99 return self.background_ratio_or_handle 

100 

101 def apply_background(self) -> None: 

102 """Apply (subtract) the `background_apply` to the exposure in-place. 

103 

104 Raises 

105 ------ 

106 RuntimeError 

107 Raised if `background_apply` is None. 

108 """ 

109 if self.background_apply is None: 

110 raise RuntimeError("No background to apply") 

111 

112 if self.background_apply: 

113 with backgroundFlatContext( 

114 self.exposure.maskedImage, 

115 self.background_to_photometric_ratio is not None, 

116 backgroundToPhotometricRatio=self.background_to_photometric_ratio, 

117 ): 

118 self.exposure.maskedImage -= self.background_apply.getImage() 

119 

120 def revert_background(self) -> None: 

121 """Revert (add) the `background_revert` from the exposure in-place. 

122 

123 Raises 

124 ------ 

125 RuntimeError 

126 Raised if `background_revert` is None. 

127 """ 

128 if self.background_revert is None: 

129 raise RuntimeError("No background to revert") 

130 

131 if self.background_revert: 

132 with backgroundFlatContext( 

133 self.exposure.maskedImage, 

134 self.background_to_photometric_ratio is not None, 

135 backgroundToPhotometricRatio=self.background_to_photometric_ratio, 

136 ): 

137 # Add only if `background_revert` is not a trivial background. 

138 self.exposure.maskedImage += self.background_revert.getImage() 

139 

140 

141class MakeDirectWarpConnections( 

142 PipelineTaskConnections, 

143 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

144 defaultTemplates={ 

145 "coaddName": "deep", 

146 }, 

147): 

148 """Connections for MakeWarpTask""" 

149 

150 calexp_list = Input( 

151 doc="Input exposures to be interpolated and resampled onto a SkyMap projection/patch.", 

152 name="calexp", 

153 storageClass="ExposureF", 

154 dimensions=("instrument", "visit", "detector"), 

155 multiple=True, 

156 deferLoad=True, 

157 ) 

158 background_revert_list = Input( 

159 doc="Background to be reverted (i.e., added back to the calexp). " 

160 "This connection is used only if doRevertOldBackground=True.", 

161 name="calexpBackground", 

162 storageClass="Background", 

163 dimensions=("instrument", "visit", "detector"), 

164 multiple=True, 

165 ) 

166 background_apply_list = Input( 

167 doc="Background to be applied (subtracted from the calexp). " 

168 "This is used only if doApplyNewBackground=True.", 

169 name="skyCorr", 

170 storageClass="Background", 

171 dimensions=("instrument", "visit", "detector"), 

172 multiple=True, 

173 ) 

174 background_to_photometric_ratio_list = Input( 

175 doc="Ratio of a background-flattened image to a photometric-flattened image. " 

176 "This is used only if doRevertOldBackground=True or doApplyNewBackground=True " 

177 "and doApplyFlatBackgroundRatio=True.", 

178 name="background_to_photometric_ratio", 

179 storageClass="Image", 

180 dimensions=("instrument", "visit", "detector"), 

181 multiple=True, 

182 deferLoad=True, 

183 ) 

184 visit_summary = Input( 

185 doc="Input visit-summary catalog with updated calibration objects.", 

186 name="finalVisitSummary", 

187 storageClass="ExposureCatalog", 

188 dimensions=("instrument", "visit"), 

189 ) 

190 sky_map = Input( 

191 doc="Input definition of geometry/bbox and projection/wcs for warps.", 

192 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

193 storageClass="SkyMap", 

194 dimensions=("skymap",), 

195 ) 

196 # Declare all possible outputs (except noise, which is configurable) 

197 warp = Output( 

198 doc="Output direct warped exposure produced by resampling calexps " "onto the skyMap patch geometry.", 

199 name="{coaddName}Coadd_directWarp", 

200 storageClass="ExposureF", 

201 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

202 ) 

203 masked_fraction_warp = Output( 

204 doc="Output masked fraction warped exposure.", 

205 name="{coaddName}Coadd_directWarp_maskedFraction", 

206 storageClass="ImageF", 

207 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

208 ) 

209 

210 def __init__(self, *, config=None): 

211 super().__init__(config=config) 

212 if not config: 

213 return 

214 

215 if not config.doRevertOldBackground: 

216 del self.background_revert_list 

217 if not config.doApplyNewBackground: 

218 del self.background_apply_list 

219 if not config.doApplyFlatBackgroundRatio or ( 

220 not config.doRevertOldBackground and not config.doApplyNewBackground 

221 ): 

222 del self.background_to_photometric_ratio_list 

223 

224 if not config.doWarpMaskedFraction: 

225 del self.masked_fraction_warp 

226 

227 # Dynamically set output connections for noise images, depending on the 

228 # number of noise realization specified in the config. 

229 for n in range(config.numberOfNoiseRealizations): 

230 noise_warp = Output( 

231 doc=f"Output direct warped noise exposure ({n})", 

232 name=f"direct_warp_noise{n}", 

233 # Store it as a MaskedImage to preserve the variance plane. 

234 storageClass="MaskedImageF", 

235 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

236 ) 

237 setattr(self, f"noise_warp{n}", noise_warp) 

238 

239 

240class MakeDirectWarpConfig( 

241 PipelineTaskConfig, 

242 pipelineConnections=MakeDirectWarpConnections, 

243): 

244 """Configuration for the MakeDirectWarpTask. 

245 

246 The config fields are as similar as possible to the corresponding fields in 

247 MakeWarpConfig. 

248 

249 Notes 

250 ----- 

251 The config fields are in camelCase to match the fields in the earlier 

252 version of the makeWarp task as closely as possible. 

253 """ 

254 

255 MAX_NUMBER_OF_NOISE_REALIZATIONS = 3 

256 """ 

257 numberOfNoiseRealizations is defined as a RangeField to prevent from 

258 making multiple output connections and blowing up the memory usage by 

259 accident. An upper bound of 3 is based on the best guess of the maximum 

260 number of noise realizations that will be used for metadetection. 

261 """ 

262 

263 numberOfNoiseRealizations = RangeField[int]( 

264 doc="Number of noise realizations to simulate and persist.", 

265 default=0, 

266 min=0, 

267 max=MAX_NUMBER_OF_NOISE_REALIZATIONS, 

268 inclusiveMax=True, 

269 ) 

270 seedOffset = Field[int]( 

271 doc="Offset to the seed used for the noise realization. This can be " 

272 "used to create a different noise realization if the default ones " 

273 "are catastrophic, or for testing sensitivity to the noise.", 

274 default=0, 

275 ) 

276 useMedianVariance = Field[bool]( 

277 doc="Use the median of variance plane in the input calexp to generate " 

278 "noise realizations? If False, per-pixel variance will be used.", 

279 default=True, 

280 ) 

281 doRevertOldBackground = Field[bool]( 

282 doc="Revert the old backgrounds from the `background_revert_list` " "connection?", 

283 default=False, 

284 ) 

285 doApplyNewBackground = Field[bool]( 

286 doc="Apply the new backgrounds from the `background_apply_list` " "connection?", 

287 default=False, 

288 ) 

289 doApplyFlatBackgroundRatio = Field[bool]( 

290 doc="Apply flat background ratio prior to background adjustments? Should be True " 

291 "if processing was done with an illumination correction.", 

292 default=False, 

293 ) 

294 useVisitSummaryPsf = Field[bool]( 

295 doc="If True, use the PSF model and aperture corrections from the " 

296 "'visit_summary' connection to make the warp. If False, use the " 

297 "PSF model and aperture corrections from the 'calexp' connection.", 

298 default=True, 

299 ) 

300 useVisitSummaryWcs = Field[bool]( 

301 doc="If True, use the WCS from the " 

302 "'visit_summary' connection to make the warp. If False, use the " 

303 "WCS from the 'calexp' connection.", 

304 default=True, 

305 ) 

306 useVisitSummaryPhotoCalib = Field[bool]( 

307 doc="If True, use the photometric calibration from the " 

308 "'visit_summary' connection to make the warp. If False, use the " 

309 "photometric calibration from the 'calexp' connection.", 

310 default=True, 

311 ) 

312 doSelectPreWarp = Field[bool]( 

313 doc="Select ccds before warping?", 

314 default=True, 

315 ) 

316 select = ConfigurableField( 

317 doc="Image selection subtask.", 

318 target=PsfWcsSelectImagesTask, 

319 ) 

320 doPreWarpInterpolation = Field[bool]( 

321 doc="Interpolate over bad pixels before warping?", 

322 default=False, 

323 ) 

324 preWarpInterpolation = ConfigurableField( 

325 doc="Interpolation task to use for pre-warping interpolation", 

326 target=CloughTocher2DInterpolateTask, 

327 ) 

328 inputRecorder = ConfigurableField( 

329 doc="Subtask that helps fill CoaddInputs catalogs added to the final " "coadd", 

330 target=CoaddInputRecorderTask, 

331 ) 

332 includeCalibVar = Field[bool]( 

333 doc="Add photometric calibration variance to warp variance plane?", 

334 default=False, 

335 deprecated="Deprecated and disabled. Will be removed after v29.", 

336 ) 

337 border = Field[int]( 

338 doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later", 

339 default=256, 

340 ) 

341 warper = ConfigField( 

342 doc="Configuration for the warper that warps the image and noise", 

343 dtype=Warper.ConfigClass, 

344 ) 

345 doWarpMaskedFraction = Field[bool]( 

346 doc="Warp the masked fraction image?", 

347 default=True, 

348 ) 

349 maskedFractionWarper = ConfigField( 

350 doc="Configuration for the warp that warps the mask fraction image", 

351 dtype=Warper.ConfigClass, 

352 ) 

353 coaddPsf = ConfigField( 

354 doc="Configuration for CoaddPsf", 

355 dtype=CoaddPsfConfig, 

356 ) 

357 idGenerator = DetectorVisitIdGeneratorConfig.make_field() 

358 

359 # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig, 

360 # but as properties instead of config fields. 

361 @property 

362 def bgSubtracted(self) -> bool: 

363 return not self.doRevertOldBackground 

364 

365 @bgSubtracted.setter 

366 def bgSubtracted(self, value: bool) -> None: 

367 self.doRevertOldBackground = not value 

368 

369 @property 

370 def doApplySkyCorr(self) -> bool: 

371 return self.doApplyNewBackground 

372 

373 @doApplySkyCorr.setter 

374 def doApplySkyCorr(self, value: bool) -> None: 

375 self.doApplyNewBackground = value 

376 

377 def setDefaults(self) -> None: 

378 super().setDefaults() 

379 self.warper.warpingKernelName = "lanczos3" 

380 self.warper.cacheSize = 0 

381 self.maskedFractionWarper.warpingKernelName = "bilinear" 

382 

383 

384class MakeDirectWarpTask(PipelineTask): 

385 """Warp single-detector images onto a common projection. 

386 

387 This task iterates over multiple images (corresponding to different 

388 detectors) from a single visit that overlap the target patch. Pixels that 

389 receive no input from any detector are set to NaN in the output image, and 

390 NO_DATA bit is set in the mask plane. 

391 """ 

392 

393 ConfigClass = MakeDirectWarpConfig 

394 _DefaultName = "makeDirectWarp" 

395 

396 def __init__(self, **kwargs): 

397 super().__init__(**kwargs) 

398 self.makeSubtask("inputRecorder") 

399 self.makeSubtask("preWarpInterpolation") 

400 if self.config.doSelectPreWarp: 

401 self.makeSubtask("select") 

402 

403 self.warper = Warper.fromConfig(self.config.warper) 

404 if self.config.doWarpMaskedFraction: 

405 self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper) 

406 

407 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

408 # Docstring inherited. 

409 

410 inputs: Mapping[int, WarpDetectorInputs] = {} # Detector ID -> WarpDetectorInputs 

411 for handle in butlerQC.get(inputRefs.calexp_list): 

412 inputs[handle.dataId["detector"]] = WarpDetectorInputs( 

413 exposure_or_handle=handle, 

414 data_id=handle.dataId, 

415 ) 

416 

417 if not inputs: 

418 raise NoWorkFound("No input warps provided for co-addition") 

419 

420 # Add backgrounds to the inputs struct, being careful not to assume 

421 # they're present for the same detectors we got image handles for, in 

422 # case of upstream errors. 

423 for ref in getattr(inputRefs, "background_revert_list", []): 

424 detector_id = ref.dataId["detector"] 

425 if detector_id in inputs: 

426 inputs[detector_id].background_revert = butlerQC.get(ref) 

427 else: 

428 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id) 

429 for ref in getattr(inputRefs, "background_apply_list", []): 

430 detector_id = ref.dataId["detector"] 

431 if detector_id in inputs: 

432 inputs[detector_id].background_apply = butlerQC.get(ref) 

433 else: 

434 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id) 

435 for ref in getattr(inputRefs, "background_to_photometric_ratio_list", []): 

436 detector_id = ref.dataId["detector"] 

437 if detector_id in inputs: 

438 inputs[detector_id].background_ratio_or_handle = butlerQC.get(ref) 

439 else: 

440 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id) 

441 

442 visit_summary = butlerQC.get(inputRefs.visit_summary) 

443 sky_map = butlerQC.get(inputRefs.sky_map) 

444 

445 quantumDataId = butlerQC.quantum.dataId 

446 sky_info = makeSkyInfo( 

447 sky_map, 

448 tractId=quantumDataId["tract"], 

449 patchId=quantumDataId["patch"], 

450 ) 

451 

452 results = self.run(inputs, sky_info, visit_summary=visit_summary) 

453 butlerQC.put(results, outputRefs) 

454 

455 def _preselect_inputs( 

456 self, 

457 inputs: Mapping[int, WarpDetectorInputs], 

458 sky_info: Struct, 

459 visit_summary: ExposureCatalog, 

460 ) -> dict[int, WarpDetectorInputs]: 

461 """Filter the list of inputs via a 'select' subtask. 

462 

463 Parameters 

464 ---------- 

465 inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ] 

466 Per-detector input structs. 

467 sky_info : `lsst.pipe.base.Struct` 

468 Structure with information about the tract and patch. 

469 visit_summary : `~lsst.afw.table.ExposureCatalog` 

470 Record with updated calibration information for the full visit. 

471 

472 Returns 

473 ------- 

474 filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ] 

475 Like ``inputs``, with rejected detectors dropped. 

476 """ 

477 data_id_list, bbox_list, wcs_list = [], [], [] 

478 for detector_id, detector_inputs in inputs.items(): 

479 row = visit_summary.find(detector_id) 

480 if row is None: 

481 self.log.info( 

482 "Input calexp is not listed in visit_summary: %d; assuming bad.", 

483 detector_id, 

484 ) 

485 continue 

486 data_id_list.append(detector_inputs.data_id) 

487 bbox_list.append(row.getBBox()) 

488 wcs_list.append(row.getWcs()) 

489 

490 cornerPosList = Box2D(sky_info.bbox).getCorners() 

491 coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList] 

492 

493 good_indices = self.select.run( 

494 bboxList=bbox_list, 

495 wcsList=wcs_list, 

496 visitSummary=visit_summary, 

497 coordList=coordList, 

498 dataIds=data_id_list, 

499 ) 

500 detector_ids = list(inputs.keys()) 

501 good_detector_ids = [detector_ids[i] for i in good_indices] 

502 return {detector_id: inputs[detector_id] for detector_id in good_detector_ids} 

503 

504 def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct: 

505 """Create a Warp dataset from inputs. 

506 

507 Parameters 

508 ---------- 

509 inputs : `Mapping` [ `int`, `WarpDetectorInputs` ] 

510 Dictionary of input datasets, with the detector id being the key. 

511 sky_info : `~lsst.pipe.base.Struct` 

512 A Struct object containing wcs, bounding box, and other information 

513 about the patches within the tract. 

514 visit_summary : `~lsst.afw.table.ExposureCatalog` | None 

515 Table of visit summary information. If provided, the visit summary 

516 information will be used to update the calibration of the input 

517 exposures. If None, the input exposures will be used as-is. 

518 

519 Returns 

520 ------- 

521 results : `~lsst.pipe.base.Struct` 

522 A Struct object containing the warped exposure, noise exposure(s), 

523 and masked fraction image. 

524 """ 

525 if self.config.doSelectPreWarp: 

526 inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary) 

527 if not inputs: 

528 raise NoWorkFound("No input warps remain after selection for co-addition") 

529 

530 target_bbox, target_wcs = Box2I(sky_info.bbox), sky_info.wcs 

531 target_bbox.grow(self.config.border) 

532 del sky_info # Make sure we don't use the original bbox by accident 

533 

534 # Initialize the objects that will hold the warp. 

535 final_warp = ExposureF(target_bbox, target_wcs) 

536 

537 for _, warp_detector_input in inputs.items(): 

538 visit_id = warp_detector_input.data_id["visit"] 

539 break # Just need the visit id from any one of the inputs. 

540 

541 # The warpExposure routine is expensive, and we do not want to call 

542 # it twice (i.e., a second time for PSF-matched warps). We do not 

543 # want to hold all the warped exposures in memory at once either. 

544 # So we create empty exposure(s) to accumulate the warps of each type, 

545 # and then process each detector serially. 

546 final_warp = self._prepareEmptyExposure(target_bbox, target_wcs) 

547 final_masked_fraction_warp = self._prepareEmptyExposure(target_bbox, target_wcs) 

548 final_noise_warps = { 

549 n_noise: self._prepareEmptyExposure(target_bbox, target_wcs) 

550 for n_noise in range(self.config.numberOfNoiseRealizations) 

551 } 

552 

553 # We need a few bookkeeping variables only for the science coadd. 

554 totalGoodPixels = 0 

555 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder( 

556 visit_id, 

557 len(inputs), 

558 ) 

559 

560 for index, detector_inputs in enumerate(inputs.values()): 

561 self.log.debug( 

562 "Warping exposure %d/%d for id=%s", 

563 index + 1, 

564 len(inputs), 

565 detector_inputs.data_id, 

566 ) 

567 

568 input_exposure = detector_inputs.exposure 

569 # Generate noise image(s) in-situ. 

570 seed = self.get_seed_from_data_id(detector_inputs.data_id) 

571 # Limit to last 32 bits to avoid overflow in numpy. 

572 np_seed = (seed + self.config.seedOffset) & 0xFFFFFFFF 

573 self.log.debug("Setting numpy random seed to %d for noise realization", np_seed) 

574 rng = np.random.RandomState(np_seed) 

575 

576 # Generate noise images in-situ. 

577 noise_calexps = self.make_noise_exposures(input_exposure, rng) 

578 

579 warpedExposure = self.process( 

580 detector_inputs, 

581 target_wcs, 

582 self.warper, 

583 visit_summary, 

584 destBBox=target_bbox, 

585 ) 

586 

587 if warpedExposure is None: 

588 self.log.debug( 

589 "Skipping exposure %s because it could not be warped.", detector_inputs.data_id 

590 ) 

591 continue 

592 

593 # Accumulate the partial warps in an online fashion. 

594 nGood = copyGoodPixels( 

595 final_warp.maskedImage, 

596 warpedExposure.maskedImage, 

597 final_warp.mask.getPlaneBitMask(["NO_DATA"]), 

598 ) 

599 

600 ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id 

601 inputRecorder.addCalExp(input_exposure, ccdId, nGood) 

602 totalGoodPixels += nGood 

603 

604 if self.config.doWarpMaskedFraction: 

605 # Obtain the masked fraction exposure and warp it. 

606 if self.config.doPreWarpInterpolation: 

607 badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes 

608 else: 

609 badMaskPlanes = [] 

610 masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes) 

611 

612 masked_fraction_warp = self.maskedFractionWarper.warpExposure( 

613 target_wcs, masked_fraction_exp, destBBox=target_bbox 

614 ) 

615 

616 copyGoodPixels( 

617 final_masked_fraction_warp.maskedImage, 

618 masked_fraction_warp.maskedImage, 

619 final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]), 

620 ) 

621 

622 # Process and accumulate noise images. 

623 for n_noise in range(self.config.numberOfNoiseRealizations): 

624 noise_calexp = noise_calexps[n_noise] 

625 noise_pseudo_inputs = dataclasses.replace( 

626 detector_inputs, 

627 exposure_or_handle=noise_calexp, 

628 background_revert=BackgroundList(), 

629 background_apply=BackgroundList(), 

630 ) 

631 warpedNoise = self.process( 

632 noise_pseudo_inputs, 

633 target_wcs, 

634 self.warper, 

635 visit_summary, 

636 destBBox=target_bbox, 

637 ) 

638 

639 copyGoodPixels( 

640 final_noise_warps[n_noise].maskedImage, 

641 warpedNoise.maskedImage, 

642 final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]), 

643 ) 

644 

645 # If there are no good pixels, return a Struct filled with None. 

646 if totalGoodPixels == 0: 

647 results = Struct( 

648 warp=None, 

649 masked_fraction_warp=None, 

650 ) 

651 for noise_index in range(self.config.numberOfNoiseRealizations): 

652 setattr(results, f"noise_warp{noise_index}", None) 

653 

654 return results 

655 

656 # Finish the inputRecorder and add the coaddPsf to the final warp. 

657 inputRecorder.finish(final_warp, totalGoodPixels) 

658 

659 coaddPsf = CoaddPsf( 

660 inputRecorder.coaddInputs.ccds, 

661 target_wcs, 

662 self.config.coaddPsf.makeControl(), 

663 ) 

664 

665 final_warp.setPsf(coaddPsf) 

666 for _, warp_detector_input in inputs.items(): 

667 final_warp.setFilter(warp_detector_input.exposure.getFilter()) 

668 final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo()) 

669 break # Just need the filter and visit info from any one of the inputs. 

670 

671 results = Struct( 

672 warp=final_warp, 

673 ) 

674 

675 if self.config.doWarpMaskedFraction: 

676 results.masked_fraction_warp = final_masked_fraction_warp.image 

677 

678 for noise_index, noise_exposure in final_noise_warps.items(): 

679 setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage) 

680 

681 return results 

682 

683 def process( 

684 self, 

685 detector_inputs: WarpDetectorInputs, 

686 target_wcs, 

687 warper, 

688 visit_summary=None, 

689 maxBBox=None, 

690 destBBox=None, 

691 ) -> ExposureF | None: 

692 """Process an exposure. 

693 

694 There are three processing steps that are applied to the input: 

695 

696 1. Interpolate over bad pixels before warping. 

697 2. Apply all calibrations from visit_summary to the exposure. 

698 3. Warp the exposure to the target coordinate system. 

699 

700 Parameters 

701 ---------- 

702 detector_inputs : `WarpDetectorInputs` 

703 The input exposure to be processed, along with any other 

704 per-detector modifications. 

705 target_wcs : `~lsst.afw.geom.SkyWcs` 

706 The WCS of the target patch. 

707 warper : `~lsst.afw.math.Warper` 

708 The warper to use for warping the input exposure. 

709 visit_summary : `~lsst.afw.table.ExposureCatalog` | None 

710 Table of visit summary information. If not None, the visit_summary 

711 information will be used to update the calibration of the input 

712 exposures. Otherwise, the input exposures will be used as-is. 

713 maxBBox : `~lsst.geom.Box2I` | None 

714 Maximum bounding box of the warped exposure. If None, this is 

715 determined automatically. 

716 destBBox : `~lsst.geom.Box2I` | None 

717 Exact bounding box of the warped exposure. If None, this is 

718 determined automatically. 

719 

720 Returns 

721 ------- 

722 warped_exposure : `~lsst.afw.image.Exposure` | None 

723 The processed and warped exposure, if all the calibrations could 

724 be applied successfully. Otherwise, None. 

725 """ 

726 

727 input_exposure = detector_inputs.exposure 

728 if self.config.doPreWarpInterpolation: 

729 self.preWarpInterpolation.run(input_exposure.maskedImage) 

730 

731 success = self._apply_all_calibrations(detector_inputs, visit_summary=visit_summary) 

732 

733 if not success: 

734 return None 

735 

736 with self.timer("warp"): 

737 warped_exposure = warper.warpExposure( 

738 target_wcs, 

739 input_exposure, 

740 maxBBox=maxBBox, 

741 destBBox=destBBox, 

742 ) 

743 

744 # Potentially a post-warp interpolation here? Relies on DM-38630. 

745 

746 return warped_exposure 

747 

748 def _apply_all_calibrations( 

749 self, 

750 detector_inputs: WarpDetectorInputs, 

751 *, 

752 visit_summary: ExposureCatalog | None = None, 

753 ) -> bool: 

754 """Apply all of the calibrations from visit_summary to the exposure. 

755 

756 Specifically, this method updates the following (if available) to the 

757 input exposure in place from ``visit_summary``: 

758 

759 - Aperture correction map 

760 - Photometric calibration 

761 - PSF 

762 - WCS 

763 

764 It also reverts and applies backgrounds in ``detector_inputs``. 

765 

766 Parameters 

767 ---------- 

768 detector_inputs : `WarpDetectorInputs` 

769 The input exposure to be processed, along with any other 

770 per-detector modifications. 

771 visit_summary : `~lsst.afw.table.ExposureCatalog` | None 

772 Table of visit summary information. If not None, the visit summary 

773 information will be used to update the calibration of the input 

774 exposures. Otherwise, the input exposures will be used as-is. 

775 

776 Returns 

777 ------- 

778 success : `bool` 

779 True if all calibrations were successfully applied, 

780 False otherwise. 

781 

782 Raises 

783 ------ 

784 RuntimeError 

785 Raised if ``visit_summary`` is provided but does not contain a 

786 record corresponding to ``detector_inputs``, or if one of the 

787 background fields in ``detector_inputs`` is inconsistent with the 

788 task configuration. 

789 """ 

790 if self.config.doRevertOldBackground: 

791 detector_inputs.revert_background() 

792 elif detector_inputs.background_revert: 

793 # This could get trigged only if runQuantum is skipped and run is 

794 # called directly. 

795 raise RuntimeError( 

796 f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert." 

797 ) 

798 

799 input_exposure = detector_inputs.exposure 

800 if visit_summary is not None: 

801 detector = input_exposure.info.getDetector().getId() 

802 row = visit_summary.find(detector) 

803 

804 if row is None: 

805 self.log.info( 

806 "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.", 

807 detector, 

808 ) 

809 return False 

810 

811 if self.config.useVisitSummaryPhotoCalib: 

812 photo_calib = row.getPhotoCalib() 

813 if photo_calib is None: 

814 self.log.info( 

815 "No photometric calibration found in visit summary for detector = %s. Skipping it.", 

816 detector, 

817 ) 

818 return False 

819 elif not np.isfinite(photo_calib.getInstFluxAtZeroMagnitude()): 

820 self.log.info( 

821 f"Invalid PhotoCalib found in visit summary for detector {detector}. Skipping it.", 

822 ) 

823 return False 

824 else: 

825 input_exposure.setPhotoCalib(photo_calib) 

826 

827 if self.config.useVisitSummaryWcs: 

828 if wcs := row.getWcs(): 

829 input_exposure.setWcs(wcs) 

830 else: 

831 self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector) 

832 return False 

833 

834 if self.config.useVisitSummaryPsf: 

835 if psf := row.getPsf(): 

836 input_exposure.setPsf(psf) 

837 else: 

838 self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector) 

839 return False 

840 

841 if apcorr_map := row.getApCorrMap(): 

842 input_exposure.setApCorrMap(apcorr_map) 

843 else: 

844 self.log.info( 

845 "No aperture correction map found in visit summary for detector = %s. Skipping it", 

846 detector, 

847 ) 

848 return False 

849 

850 if self.config.doApplyNewBackground: 

851 detector_inputs.apply_background() 

852 elif detector_inputs.background_apply: 

853 # This could get trigged only if runQuantum is skipped and run is 

854 # called directly. 

855 raise RuntimeError( 

856 f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply." 

857 ) 

858 

859 # Calibrate the (masked) image. 

860 # This should likely happen even if visit_summary is None. 

861 photo_calib = input_exposure.photoCalib 

862 input_exposure.maskedImage = photo_calib.calibrateImage(input_exposure.maskedImage) 

863 # This new PhotoCalib shouldn't need to be used, but setting it here 

864 # to reflect the fact that the image now has calibrated pixels might 

865 # help avoid future bugs. 

866 input_exposure.setPhotoCalib(PhotoCalib(1.0)) 

867 

868 return True 

869 

870 @classmethod 

871 def _prepareEmptyExposure(cls, bbox, wcs): 

872 """Produce an empty exposure for a given patch. 

873 

874 Parameters 

875 ---------- 

876 bbox : `lsst.geom.Box2I` 

877 Bounding box for the exposure. 

878 wcs : `lsst.afw.geom.SkyWcs` 

879 Mapping from pixels to sky. 

880 

881 Returns 

882 ------- 

883 exp : `lsst.afw.image.exposure.ExposureF` 

884 An empty exposure for a given patch. 

885 """ 

886 exp = ExposureF(bbox, wcs) 

887 exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf) 

888 # Set the PhotoCalib to 1 to mean that pixels are nJy, since we will 

889 # calibrate them before we warp them. 

890 exp.setPhotoCalib(PhotoCalib(1.0)) 

891 exp.metadata["BUNIT"] = "nJy" 

892 return exp 

893 

894 @staticmethod 

895 def compute_median_variance(mi: MaskedImage) -> float: 

896 """Compute the median variance across the good pixels of a MaskedImage. 

897 

898 Parameters 

899 ---------- 

900 mi : `~lsst.afw.image.MaskedImage` 

901 The input image on which to compute the median variance. 

902 

903 Returns 

904 ------- 

905 median_variance : `float` 

906 Median variance of the input calexp. 

907 """ 

908 # Shouldn't this exclude pixels that are masked, to be safe? 

909 # This is implemented as it was in descwl_coadd. 

910 return np.median(mi.variance.array[np.isfinite(mi.variance.array) & np.isfinite(mi.image.array)]) 

911 

912 def get_seed_from_data_id(self, data_id) -> int: 

913 """Get a seed value given a data_id. 

914 

915 This method generates a unique, reproducible pseudo-random number for 

916 a data id. This is not affected by ordering of the input, or what 

917 set of visits, ccds etc. are given. 

918 

919 This is implemented as a public method, so that simulations that 

920 don't necessary deal with the middleware can mock up a ``data_id`` 

921 instance, or override this method with a different one to obtain a 

922 seed value consistent with the pipeline task. 

923 

924 Parameters 

925 ---------- 

926 data_id : `~lsst.daf.butler.DataCoordinate` 

927 Data identifier dictionary. 

928 

929 Returns 

930 ------- 

931 seed : `int` 

932 A unique seed for this data_id to seed a random number generator. 

933 """ 

934 return self.config.idGenerator.apply(data_id).catalog_id 

935 

936 def make_noise_exposures(self, calexp: ExposureF, rng) -> dict[int, ExposureF]: 

937 """Make pure noise realizations based on ``calexp``. 

938 

939 Parameters 

940 ---------- 

941 calexp : `~lsst.afw.image.ExposureF` 

942 The input exposure on which to base the noise realizations. 

943 rng : `np.random.RandomState` 

944 Random number generator to use for the noise realizations. 

945 

946 Returns 

947 ------- 

948 noise_calexps : `dict` [`int`, `~lsst.afw.image.ExposureF`] 

949 A mapping of integers ranging from 0 up to 

950 config.numberOfNoiseRealizations to the corresponding 

951 noise realization exposures. 

952 """ 

953 noise_calexps = {} 

954 

955 # If no noise exposures are requested, return the empty dictionary 

956 # without any further computations. 

957 if self.config.numberOfNoiseRealizations == 0: 

958 return noise_calexps 

959 

960 if self.config.useMedianVariance: 

961 variance = self.compute_median_variance(calexp.maskedImage) 

962 else: 

963 variance = calexp.variance.array 

964 

965 for n_noise in range(self.config.numberOfNoiseRealizations): 

966 noise_calexp = calexp.clone() 

967 noise_calexp.image.array[:, :] = rng.normal( 

968 scale=np.sqrt(variance), 

969 size=noise_calexp.image.array.shape, 

970 ) 

971 noise_calexp.variance.array[:, :] = variance 

972 noise_calexps[n_noise] = noise_calexp 

973 

974 return noise_calexps 

975 

976 @classmethod 

977 def _get_bad_mask(cls, exp: ExposureF, badMaskPlanes: Iterable[str]) -> ExposureF: 

978 """Get an Exposure of bad mask 

979 

980 Parameters 

981 ---------- 

982 exp: `lsst.afw.image.Exposure` 

983 The exposure data. 

984 badMaskPlanes: `list` [`str`] 

985 List of mask planes to be considered as bad. 

986 

987 Returns 

988 ------- 

989 bad_mask: `~lsst.afw.image.Exposure` 

990 An Exposure with boolean array with True if inverse variance <= 0 

991 or if any of the badMaskPlanes bits are set, and False otherwise. 

992 """ 

993 

994 bad_mask = exp.clone() 

995 

996 var = exp.variance.array 

997 mask = exp.mask.array 

998 

999 bitMask = exp.mask.getPlaneBitMask(badMaskPlanes) 

1000 

1001 bad_mask.image.array[:, :] = (var < 0) | np.isinf(var) | ((mask & bitMask) != 0) 

1002 

1003 bad_mask.variance.array *= 0.0 

1004 

1005 return bad_mask