Coverage for python / lsst / drp / tasks / make_direct_warp.py: 23%

324 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:04 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24import dataclasses 

25from collections.abc import Mapping 

26from typing import TYPE_CHECKING, Iterable 

27 

28import numpy as np 

29 

30from lsst.afw.image import ExposureF, Image, Mask, PhotoCalib 

31from lsst.afw.math import BackgroundList, Warper 

32from lsst.coadd.utils import copyGoodPixels 

33from lsst.daf.butler import DataCoordinate, DeferredDatasetHandle 

34from lsst.geom import Box2D 

35from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig, backgroundFlatContext 

36from lsst.meas.algorithms.cloughTocher2DInterpolator import CloughTocher2DInterpolateTask 

37from lsst.meas.base import DetectorVisitIdGeneratorConfig 

38from lsst.pex.config import ConfigField, ConfigurableField, Field, RangeField 

39from lsst.pipe.base import ( 

40 InMemoryDatasetHandle, 

41 NoWorkFound, 

42 PipelineTask, 

43 PipelineTaskConfig, 

44 PipelineTaskConnections, 

45 Struct, 

46) 

47from lsst.pipe.base.connectionTypes import Input, Output 

48from lsst.pipe.tasks.coaddBase import makeSkyInfo 

49from lsst.pipe.tasks.coaddInputRecorder import CoaddInputRecorderTask 

50from lsst.pipe.tasks.selectImages import PsfWcsSelectImagesTask 

51from lsst.skymap import BaseSkyMap 

52 

53if TYPE_CHECKING: 

54 from lsst.afw.image import MaskedImage 

55 from lsst.afw.table import ExposureCatalog 

56 

57 

58__all__ = ( 

59 "MakeDirectWarpConfig", 

60 "MakeDirectWarpTask", 

61) 

62 

63 

64@dataclasses.dataclass 

65class WarpDetectorInputs: 

66 """Inputs passed to `MakeDirectWarpTask.run` for a single detector.""" 

67 

68 exposure_or_handle: ExposureF | DeferredDatasetHandle | InMemoryDatasetHandle 

69 """Detector image with initial calibration objects, or a deferred-load 

70 handle for one. 

71 """ 

72 

73 data_id: DataCoordinate 

74 """Butler data ID for this detector.""" 

75 

76 background_revert: BackgroundList | None = None 

77 """Background model to restore in (i.e. add to) the image.""" 

78 

79 background_apply: BackgroundList | None = None 

80 """Background model to apply to (i.e. subtract from) the image.""" 

81 

82 background_ratio_or_handle: Image | DeferredDatasetHandle | InMemoryDatasetHandle | None = None 

83 """Ratio of background-flattened image to photometric-flattened image.""" 

84 

85 @property 

86 def exposure(self) -> ExposureF: 

87 """Get the exposure object, loading it if necessary.""" 

88 if not isinstance(self.exposure_or_handle, ExposureF): 

89 self.exposure_or_handle = self.exposure_or_handle.get() 

90 

91 return self.exposure_or_handle 

92 

93 @property 

94 def background_to_photometric_ratio(self) -> Image: 

95 """Get the background_to_photometric object, loading if necessary.""" 

96 if isinstance(self.background_ratio_or_handle, (DeferredDatasetHandle, InMemoryDatasetHandle)): 

97 self.background_ratio_or_handle = self.background_ratio_or_handle.get() 

98 

99 return self.background_ratio_or_handle 

100 

101 def apply_background(self) -> None: 

102 """Apply (subtract) the `background_apply` to the exposure in-place. 

103 

104 Raises 

105 ------ 

106 RuntimeError 

107 Raised if `background_apply` is None. 

108 """ 

109 if self.background_apply is None: 

110 raise RuntimeError("No background to apply") 

111 

112 if self.background_apply: 

113 with backgroundFlatContext( 

114 self.exposure.maskedImage, 

115 self.background_to_photometric_ratio is not None, 

116 backgroundToPhotometricRatio=self.background_to_photometric_ratio, 

117 ): 

118 self.exposure.maskedImage -= self.background_apply.getImage() 

119 

120 def revert_background(self) -> None: 

121 """Revert (add) the `background_revert` from the exposure in-place. 

122 

123 Raises 

124 ------ 

125 RuntimeError 

126 Raised if `background_revert` is None. 

127 """ 

128 if self.background_revert is None: 

129 raise RuntimeError("No background to revert") 

130 

131 if self.background_revert: 

132 with backgroundFlatContext( 

133 self.exposure.maskedImage, 

134 self.background_to_photometric_ratio is not None, 

135 backgroundToPhotometricRatio=self.background_to_photometric_ratio, 

136 ): 

137 # Add only if `background_revert` is not a trivial background. 

138 self.exposure.maskedImage += self.background_revert.getImage() 

139 

140 

141class MakeDirectWarpConnections( 

142 PipelineTaskConnections, 

143 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

144 defaultTemplates={ 

145 "coaddName": "deep", 

146 }, 

147): 

148 """Connections for MakeWarpTask""" 

149 

150 calexp_list = Input( 

151 doc="Input exposures to be interpolated and resampled onto a SkyMap " "projection/patch.", 

152 name="calexp", 

153 storageClass="ExposureF", 

154 dimensions=("instrument", "visit", "detector"), 

155 multiple=True, 

156 deferLoad=True, 

157 ) 

158 background_revert_list = Input( 

159 doc="Background to be reverted (i.e., added back to the calexp). " 

160 "This connection is used only if doRevertOldBackground=True.", 

161 name="calexpBackground", 

162 storageClass="Background", 

163 dimensions=("instrument", "visit", "detector"), 

164 multiple=True, 

165 ) 

166 background_apply_list = Input( 

167 doc="Background to be applied (subtracted from the calexp). " 

168 "This is used only if doApplyNewBackground=True.", 

169 name="skyCorr", 

170 storageClass="Background", 

171 dimensions=("instrument", "visit", "detector"), 

172 multiple=True, 

173 ) 

174 background_to_photometric_ratio_list = Input( 

175 doc="Ratio of a background-flattened image to a photometric-flattened image. " 

176 "This is used only if doRevertOldBackground=True or doApplyNewBackground=True " 

177 "and doApplyFlatBackgroundRatio=True.", 

178 name="background_to_photometric_ratio", 

179 storageClass="Image", 

180 dimensions=("instrument", "visit", "detector"), 

181 multiple=True, 

182 deferLoad=True, 

183 ) 

184 visit_summary = Input( 

185 doc="Input visit-summary catalog with updated calibration objects.", 

186 name="finalVisitSummary", 

187 storageClass="ExposureCatalog", 

188 dimensions=("instrument", "visit"), 

189 ) 

190 sky_map = Input( 

191 doc="Input definition of geometry/bbox and projection/wcs for warps.", 

192 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

193 storageClass="SkyMap", 

194 dimensions=("skymap",), 

195 ) 

196 # Declare all possible outputs (except noise, which is configurable) 

197 warp = Output( 

198 doc="Output direct warped exposure produced by resampling calexps " "onto the skyMap patch geometry.", 

199 name="{coaddName}Coadd_directWarp", 

200 storageClass="ExposureF", 

201 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

202 ) 

203 masked_fraction_warp = Output( 

204 doc="Output masked fraction warped exposure.", 

205 name="{coaddName}Coadd_directWarp_maskedFraction", 

206 storageClass="ImageF", 

207 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

208 ) 

209 

210 def __init__(self, *, config=None): 

211 super().__init__(config=config) 

212 if not config: 

213 return 

214 

215 if not config.doRevertOldBackground: 

216 del self.background_revert_list 

217 if not config.doApplyNewBackground: 

218 del self.background_apply_list 

219 if not config.doApplyFlatBackgroundRatio or ( 

220 not config.doRevertOldBackground and not config.doApplyNewBackground 

221 ): 

222 del self.background_to_photometric_ratio_list 

223 

224 if not config.doWarpMaskedFraction: 

225 del self.masked_fraction_warp 

226 

227 # Dynamically set output connections for noise images, depending on the 

228 # number of noise realization specified in the config. 

229 for n in range(config.numberOfNoiseRealizations): 

230 noise_warp = Output( 

231 doc=f"Output direct warped noise exposure ({n})", 

232 name=f"direct_warp_noise{n}", 

233 # Store it as a MaskedImage to preserve the variance plane. 

234 storageClass="MaskedImageF", 

235 dimensions=("tract", "patch", "skymap", "instrument", "visit"), 

236 ) 

237 setattr(self, f"noise_warp{n}", noise_warp) 

238 

239 

240class MakeDirectWarpConfig( 

241 PipelineTaskConfig, 

242 pipelineConnections=MakeDirectWarpConnections, 

243): 

244 """Configuration for the MakeDirectWarpTask. 

245 

246 The config fields are as similar as possible to the corresponding fields in 

247 MakeWarpConfig. 

248 

249 Notes 

250 ----- 

251 The config fields are in camelCase to match the fields in the earlier 

252 version of the makeWarp task as closely as possible. 

253 """ 

254 

255 MAX_NUMBER_OF_NOISE_REALIZATIONS = 3 

256 """ 

257 numberOfNoiseRealizations is defined as a RangeField to prevent from 

258 making multiple output connections and blowing up the memory usage by 

259 accident. An upper bound of 3 is based on the best guess of the maximum 

260 number of noise realizations that will be used for metadetection. 

261 """ 

262 

263 numberOfNoiseRealizations = RangeField[int]( 

264 doc="Number of noise realizations to simulate and persist.", 

265 default=0, 

266 min=0, 

267 max=MAX_NUMBER_OF_NOISE_REALIZATIONS, 

268 inclusiveMax=True, 

269 ) 

270 seedOffset = Field[int]( 

271 doc="Offset to the seed used for the noise realization. This can be " 

272 "used to create a different noise realization if the default ones " 

273 "are catastrophic, or for testing sensitivity to the noise.", 

274 default=0, 

275 ) 

276 useMedianVariance = Field[bool]( 

277 doc="Use the median of variance plane in the input calexp to generate " 

278 "noise realizations? If False, per-pixel variance will be used.", 

279 default=True, 

280 ) 

281 doRevertOldBackground = Field[bool]( 

282 doc="Revert the old backgrounds from the `background_revert_list` " "connection?", 

283 default=False, 

284 ) 

285 doApplyNewBackground = Field[bool]( 

286 doc="Apply the new backgrounds from the `background_apply_list` " "connection?", 

287 default=False, 

288 ) 

289 doApplyFlatBackgroundRatio = Field[bool]( 

290 doc="Apply flat background ratio prior to background adjustments? Should be True " 

291 "if processing was done with an illumination correction.", 

292 default=False, 

293 ) 

294 useVisitSummaryPsf = Field[bool]( 

295 doc="If True, use the PSF model and aperture corrections from the " 

296 "'visit_summary' connection to make the warp. If False, use the " 

297 "PSF model and aperture corrections from the 'calexp' connection.", 

298 default=True, 

299 ) 

300 useVisitSummaryWcs = Field[bool]( 

301 doc="If True, use the WCS from the " 

302 "'visit_summary' connection to make the warp. If False, use the " 

303 "WCS from the 'calexp' connection.", 

304 default=True, 

305 ) 

306 useVisitSummaryPhotoCalib = Field[bool]( 

307 doc="If True, use the photometric calibration from the " 

308 "'visit_summary' connection to make the warp. If False, use the " 

309 "photometric calibration from the 'calexp' connection.", 

310 default=True, 

311 ) 

312 doSelectPreWarp = Field[bool]( 

313 doc="Select ccds before warping?", 

314 default=True, 

315 ) 

316 select = ConfigurableField( 

317 doc="Image selection subtask.", 

318 target=PsfWcsSelectImagesTask, 

319 ) 

320 doPreWarpInterpolation = Field[bool]( 

321 doc="Interpolate over bad pixels before warping?", 

322 default=False, 

323 ) 

324 preWarpInterpolation = ConfigurableField( 

325 doc="Interpolation task to use for pre-warping interpolation", 

326 target=CloughTocher2DInterpolateTask, 

327 ) 

328 inputRecorder = ConfigurableField( 

329 doc="Subtask that helps fill CoaddInputs catalogs added to the final " "coadd", 

330 target=CoaddInputRecorderTask, 

331 ) 

332 includeCalibVar = Field[bool]( 

333 doc="Add photometric calibration variance to warp variance plane?", 

334 default=False, 

335 deprecated="Deprecated and disabled. Will be removed after v29.", 

336 ) 

337 border = Field[int]( 

338 doc="Pad the patch boundary of the warp by these many pixels, so as to allow for PSF-matching later", 

339 default=256, 

340 ) 

341 warper = ConfigField( 

342 doc="Configuration for the warper that warps the image and noise", 

343 dtype=Warper.ConfigClass, 

344 ) 

345 doWarpMaskedFraction = Field[bool]( 

346 doc="Warp the masked fraction image?", 

347 default=True, 

348 ) 

349 maskedFractionWarper = ConfigField( 

350 doc="Configuration for the warp that warps the mask fraction image", 

351 dtype=Warper.ConfigClass, 

352 ) 

353 coaddPsf = ConfigField( 

354 doc="Configuration for CoaddPsf", 

355 dtype=CoaddPsfConfig, 

356 ) 

357 idGenerator = DetectorVisitIdGeneratorConfig.make_field() 

358 

359 # Use bgSubtracted and doApplySkyCorr to match the old MakeWarpConfig, 

360 # but as properties instead of config fields. 

361 @property 

362 def bgSubtracted(self) -> bool: 

363 return not self.doRevertOldBackground 

364 

365 @bgSubtracted.setter 

366 def bgSubtracted(self, value: bool) -> None: 

367 self.doRevertOldBackground = not value 

368 

369 @property 

370 def doApplySkyCorr(self) -> bool: 

371 return self.doApplyNewBackground 

372 

373 @doApplySkyCorr.setter 

374 def doApplySkyCorr(self, value: bool) -> None: 

375 self.doApplyNewBackground = value 

376 

377 def setDefaults(self) -> None: 

378 super().setDefaults() 

379 self.warper.warpingKernelName = "lanczos3" 

380 self.warper.cacheSize = 0 

381 self.maskedFractionWarper.warpingKernelName = "bilinear" 

382 

383 

384class MakeDirectWarpTask(PipelineTask): 

385 """Warp single-detector images onto a common projection. 

386 

387 This task iterates over multiple images (corresponding to different 

388 detectors) from a single visit that overlap the target patch. Pixels that 

389 receive no input from any detector are set to NaN in the output image, and 

390 NO_DATA bit is set in the mask plane. 

391 

392 This differs from the standard `MakeWarp` Task in the following 

393 ways: 

394 

395 1. No selection on ccds at the time of warping. This is done later during 

396 the coaddition stage. 

397 2. Interpolate over a set of masked pixels before warping. 

398 3. Generate an image where each pixel denotes how much of the pixel is 

399 masked. 

400 4. Generate multiple noise warps with the same interpolation applied. 

401 5. No option to produce a PSF-matched warp. 

402 """ 

403 

404 ConfigClass = MakeDirectWarpConfig 

405 _DefaultName = "makeDirectWarp" 

406 

407 def __init__(self, **kwargs): 

408 super().__init__(**kwargs) 

409 self.makeSubtask("inputRecorder") 

410 self.makeSubtask("preWarpInterpolation") 

411 if self.config.doSelectPreWarp: 

412 self.makeSubtask("select") 

413 

414 self.warper = Warper.fromConfig(self.config.warper) 

415 if self.config.doWarpMaskedFraction: 

416 self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper) 

417 

418 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

419 # Docstring inherited. 

420 

421 inputs: Mapping[int, WarpDetectorInputs] = {} # Detector ID -> WarpDetectorInputs 

422 for handle in butlerQC.get(inputRefs.calexp_list): 

423 inputs[handle.dataId["detector"]] = WarpDetectorInputs( 

424 exposure_or_handle=handle, 

425 data_id=handle.dataId, 

426 ) 

427 

428 if not inputs: 

429 raise NoWorkFound("No input warps provided for co-addition") 

430 

431 # Add backgrounds to the inputs struct, being careful not to assume 

432 # they're present for the same detectors we got image handles for, in 

433 # case of upstream errors. 

434 for ref in getattr(inputRefs, "background_revert_list", []): 

435 detector_id = ref.dataId["detector"] 

436 if detector_id in inputs: 

437 inputs[detector_id].background_revert = butlerQC.get(ref) 

438 else: 

439 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id) 

440 for ref in getattr(inputRefs, "background_apply_list", []): 

441 detector_id = ref.dataId["detector"] 

442 if detector_id in inputs: 

443 inputs[detector_id].background_apply = butlerQC.get(ref) 

444 else: 

445 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id) 

446 for ref in getattr(inputRefs, "background_to_photometric_ratio_list", []): 

447 detector_id = ref.dataId["detector"] 

448 if detector_id in inputs: 

449 inputs[detector_id].background_ratio_or_handle = butlerQC.get(ref) 

450 else: 

451 self.log.info("Got %s for detector %d but no image", ref.datasetType.name, detector_id) 

452 

453 visit_summary = butlerQC.get(inputRefs.visit_summary) 

454 sky_map = butlerQC.get(inputRefs.sky_map) 

455 

456 quantumDataId = butlerQC.quantum.dataId 

457 sky_info = makeSkyInfo( 

458 sky_map, 

459 tractId=quantumDataId["tract"], 

460 patchId=quantumDataId["patch"], 

461 ) 

462 

463 results = self.run(inputs, sky_info, visit_summary=visit_summary) 

464 butlerQC.put(results, outputRefs) 

465 

466 def _preselect_inputs( 

467 self, 

468 inputs: Mapping[int, WarpDetectorInputs], 

469 sky_info: Struct, 

470 visit_summary: ExposureCatalog, 

471 ) -> dict[int, WarpDetectorInputs]: 

472 """Filter the list of inputs via a 'select' subtask. 

473 

474 Parameters 

475 ---------- 

476 inputs : ``~collections.abc.Mapping` [ `int`, `WarpDetectorInputs` ] 

477 Per-detector input structs. 

478 sky_info : `lsst.pipe.base.Struct` 

479 Structure with information about the tract and patch. 

480 visit_summary : `~lsst.afw.table.ExposureCatalog` 

481 Record with updated calibration information for the full visit. 

482 

483 Returns 

484 ------- 

485 filtered_inputs : `dict` [ `int`, `WarpDetectorInputs` ] 

486 Like ``inputs``, with rejected detectors dropped. 

487 """ 

488 data_id_list, bbox_list, wcs_list = [], [], [] 

489 for detector_id, detector_inputs in inputs.items(): 

490 row = visit_summary.find(detector_id) 

491 if row is None: 

492 self.log.info( 

493 "Input calexp is not listed in visit_summary: %d; assuming bad.", 

494 detector_id, 

495 ) 

496 continue 

497 data_id_list.append(detector_inputs.data_id) 

498 bbox_list.append(row.getBBox()) 

499 wcs_list.append(row.getWcs()) 

500 

501 cornerPosList = Box2D(sky_info.bbox).getCorners() 

502 coordList = [sky_info.wcs.pixelToSky(pos) for pos in cornerPosList] 

503 

504 good_indices = self.select.run( 

505 bboxList=bbox_list, 

506 wcsList=wcs_list, 

507 visitSummary=visit_summary, 

508 coordList=coordList, 

509 dataIds=data_id_list, 

510 ) 

511 detector_ids = list(inputs.keys()) 

512 good_detector_ids = [detector_ids[i] for i in good_indices] 

513 return {detector_id: inputs[detector_id] for detector_id in good_detector_ids} 

514 

515 def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) -> Struct: 

516 """Create a Warp dataset from inputs. 

517 

518 Parameters 

519 ---------- 

520 inputs : `Mapping` [ `int`, `WarpDetectorInputs` ] 

521 Dictionary of input datasets, with the detector id being the key. 

522 sky_info : `~lsst.pipe.base.Struct` 

523 A Struct object containing wcs, bounding box, and other information 

524 about the patches within the tract. 

525 visit_summary : `~lsst.afw.table.ExposureCatalog` | None 

526 Table of visit summary information. If provided, the visit summary 

527 information will be used to update the calibration of the input 

528 exposures. If None, the input exposures will be used as-is. 

529 

530 Returns 

531 ------- 

532 results : `~lsst.pipe.base.Struct` 

533 A Struct object containing the warped exposure, noise exposure(s), 

534 and masked fraction image. 

535 """ 

536 if self.config.doSelectPreWarp: 

537 inputs = self._preselect_inputs(inputs, sky_info, visit_summary=visit_summary) 

538 if not inputs: 

539 raise NoWorkFound("No input warps remain after selection for co-addition") 

540 

541 sky_info.bbox.grow(self.config.border) 

542 target_bbox, target_wcs = sky_info.bbox, sky_info.wcs 

543 

544 # Initialize the objects that will hold the warp. 

545 final_warp = ExposureF(target_bbox, target_wcs) 

546 

547 for _, warp_detector_input in inputs.items(): 

548 visit_id = warp_detector_input.data_id["visit"] 

549 break # Just need the visit id from any one of the inputs. 

550 

551 # The warpExposure routine is expensive, and we do not want to call 

552 # it twice (i.e., a second time for PSF-matched warps). We do not 

553 # want to hold all the warped exposures in memory at once either. 

554 # So we create empty exposure(s) to accumulate the warps of each type, 

555 # and then process each detector serially. 

556 final_warp = self._prepareEmptyExposure(sky_info) 

557 final_masked_fraction_warp = self._prepareEmptyExposure(sky_info) 

558 final_noise_warps = { 

559 n_noise: self._prepareEmptyExposure(sky_info) 

560 for n_noise in range(self.config.numberOfNoiseRealizations) 

561 } 

562 

563 # We need a few bookkeeping variables only for the science coadd. 

564 totalGoodPixels = 0 

565 inputRecorder = self.inputRecorder.makeCoaddTempExpRecorder( 

566 visit_id, 

567 len(inputs), 

568 ) 

569 

570 for index, detector_inputs in enumerate(inputs.values()): 

571 self.log.debug( 

572 "Warping exposure %d/%d for id=%s", 

573 index + 1, 

574 len(inputs), 

575 detector_inputs.data_id, 

576 ) 

577 

578 input_exposure = detector_inputs.exposure 

579 # Generate noise image(s) in-situ. 

580 seed = self.get_seed_from_data_id(detector_inputs.data_id) 

581 # Limit to last 32 bits to avoid overflow in numpy. 

582 np_seed = (seed + self.config.seedOffset) & 0xFFFFFFFF 

583 self.log.debug("Setting numpy random seed to %d for noise realization", np_seed) 

584 rng = np.random.RandomState(np_seed) 

585 

586 # Generate noise images in-situ. 

587 noise_calexps = self.make_noise_exposures(input_exposure, rng) 

588 

589 warpedExposure = self.process( 

590 detector_inputs, 

591 target_wcs, 

592 self.warper, 

593 visit_summary, 

594 destBBox=target_bbox, 

595 ) 

596 

597 if warpedExposure is None: 

598 self.log.debug( 

599 "Skipping exposure %s because it could not be warped.", detector_inputs.data_id 

600 ) 

601 continue 

602 

603 # Accumulate the partial warps in an online fashion. 

604 nGood = copyGoodPixels( 

605 final_warp.maskedImage, 

606 warpedExposure.maskedImage, 

607 final_warp.mask.getPlaneBitMask(["NO_DATA"]), 

608 ) 

609 

610 ccdId = self.config.idGenerator.apply(detector_inputs.data_id).catalog_id 

611 inputRecorder.addCalExp(input_exposure, ccdId, nGood) 

612 totalGoodPixels += nGood 

613 

614 if self.config.doWarpMaskedFraction: 

615 # Obtain the masked fraction exposure and warp it. 

616 if self.config.doPreWarpInterpolation: 

617 badMaskPlanes = self.preWarpInterpolation.config.badMaskPlanes 

618 else: 

619 badMaskPlanes = [] 

620 masked_fraction_exp = self._get_bad_mask(input_exposure, badMaskPlanes) 

621 

622 masked_fraction_warp = self.maskedFractionWarper.warpExposure( 

623 target_wcs, masked_fraction_exp, destBBox=target_bbox 

624 ) 

625 

626 copyGoodPixels( 

627 final_masked_fraction_warp.maskedImage, 

628 masked_fraction_warp.maskedImage, 

629 final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]), 

630 ) 

631 

632 # Process and accumulate noise images. 

633 for n_noise in range(self.config.numberOfNoiseRealizations): 

634 noise_calexp = noise_calexps[n_noise] 

635 noise_pseudo_inputs = dataclasses.replace( 

636 detector_inputs, 

637 exposure_or_handle=noise_calexp, 

638 background_revert=BackgroundList(), 

639 background_apply=BackgroundList(), 

640 ) 

641 warpedNoise = self.process( 

642 noise_pseudo_inputs, 

643 target_wcs, 

644 self.warper, 

645 visit_summary, 

646 destBBox=target_bbox, 

647 ) 

648 

649 copyGoodPixels( 

650 final_noise_warps[n_noise].maskedImage, 

651 warpedNoise.maskedImage, 

652 final_noise_warps[n_noise].mask.getPlaneBitMask(["NO_DATA"]), 

653 ) 

654 

655 # If there are no good pixels, return a Struct filled with None. 

656 if totalGoodPixels == 0: 

657 results = Struct( 

658 warp=None, 

659 masked_fraction_warp=None, 

660 ) 

661 for noise_index in range(self.config.numberOfNoiseRealizations): 

662 setattr(results, f"noise_warp{noise_index}", None) 

663 

664 return results 

665 

666 # Finish the inputRecorder and add the coaddPsf to the final warp. 

667 inputRecorder.finish(final_warp, totalGoodPixels) 

668 

669 coaddPsf = CoaddPsf( 

670 inputRecorder.coaddInputs.ccds, 

671 sky_info.wcs, 

672 self.config.coaddPsf.makeControl(), 

673 ) 

674 

675 final_warp.setPsf(coaddPsf) 

676 for _, warp_detector_input in inputs.items(): 

677 final_warp.setFilter(warp_detector_input.exposure.getFilter()) 

678 final_warp.getInfo().setVisitInfo(warp_detector_input.exposure.getInfo().getVisitInfo()) 

679 break # Just need the filter and visit info from any one of the inputs. 

680 

681 results = Struct( 

682 warp=final_warp, 

683 ) 

684 

685 if self.config.doWarpMaskedFraction: 

686 results.masked_fraction_warp = final_masked_fraction_warp.image 

687 

688 for noise_index, noise_exposure in final_noise_warps.items(): 

689 setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage) 

690 

691 return results 

692 

693 def process( 

694 self, 

695 detector_inputs: WarpDetectorInputs, 

696 target_wcs, 

697 warper, 

698 visit_summary=None, 

699 maxBBox=None, 

700 destBBox=None, 

701 ) -> ExposureF | None: 

702 """Process an exposure. 

703 

704 There are three processing steps that are applied to the input: 

705 

706 1. Interpolate over bad pixels before warping. 

707 2. Apply all calibrations from visit_summary to the exposure. 

708 3. Warp the exposure to the target coordinate system. 

709 

710 Parameters 

711 ---------- 

712 detector_inputs : `WarpDetectorInputs` 

713 The input exposure to be processed, along with any other 

714 per-detector modifications. 

715 target_wcs : `~lsst.afw.geom.SkyWcs` 

716 The WCS of the target patch. 

717 warper : `~lsst.afw.math.Warper` 

718 The warper to use for warping the input exposure. 

719 visit_summary : `~lsst.afw.table.ExposureCatalog` | None 

720 Table of visit summary information. If not None, the visit_summary 

721 information will be used to update the calibration of the input 

722 exposures. Otherwise, the input exposures will be used as-is. 

723 maxBBox : `~lsst.geom.Box2I` | None 

724 Maximum bounding box of the warped exposure. If None, this is 

725 determined automatically. 

726 destBBox : `~lsst.geom.Box2I` | None 

727 Exact bounding box of the warped exposure. If None, this is 

728 determined automatically. 

729 

730 Returns 

731 ------- 

732 warped_exposure : `~lsst.afw.image.Exposure` | None 

733 The processed and warped exposure, if all the calibrations could 

734 be applied successfully. Otherwise, None. 

735 """ 

736 

737 input_exposure = detector_inputs.exposure 

738 if self.config.doPreWarpInterpolation: 

739 self.preWarpInterpolation.run(input_exposure.maskedImage) 

740 

741 success = self._apply_all_calibrations(detector_inputs, visit_summary=visit_summary) 

742 

743 if not success: 

744 return None 

745 

746 with self.timer("warp"): 

747 warped_exposure = warper.warpExposure( 

748 target_wcs, 

749 input_exposure, 

750 maxBBox=maxBBox, 

751 destBBox=destBBox, 

752 ) 

753 

754 # Potentially a post-warp interpolation here? Relies on DM-38630. 

755 

756 return warped_exposure 

757 

758 def _apply_all_calibrations( 

759 self, 

760 detector_inputs: WarpDetectorInputs, 

761 *, 

762 visit_summary: ExposureCatalog | None = None, 

763 ) -> bool: 

764 """Apply all of the calibrations from visit_summary to the exposure. 

765 

766 Specifically, this method updates the following (if available) to the 

767 input exposure in place from ``visit_summary``: 

768 

769 - Aperture correction map 

770 - Photometric calibration 

771 - PSF 

772 - WCS 

773 

774 It also reverts and applies backgrounds in ``detector_inputs``. 

775 

776 Parameters 

777 ---------- 

778 detector_inputs : `WarpDetectorInputs` 

779 The input exposure to be processed, along with any other 

780 per-detector modifications. 

781 visit_summary : `~lsst.afw.table.ExposureCatalog` | None 

782 Table of visit summary information. If not None, the visit summary 

783 information will be used to update the calibration of the input 

784 exposures. Otherwise, the input exposures will be used as-is. 

785 

786 Returns 

787 ------- 

788 success : `bool` 

789 True if all calibrations were successfully applied, 

790 False otherwise. 

791 

792 Raises 

793 ------ 

794 RuntimeError 

795 Raised if ``visit_summary`` is provided but does not contain a 

796 record corresponding to ``detector_inputs``, or if one of the 

797 background fields in ``detector_inputs`` is inconsistent with the 

798 task configuration. 

799 """ 

800 if self.config.doRevertOldBackground: 

801 detector_inputs.revert_background() 

802 elif detector_inputs.background_revert: 

803 # This could get trigged only if runQuantum is skipped and run is 

804 # called directly. 

805 raise RuntimeError( 

806 f"doRevertOldBackground is False, but {detector_inputs.data_id} has a background_revert." 

807 ) 

808 

809 input_exposure = detector_inputs.exposure 

810 if visit_summary is not None: 

811 detector = input_exposure.info.getDetector().getId() 

812 row = visit_summary.find(detector) 

813 

814 if row is None: 

815 self.log.info( 

816 "Unexpectedly incomplete visit_summary: detector = %s is missing. Skipping it.", 

817 detector, 

818 ) 

819 return False 

820 

821 if self.config.useVisitSummaryPhotoCalib: 

822 photo_calib = row.getPhotoCalib() 

823 if photo_calib is None: 

824 self.log.info( 

825 "No photometric calibration found in visit summary for detector = %s. Skipping it.", 

826 detector, 

827 ) 

828 return False 

829 elif not np.isfinite(photo_calib.getInstFluxAtZeroMagnitude()): 

830 self.log.info( 

831 f"Invalid PhotoCalib found in visit summary for detector {detector}. Skipping it.", 

832 ) 

833 return False 

834 else: 

835 input_exposure.setPhotoCalib(photo_calib) 

836 

837 if self.config.useVisitSummaryWcs: 

838 if wcs := row.getWcs(): 

839 input_exposure.setWcs(wcs) 

840 else: 

841 self.log.info("No WCS found in visit summary for detector = %s. Skipping it.", detector) 

842 return False 

843 

844 if self.config.useVisitSummaryPsf: 

845 if psf := row.getPsf(): 

846 input_exposure.setPsf(psf) 

847 else: 

848 self.log.info("No PSF found in visit summary for detector = %s. Skipping it.", detector) 

849 return False 

850 

851 if apcorr_map := row.getApCorrMap(): 

852 input_exposure.setApCorrMap(apcorr_map) 

853 else: 

854 self.log.info( 

855 "No aperture correction map found in visit summary for detector = %s. Skipping it", 

856 detector, 

857 ) 

858 return False 

859 

860 if self.config.doApplyNewBackground: 

861 detector_inputs.apply_background() 

862 elif detector_inputs.background_apply: 

863 # This could get trigged only if runQuantum is skipped and run is 

864 # called directly. 

865 raise RuntimeError( 

866 f"doApplyNewBackground is False, but {detector_inputs.data_id} has a background_apply." 

867 ) 

868 

869 # Calibrate the (masked) image. 

870 # This should likely happen even if visit_summary is None. 

871 photo_calib = input_exposure.photoCalib 

872 input_exposure.maskedImage = photo_calib.calibrateImage(input_exposure.maskedImage) 

873 # This new PhotoCalib shouldn't need to be used, but setting it here 

874 # to reflect the fact that the image now has calibrated pixels might 

875 # help avoid future bugs. 

876 input_exposure.setPhotoCalib(PhotoCalib(1.0)) 

877 

878 return True 

879 

880 # This method is copied from makeWarp.py 

881 @classmethod 

882 def _prepareEmptyExposure(cls, sky_info): 

883 """Produce an empty exposure for a given patch. 

884 

885 Parameters 

886 ---------- 

887 sky_info : `lsst.pipe.base.Struct` 

888 Struct from `~lsst.pipe.base.coaddBase.makeSkyInfo` with 

889 geometric information about the patch. 

890 

891 Returns 

892 ------- 

893 exp : `lsst.afw.image.exposure.ExposureF` 

894 An empty exposure for a given patch. 

895 """ 

896 exp = ExposureF(sky_info.bbox, sky_info.wcs) 

897 exp.getMaskedImage().set(np.nan, Mask.getPlaneBitMask("NO_DATA"), np.inf) 

898 # Set the PhotoCalib to 1 to mean that pixels are nJy, since we will 

899 # calibrate them before we warp them. 

900 exp.setPhotoCalib(PhotoCalib(1.0)) 

901 exp.metadata["BUNIT"] = "nJy" 

902 return exp 

903 

904 @staticmethod 

905 def compute_median_variance(mi: MaskedImage) -> float: 

906 """Compute the median variance across the good pixels of a MaskedImage. 

907 

908 Parameters 

909 ---------- 

910 mi : `~lsst.afw.image.MaskedImage` 

911 The input image on which to compute the median variance. 

912 

913 Returns 

914 ------- 

915 median_variance : `float` 

916 Median variance of the input calexp. 

917 """ 

918 # Shouldn't this exclude pixels that are masked, to be safe? 

919 # This is implemented as it was in descwl_coadd. 

920 return np.median(mi.variance.array[np.isfinite(mi.variance.array) & np.isfinite(mi.image.array)]) 

921 

922 def get_seed_from_data_id(self, data_id) -> int: 

923 """Get a seed value given a data_id. 

924 

925 This method generates a unique, reproducible pseudo-random number for 

926 a data id. This is not affected by ordering of the input, or what 

927 set of visits, ccds etc. are given. 

928 

929 This is implemented as a public method, so that simulations that 

930 don't necessary deal with the middleware can mock up a ``data_id`` 

931 instance, or override this method with a different one to obtain a 

932 seed value consistent with the pipeline task. 

933 

934 Parameters 

935 ---------- 

936 data_id : `~lsst.daf.butler.DataCoordinate` 

937 Data identifier dictionary. 

938 

939 Returns 

940 ------- 

941 seed : `int` 

942 A unique seed for this data_id to seed a random number generator. 

943 """ 

944 return self.config.idGenerator.apply(data_id).catalog_id 

945 

946 def make_noise_exposures(self, calexp: ExposureF, rng) -> dict[int, ExposureF]: 

947 """Make pure noise realizations based on ``calexp``. 

948 

949 Parameters 

950 ---------- 

951 calexp : `~lsst.afw.image.ExposureF` 

952 The input exposure on which to base the noise realizations. 

953 rng : `np.random.RandomState` 

954 Random number generator to use for the noise realizations. 

955 

956 Returns 

957 ------- 

958 noise_calexps : `dict` [`int`, `~lsst.afw.image.ExposureF`] 

959 A mapping of integers ranging from 0 up to 

960 config.numberOfNoiseRealizations to the corresponding 

961 noise realization exposures. 

962 """ 

963 noise_calexps = {} 

964 

965 # If no noise exposures are requested, return the empty dictionary 

966 # without any further computations. 

967 if self.config.numberOfNoiseRealizations == 0: 

968 return noise_calexps 

969 

970 if self.config.useMedianVariance: 

971 variance = self.compute_median_variance(calexp.maskedImage) 

972 else: 

973 variance = calexp.variance.array 

974 

975 for n_noise in range(self.config.numberOfNoiseRealizations): 

976 noise_calexp = calexp.clone() 

977 noise_calexp.image.array[:, :] = rng.normal( 

978 scale=np.sqrt(variance), 

979 size=noise_calexp.image.array.shape, 

980 ) 

981 noise_calexp.variance.array[:, :] = variance 

982 noise_calexps[n_noise] = noise_calexp 

983 

984 return noise_calexps 

985 

986 @classmethod 

987 def _get_bad_mask(cls, exp: ExposureF, badMaskPlanes: Iterable[str]) -> ExposureF: 

988 """Get an Exposure of bad mask 

989 

990 Parameters 

991 ---------- 

992 exp: `lsst.afw.image.Exposure` 

993 The exposure data. 

994 badMaskPlanes: `list` [`str`] 

995 List of mask planes to be considered as bad. 

996 

997 Returns 

998 ------- 

999 bad_mask: `~lsst.afw.image.Exposure` 

1000 An Exposure with boolean array with True if inverse variance <= 0 

1001 or if any of the badMaskPlanes bits are set, and False otherwise. 

1002 """ 

1003 

1004 bad_mask = exp.clone() 

1005 

1006 var = exp.variance.array 

1007 mask = exp.mask.array 

1008 

1009 bitMask = exp.mask.getPlaneBitMask(badMaskPlanes) 

1010 

1011 bad_mask.image.array[:, :] = (var < 0) | np.isinf(var) | ((mask & bitMask) != 0) 

1012 

1013 bad_mask.variance.array *= 0.0 

1014 

1015 return bad_mask