Coverage for python / lsst / drp / tasks / update_visit_summary.py: 18%

367 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 17:41 +0000

1# This file is part of drp_tasks. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ( 

25 "UpdateVisitSummaryConnections", 

26 "UpdateVisitSummaryConfig", 

27 "UpdateVisitSummaryTask", 

28 "PossiblyMultipleInput", 

29 "PerTractInput", 

30 "PerHealpixInput", 

31 "GlobalInput", 

32) 

33 

34import dataclasses 

35import re 

36from abc import ABC, abstractmethod 

37from collections.abc import Iterable, Mapping 

38from typing import Any 

39 

40import astropy.table 

41import numpy as np 

42 

43import lsst.pipe.base.connectionTypes as cT 

44from lsst.afw.cameraGeom import Camera 

45from lsst.afw.geom import SkyWcs 

46from lsst.afw.image import ExposureSummaryStats 

47from lsst.afw.math import BackgroundList 

48from lsst.afw.table import ExposureCatalog, ExposureRecord, SchemaMapper 

49from lsst.daf.butler import Butler, DatasetRef, DeferredDatasetHandle 

50from lsst.geom import Angle, Box2D, Box2I, SpherePoint, arcseconds, degrees 

51from lsst.meas.astrom.fit_sip_approximation import FitSipApproximationTask 

52from lsst.meas.astrom.refit_pointing import RefitPointingTask 

53from lsst.pex.config import ChoiceField, ConfigurableField, Field, ListField 

54from lsst.pipe.base import ( 

55 AlgorithmError, 

56 AnnotatedPartialOutputsError, 

57 InputQuantizedConnection, 

58 InvalidQuantumError, 

59 OutputQuantizedConnection, 

60 PipelineTask, 

61 PipelineTaskConfig, 

62 PipelineTaskConnections, 

63 QuantumContext, 

64 Struct, 

65) 

66from lsst.pipe.tasks.computeExposureSummaryStats import ComputeExposureSummaryStatsTask 

67from lsst.skymap import BaseSkyMap, TractInfo 

68from lsst.skymap.detail import makeSkyPolygonFromBBox 

69from lsst.sphgeom import HealpixPixelization 

70 

71 

72def compute_center_for_detector_record( 

73 record: ExposureRecord, bbox: Box2I | None = None, wcs: SkyWcs | None = None 

74) -> SpherePoint | None: 

75 """Compute the sky coordinate center for a detector to be used when 

76 testing distance to tract center. 

77 

78 Parameters 

79 ---------- 

80 record : `lsst.afw.table.ExposureRecord` 

81 Exposure record to obtain WCS and bbox from if not provided. 

82 bbox : `lsst.geom.Box2I`, optional 

83 Bounding box for the detector in its own pixel coordinates. 

84 wcs : `lsst.afw.geom.SkyWcs`, optional 

85 WCS that maps the detector's pixel coordinate system to celestial 

86 coordinates. 

87 

88 Returns 

89 ------- 

90 center : `lsst.geom.SpherePoint` or `None` 

91 Center of the detector in sky coordinates, or `None` if no WCS was 

92 given or present in the given record. 

93 """ 

94 if bbox is None: 

95 bbox = record.getBBox() 

96 if wcs is None: 

97 wcs = record.getWcs() 

98 if wcs is None: 

99 return None 

100 region = makeSkyPolygonFromBBox(bbox, wcs) 

101 return SpherePoint(region.getCentroid()) 

102 

103 

104def _validate_wcs_provider(name: str) -> bool: 

105 """A helper function to validate the WCS provider at config time. 

106 

107 Parameters 

108 ---------- 

109 name : str 

110 The source or dimension of the source of the WCS. 

111 

112 Returns 

113 ------- 

114 is_valid : bool 

115 Whether the provided source is valid. 

116 """ 

117 # Check if ``name`` is a healpix dimension. 

118 match_healpix = re.match(re.compile(r"healpix\d+"), name) 

119 # Check if ``name`` is one of the other source options. 

120 match_other = name in ["input_summary", "global", "tract"] 

121 return match_healpix or match_other 

122 

123 

124class PossiblyMultipleInput(ABC): 

125 """A helper ABC for handling input `~lsst.afw.table.ExposureCatalog` 

126 datasets that may be multiple (one per tract/visit combination) or 

127 unique/global (one per visit). 

128 """ 

129 

130 @abstractmethod 

131 def best_for_detector( 

132 self, 

133 detector_id: int, 

134 center: SpherePoint | None = None, 

135 bbox: Box2I | None = None, 

136 ) -> tuple[int, ExposureRecord | None]: 

137 """Return the exposure record for this detector that is the best match 

138 for this detector. 

139 

140 Parameters 

141 ---------- 

142 detector_id : `int` 

143 Detector ID; used to find the right row in the catalog or catalogs. 

144 center : `lsst.geom.SpherePoint` or `None` 

145 Center of the detector in sky coordinates. If not provided, one 

146 will be computed via `compute_center_for_detector_record`. 

147 bbox : `lsst.geom.Box2I`, optional 

148 Bounding box for the detector in its own pixel coordinates. 

149 

150 Returns 

151 ------- 

152 tract_id : `int` 

153 ID of the tract that supplied this record, or `-1` if ``record`` is 

154 `None` or if the input was not per-tract. 

155 record : `lsst.afw.table.ExposureRecord` or `None` 

156 Best record for this detector, or `None` if there either were no 

157 records for this detector or no WCS available to compute a center. 

158 """ 

159 raise NotImplementedError() 

160 

161 

162@dataclasses.dataclass 

163class PerTractInput(PossiblyMultipleInput): 

164 """Wrapper class for input `~lsst.afw.table.ExposureCatalog` datasets 

165 that are per-tract. 

166 

167 This selects the best tract via the minimum average distance (on the sky) 

168 from the detector's corners to the tract center. 

169 """ 

170 

171 catalogs_by_tract: list[tuple[TractInfo, ExposureCatalog]] 

172 """List of tuples of catalogs and the tracts they correspond to 

173 (`list` [`tuple` [`lsst.skymap.TractInfo`, 

174 `lsst.afw.table.ExposureCatalog`]]). 

175 """ 

176 

177 @classmethod 

178 def load( 

179 cls, 

180 butler: QuantumContext | Butler, 

181 sky_map: BaseSkyMap, 

182 refs: Iterable[DatasetRef], 

183 ) -> PerTractInput: 

184 """Load and wrap input catalogs. 

185 

186 Parameters 

187 ---------- 

188 butler : `lsst.pipe.base.QuantumContext` 

189 Butler proxy used in `~lsst.pipe.base.PipelineTask.runQuantum`. 

190 sky_map : `lsst.skymap.BaseSkyMap` 

191 Definition of tracts and patches. 

192 refs : `~collections.abc.Iterable` [`lsst.daf.butler.DatasetRef`] 

193 References to the catalog datasets to load. 

194 

195 Returns 

196 ------- 

197 wrapper : `PerTractInput` 

198 Wrapper object for the loaded catalogs. 

199 """ 

200 catalogs_by_tract = [] 

201 for ref in refs: 

202 tract_id = ref.dataId["tract"] 

203 tract_info = sky_map[tract_id] 

204 catalogs_by_tract.append( 

205 ( 

206 tract_info, 

207 butler.get(ref), 

208 ) 

209 ) 

210 return cls(catalogs_by_tract) 

211 

212 def best_for_detector( 

213 self, 

214 detector_id: int, 

215 center: SpherePoint | None = None, 

216 bbox: Box2I | None = None, 

217 ) -> tuple[int, ExposureRecord | None]: 

218 # Docstring inherited. 

219 best_result: tuple[int, ExposureRecord | None] = (-1, None) 

220 best_distance: Angle = float("inf") * degrees 

221 for tract_info, catalog in self.catalogs_by_tract: 

222 record = catalog.find(detector_id) 

223 if record is None: 

224 continue 

225 if center is None: 

226 center_for_record = compute_center_for_detector_record(record, bbox=bbox) 

227 if center_for_record is None: 

228 continue 

229 else: 

230 center_for_record = center 

231 center_distance = tract_info.ctr_coord.separation(center_for_record) 

232 if best_distance > center_distance: 

233 best_result = (tract_info.tract_id, record) 

234 best_distance = center_distance 

235 return best_result 

236 

237 

238@dataclasses.dataclass 

239class PerHealpixInput(PossiblyMultipleInput): 

240 """Wrapper class for input `~lsst.afw.table.ExposureCatalog` datasets 

241 that are per-healpix pixel. 

242 

243 This selects the best pixel based on which one contains the detector 

244 centroid. Since the pixel regions are ConvexPolygons that do not perfectly 

245 describe the pixel borders, in case of ambiguity, the pixel is chosen via 

246 the minimum distance (on the sky) from the detector's centroid to the pixel 

247 centroid. 

248 """ 

249 

250 catalogs_by_pixel: list[int, tuple[int, SpherePoint, ExposureCatalog]] 

251 """List of tuples of catalogs and the pixels they correspond to 

252 (`list` [`tuple` [`int`, `lsst.geom.SpherePoint`, 

253 `lsst.afw.table.ExposureCatalog`]]). 

254 """ 

255 

256 @classmethod 

257 def load( 

258 cls, 

259 butler: QuantumContext | Butler, 

260 refs: Iterable[DatasetRef], 

261 ) -> PerHealpixInput: 

262 """Load and wrap input catalogs. 

263 

264 Parameters 

265 ---------- 

266 butler : `lsst.pipe.base.QuantumContext` 

267 Butler proxy used in `~lsst.pipe.base.PipelineTask.runQuantum`. 

268 refs : `~collections.abc.Iterable` [`lsst.daf.butler.DatasetRef`] 

269 References to the catalog datasets to load. 

270 

271 Returns 

272 ------- 

273 wrapper : `PerHealpixInput` 

274 Wrapper object for the loaded catalogs. 

275 """ 

276 catalogs_by_pixel = [] 

277 pixelization = HealpixPixelization(3) 

278 for ref in refs: 

279 pixel_id = ref.dataId["healpix3"] 

280 pixel_region = pixelization.pixel(pixel_id) 

281 catalogs_by_pixel.append( 

282 ( 

283 pixel_id, 

284 pixel_region, 

285 butler.get(ref), 

286 ) 

287 ) 

288 return cls(catalogs_by_pixel) 

289 

290 def best_for_detector( 

291 self, 

292 detector_id: int, 

293 center: SpherePoint | None = None, 

294 bbox: Box2I | None = None, 

295 ) -> tuple[int, ExposureRecord | None]: 

296 # Docstring inherited. 

297 best_result: tuple[int, ExposureRecord | None] = (-1, None) 

298 inPixel = [] 

299 distances = [] 

300 results = [] 

301 for pixel_id, pixel_region, catalog in self.catalogs_by_pixel: 

302 record = catalog.find(detector_id) 

303 if record is None: 

304 continue 

305 if center is None: 

306 center_for_record = compute_center_for_detector_record(record, bbox=bbox) 

307 if center_for_record is None: 

308 continue 

309 else: 

310 center_for_record = center 

311 

312 inPixel.append(pixel_region.contains(center_for_record.getVector())) 

313 distances.append(SpherePoint(pixel_region.getCentroid()).separation(center_for_record)) 

314 results.append((pixel_id, record)) 

315 

316 if len(results) == 0: 

317 # There were no entries at all; return the null result. 

318 return best_result 

319 

320 inPixel = np.array(inPixel) 

321 results = np.array(results) 

322 distances = np.array(distances) 

323 if inPixel.sum() == 1: 

324 # There was one match in the pixel; return it. 

325 best_result = results[inPixel][0] 

326 elif inPixel.sum() == 0: 

327 # There were no entries in the pixel, but choose the closest. 

328 best_result = results[distances.argmin()] 

329 else: 

330 # There were multiple entries in the pixel. Choose the closest. 

331 best_result = results[inPixel][distances[inPixel].argmin()] 

332 return best_result 

333 

334 

335@dataclasses.dataclass 

336class GlobalInput(PossiblyMultipleInput): 

337 """Wrapper class for input `~lsst.afw.table.ExposureCatalog` datasets 

338 that are not per-tract. 

339 """ 

340 

341 catalog: ExposureCatalog 

342 """Loaded per-visit catalog dataset (`lsst.afw.table.ExposureCatalog`). 

343 """ 

344 

345 def best_for_detector( 

346 self, 

347 detector_id: int, 

348 center: SpherePoint | None = None, 

349 bbox: Box2I | None = None, 

350 ) -> tuple[int, ExposureRecord | None]: 

351 # Docstring inherited. 

352 return -1, self.catalog.find(detector_id) 

353 

354 

355class UpdateVisitSummaryConnections( 

356 PipelineTaskConnections, 

357 dimensions=("instrument", "visit"), 

358 defaultTemplates={ 

359 "skyWcsName": "gbdesAstrometricFit", 

360 "photoCalibName": "fgcm", 

361 }, 

362): 

363 camera = cT.PrerequisiteInput( 

364 doc="Camera geometry.", 

365 name="camera", 

366 dimensions=("instrument",), 

367 storageClass="Camera", 

368 isCalibration=True, 

369 ) 

370 sky_map = cT.Input( 

371 doc="Description of tract/patch geometry.", 

372 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, 

373 dimensions=("skymap",), 

374 storageClass="SkyMap", 

375 ) 

376 input_summary_schema = cT.InitInput( 

377 doc="Schema for input_summary_catalog.", 

378 name="visitSummary_schema", 

379 storageClass="ExposureCatalog", 

380 ) 

381 input_summary_catalog = cT.Input( 

382 doc="Visit summary table to load and modify.", 

383 name="visitSummary", 

384 dimensions=("instrument", "visit"), 

385 storageClass="ExposureCatalog", 

386 ) 

387 input_exposures = cT.Input( 

388 doc=( 

389 "Per-detector images to obtain image, mask, and variance from " 

390 "(embedded summary stats and other components are ignored)." 

391 ), 

392 name="calexp", 

393 dimensions=("instrument", "detector", "visit"), 

394 storageClass="ExposureF", 

395 multiple=True, 

396 deferLoad=True, 

397 deferGraphConstraint=True, 

398 ) 

399 psf_overrides = cT.Input( 

400 doc="Visit-level catalog of updated PSFs to use.", 

401 name="finalized_psf_ap_corr_catalog", 

402 dimensions=("instrument", "visit"), 

403 storageClass="ExposureCatalog", 

404 deferGraphConstraint=True, 

405 ) 

406 psf_star_catalog = cT.Input( 

407 doc="Per-visit table of PSF reserved- and used-star measurements.", 

408 name="finalized_src_table", 

409 dimensions=("instrument", "visit"), 

410 storageClass="ArrowAstropy", 

411 deferGraphConstraint=True, 

412 ) 

413 ap_corr_overrides = cT.Input( 

414 doc="Visit-level catalog of updated aperture correction maps to use.", 

415 name="finalized_psf_ap_corr_catalog", 

416 dimensions=("instrument", "visit"), 

417 storageClass="ExposureCatalog", 

418 deferGraphConstraint=True, 

419 ) 

420 photo_calib_overrides_tract = cT.Input( 

421 doc="Per-Tract visit-level catalog of updated photometric calibration objects to use.", 

422 name="{photoCalibName}PhotoCalibCatalog", 

423 dimensions=("instrument", "visit", "tract"), 

424 storageClass="ExposureCatalog", 

425 multiple=True, 

426 deferGraphConstraint=True, 

427 ) 

428 photo_calib_overrides_global = cT.Input( 

429 doc="Global visit-level catalog of updated photometric calibration objects to use.", 

430 name="{photoCalibName}PhotoCalibCatalog", 

431 dimensions=("instrument", "visit"), 

432 storageClass="ExposureCatalog", 

433 deferGraphConstraint=True, 

434 ) 

435 wcs_overrides_tract = cT.Input( 

436 doc="Per-tract visit-level catalog of updated astrometric calibration objects to use.", 

437 name="{skyWcsName}SkyWcsCatalog", 

438 dimensions=("instrument", "visit", "tract"), 

439 storageClass="ExposureCatalog", 

440 multiple=True, 

441 deferGraphConstraint=True, 

442 ) 

443 wcs_overrides_skypix = cT.Input( 

444 doc="Per-skypix pixel visit-level catalog of updated astrometric calibration objects to use.", 

445 name="gbdesHealpix3AstrometricFitSkyWcsCatalog", 

446 dimensions=("instrument", "visit", "healpix3"), 

447 storageClass="ExposureCatalog", 

448 multiple=True, 

449 deferGraphConstraint=True, 

450 ) 

451 wcs_overrides_global = cT.Input( 

452 doc="Global visit-level catalog of updated astrometric calibration objects to use.", 

453 name="{skyWcsName}SkyWcsCatalog", 

454 dimensions=("instrument", "visit"), 

455 storageClass="ExposureCatalog", 

456 deferGraphConstraint=True, 

457 ) 

458 background_originals = cT.Input( 

459 doc="Per-detector original background that has already been subtracted from 'input_exposures'.", 

460 name="calexpBackground", 

461 dimensions=("instrument", "visit", "detector"), 

462 storageClass="Background", 

463 multiple=True, 

464 deferLoad=True, 

465 deferGraphConstraint=True, 

466 ) 

467 background_overrides = cT.Input( 

468 doc="Per-detector background that can be subtracted directly from 'input_exposures'.", 

469 name="skyCorr", 

470 dimensions=("instrument", "visit", "detector"), 

471 storageClass="Background", 

472 multiple=True, 

473 deferLoad=True, 

474 deferGraphConstraint=True, 

475 ) 

476 output_summary_schema = cT.InitOutput( 

477 doc="Schema of the output visit summary catalog.", 

478 name="finalVisitSummary_schema", 

479 storageClass="ExposureCatalog", 

480 ) 

481 output_summary_catalog = cT.Output( 

482 doc="Visit-level catalog summarizing all image characterizations and calibrations.", 

483 name="finalVisitSummary", 

484 dimensions=("instrument", "visit"), 

485 storageClass="ExposureCatalog", 

486 ) 

487 visit_geometry = cT.Output( 

488 doc="Updated visit[, detector] regions that can be used to update butler dimensions records.", 

489 name="visit_geometry", 

490 dimensions=("instrument", "visit"), 

491 storageClass="VisitGeometry", 

492 ) 

493 

494 def __init__(self, *, config: UpdateVisitSummaryConfig | None = None): 

495 super().__init__(config=config) 

496 match self.config.wcs_provider: 

497 case "input_summary": 

498 self.inputs.remove("wcs_overrides_tract") 

499 self.inputs.remove("wcs_overrides_global") 

500 self.inputs.remove("wcs_overrides_skypix") 

501 case "tract": 

502 self.inputs.remove("wcs_overrides_global") 

503 self.inputs.remove("wcs_overrides_skypix") 

504 case "global": 

505 self.inputs.remove("wcs_overrides_tract") 

506 self.inputs.remove("wcs_overrides_skypix") 

507 case x if "healpix" in x: 

508 self.inputs.remove("wcs_overrides_tract") 

509 self.inputs.remove("wcs_overrides_global") 

510 self.wcs_overrides_skypix = dataclasses.replace( 

511 self.wcs_overrides_skypix, dimensions=("instrument", "visit", self.config.wcs_provider) 

512 ) 

513 case bad: 

514 raise ValueError(f"Invalid value wcs_provider={bad!r}; config was not validated.") 

515 match self.config.photo_calib_provider: 

516 case "input_summary": 

517 self.inputs.remove("photo_calib_overrides_tract") 

518 self.inputs.remove("photo_calib_overrides_global") 

519 case "tract": 

520 self.inputs.remove("photo_calib_overrides_global") 

521 case "global": 

522 self.inputs.remove("photo_calib_overrides_tract") 

523 case bad: 

524 raise ValueError(f"Invalid value photo_calib_provider={bad!r}; config was not validated.") 

525 match self.config.background_provider: 

526 case "input_summary": 

527 self.inputs.remove("background_originals") 

528 self.inputs.remove("background_overrides") 

529 case "replacement": 

530 pass 

531 case bad: 

532 raise ValueError(f"Invalid value background_provider={bad!r}; config was not validated.") 

533 if self.config.do_refit_pointing: 

534 self.camera = dataclasses.replace(self.camera, dimensions=config.camera_dimensions) 

535 else: 

536 del self.camera 

537 if not self.config.do_write_visit_geometry: 

538 del self.visit_geometry 

539 

540 

541class UpdateVisitSummaryConfig(PipelineTaskConfig, pipelineConnections=UpdateVisitSummaryConnections): 

542 """Configuration for UpdateVisitSummaryTask. 

543 

544 Notes 

545 ----- 

546 The configuration defaults for this task reflect a simple or "least common 

547 denominator" pipeline, not the more complete, more sophisticated pipeline 

548 we run on the instruments we support best. The expectation is that the 

549 various full pipeline definitions will generally import the simpler 

550 definition, so making the defaults correspond to any full pipeline would 

551 just lead to the simple pipeline setting them back to the simple-pipeline 

552 values and the full pipeline still having to then override them to the 

553 full-pipeline values. 

554 """ 

555 

556 compute_summary_stats = ConfigurableField( 

557 doc="Subtask that computes summary statistics from Exposure components.", 

558 target=ComputeExposureSummaryStatsTask, 

559 ) 

560 wcs_provider = Field( 

561 doc=( 

562 "Which connection and behavior to use when applying WCS overrides." 

563 "The string should be one of the following:\n" 

564 "- input_summary: Propagate the WCS from the input visit summary " 

565 "catalog and do not recompute WCS-based summary statistics.\n" 

566 "- tract: Use the 'wcs_overrides_tract' connection to load an " 

567 "`ExposureCatalog` with {visit, tract} dimensions and per-" 

568 "detector rows, and recommpute WCS-based summary statistics.\n" 

569 "- global: Use the 'wcs_overrides_global' connection to load an " 

570 "`ExposureCatalog` with {visit} dimensions and per-detector rows," 

571 " and recommpute WCS-based summary statistics.\n" 

572 "- healpixN: Use the 'wcs_overrides_skypix' connection to load an " 

573 "`ExposureCatalog` with {visit, healpixN} dimensions and per-" 

574 "detector rows, and recommpute WCS-based summary statistics." 

575 ), 

576 # If needed, we could add options here to propagate the WCS from 

577 # the input exposures and/or transfer WCS-based summary statistics 

578 # from them as well. Right now there's no use case for that, since 

579 # the input visit summary is always produced after the last time we 

580 # write a new Exposure. 

581 dtype=str, 

582 default="input_summary", 

583 optional=False, 

584 check=_validate_wcs_provider, 

585 ) 

586 photo_calib_provider = ChoiceField( 

587 doc="Which connection and behavior to use when applying photometric calibration overrides.", 

588 dtype=str, 

589 allowed={ 

590 "input_summary": ( 

591 "Propagate the PhotoCalib from the input visit summary catalog " 

592 "and do not recompute photometric calibration summary " 

593 "statistics." 

594 ), 

595 "tract": { 

596 "Use the 'photo_calib_overrides_tract' connection to load an " 

597 "`ExposureCatalog` with {visit, tract} dimensions and per-" 

598 "detector rows, and recommpute photometric calibration summary " 

599 "statistics." 

600 }, 

601 "global": { 

602 "Use the 'photo_calib_overrides_global' connection to load an " 

603 "`ExposureCatalog` with {visit} dimensions and per-" 

604 "detector rows, and recommpute photometric calibration summary " 

605 "statistics." 

606 }, 

607 # If needed, we could add options here to propagate the PhotoCalib 

608 # from the input exposures and/or transfer photometric calibration 

609 # summary statistics them as well. Right now there's no use case 

610 # for that, since the input visit summary is always produced after 

611 # the last time we write a new Exposure. 

612 }, 

613 default="input_summary", 

614 optional=False, 

615 ) 

616 background_provider = ChoiceField( 

617 doc="Which connection(s) and behavior to use when applying background overrides.", 

618 dtype=str, 

619 allowed={ 

620 "input_summary": ( 

621 "The input visit summary catalog already includes summary " 

622 "statistics for the final backgrounds that can be used as-is." 

623 ), 

624 "replacement": { 

625 "The 'background_originals' connection refers to a background " 

626 "model that has been superseded by the model referred to by " 

627 "the 'background_overrides' connection." 

628 }, 

629 # Could also imagine an option in which there is no original 

630 # background and the new one stands alone; can add later if needed. 

631 }, 

632 default="input_summary", 

633 optional=False, 

634 ) 

635 # Could imagine an option here to say that the original background has not 

636 # been subtracted from the input exposures, allowing postISRCCD to be used 

637 # as input exposures. Can add later if needed. 

638 wcs_consistency_threshold = Field( 

639 doc=( 

640 "Maximum distance (in arcseconds) between the on-sky corners of a ``wcs_provider`` WCS " 

641 "and the ``input_summary`` WCS. When this threshold is exceeded, the WCS is set to None. " 

642 "When this is not `None` and there is no input summary WCS, the WCS is also nulled." 

643 ), 

644 dtype=float, 

645 optional=True, 

646 default=None, 

647 ) 

648 refit_pointing = ConfigurableField( 

649 "A subtask for refitting the boresight pointing and orientation, " 

650 "and using those to produce new regions for butler dimensions.", 

651 target=RefitPointingTask, 

652 ) 

653 camera_dimensions = ListField( 

654 "The dimensions of the 'camera' prerequisite input connection.", 

655 dtype=str, 

656 default=["instrument"], 

657 ) 

658 do_refit_pointing = Field( 

659 "Whether to re-fit the pointing model.", 

660 dtype=bool, 

661 default=True, 

662 ) 

663 do_write_visit_geometry = Field( 

664 "Whether to write refit-pointing regions that can be used to update butler dimension records.", 

665 dtype=bool, 

666 default=False, 

667 ) 

668 do_fit_sip_approximations = Field( 

669 "Whether to fit TAN-SIP WCS approximations for use in FITS headers. " 

670 "This also causes the 'wcs_sip_delta_sky' and 'wcs_sip_delta_pixel' fields to be added " 

671 "to track the precision of these approximations.", 

672 dtype=bool, 

673 default=True, 

674 ) 

675 fit_sip_approximations = ConfigurableField( 

676 "Configuration for fitting TAN-SIP WCS approximations for use in FITS headers.", 

677 target=FitSipApproximationTask, 

678 ) 

679 

680 def validate(self): 

681 super().validate() 

682 if self.wcs_consistency_threshold is not None and self.wcs_provider == "input_summary": 

683 raise ValueError("Cannot check for WCS consistency if there is only one WCS to consider.") 

684 if self.do_write_visit_geometry and not self.do_refit_pointing: 

685 raise ValueError("Cannot write visit_geometry without refitting the pointing.") 

686 

687 

688class UpdateVisitSummaryTask(PipelineTask): 

689 """A pipeline task that creates a new visit-summary table after all 

690 `lsst.afw.image.Exposure` components have been finalized. 

691 

692 Notes 

693 ----- 

694 This task is designed to be run just prior to making warps for coaddition, 

695 as it aggregates all inputs other than the images and backgrounds into a 

696 single ``ExposureCatalog`` dataset and recomputes summary statistics that 

697 are useful in selecting which images should go into a coadd. Its output 

698 can also be used to reconstruct a final processed visit image when combined 

699 with a post-ISR image, the background model, and the final mask. 

700 """ 

701 

702 # The `run` method of this task can conditionally apply overrides for PSFs 

703 # and aperture corrections, but its `PipelineTask` interface always applies 

704 # them. We can always add the config options to make them optional later, 

705 # if that turns out to be useful. 

706 

707 _DefaultName = "updateVisitSummary" 

708 ConfigClass = UpdateVisitSummaryConfig 

709 

710 compute_summary_stats: ComputeExposureSummaryStatsTask 

711 

712 def __init__(self, *, initInputs: dict[str, Any] | None = None, **kwargs: Any): 

713 super().__init__(initInputs=initInputs, **kwargs) 

714 self.makeSubtask("compute_summary_stats") 

715 if initInputs is None or "input_summary_schema" not in initInputs: 

716 raise RuntimeError("Task requires 'input_summary_schema' in initInputs.") 

717 input_summary_schema = initInputs["input_summary_schema"].schema 

718 self.schema_mapper = SchemaMapper(input_summary_schema) 

719 self.schema_mapper.addMinimalSchema(input_summary_schema) 

720 self.schema = self.schema_mapper.getOutputSchema() 

721 if self.config.wcs_provider == "tract": 

722 self.schema.addField("wcsTractId", type="L", doc="ID of the tract that provided the WCS.") 

723 elif "healpix" in self.config.wcs_provider: 

724 self.schema.addField("wcsSkypixId", type="L", doc="ID of the skypix pixel that provided the WCS.") 

725 if self.config.wcs_provider != "input_summary": 

726 self.schema.addField( 

727 "wcs_corner_max_offset", 

728 type="Angle", 

729 doc=( 

730 "Maximum on-sky difference between the input-summary WCS and the wcs_provider WCS " 

731 "at detector corners." 

732 ), 

733 ) 

734 if self.config.photo_calib_provider == "tract": 

735 self.schema.addField( 

736 "photoCalibTractId", 

737 type="L", 

738 doc="ID of the tract that provided the PhotoCalib.", 

739 ) 

740 if self.config.do_refit_pointing: 

741 self.makeSubtask("refit_pointing", schema=self.schema) 

742 self.output_summary_schema = ExposureCatalog(self.schema) 

743 self._wcs_consistency_threshold = ( 

744 self.config.wcs_consistency_threshold * arcseconds 

745 if self.config.wcs_consistency_threshold is not None 

746 else None 

747 ) 

748 if self.config.do_fit_sip_approximations: 

749 self.makeSubtask("fit_sip_approximations") 

750 self.schema.addField( 

751 "wcs_sip_delta_sky", 

752 type="Angle", 

753 doc="Maximum pixel-to-sky difference between the WCS and its FITS TAN-SIP approximation.", 

754 ) 

755 self.schema.addField( 

756 "wcs_sip_delta_pixel", 

757 type=np.float64, 

758 units="pixel", 

759 doc="Maximum sky-to-pixl difference between the WCS and its FITS TAN-SIP approximation.", 

760 ) 

761 

762 def runQuantum( 

763 self, 

764 butlerQC: QuantumContext, 

765 inputRefs: InputQuantizedConnection, 

766 outputRefs: OutputQuantizedConnection, 

767 ) -> None: 

768 # Docstring inherited. 

769 sky_map = butlerQC.get(inputRefs.sky_map) 

770 del inputRefs.sky_map 

771 inputs = {} 

772 # Collapse the wcs_override_ and photo_calib_override_ connection pairs 

773 # into individual inputs (either ExposureCatalog or PerTractInput 

774 # objects). 

775 match self.config.wcs_provider: 

776 case "tract": 

777 inputs["wcs_overrides"] = PerTractInput.load(butlerQC, sky_map, inputRefs.wcs_overrides_tract) 

778 del inputRefs.wcs_overrides_tract 

779 case "global": 

780 inputs["wcs_overrides"] = GlobalInput(butlerQC.get(inputRefs.wcs_overrides_global)) 

781 del inputRefs.wcs_overrides_global 

782 case "input_summary": 

783 inputs["wcs_overrides"] = None 

784 case x if "healpix" in x: 

785 # TODO: match healpix of arbitrary int? 

786 inputs["wcs_overrides"] = PerHealpixInput.load(butlerQC, inputRefs.wcs_overrides_skypix) 

787 del inputRefs.wcs_overrides_skypix 

788 match self.config.photo_calib_provider: 

789 case "tract": 

790 inputs["photo_calib_overrides"] = PerTractInput.load( 

791 butlerQC, sky_map, inputRefs.photo_calib_overrides_tract 

792 ) 

793 del inputRefs.photo_calib_overrides_tract 

794 case "global": 

795 inputs["photo_calib_overrides"] = GlobalInput( 

796 butlerQC.get(inputRefs.photo_calib_overrides_global) 

797 ) 

798 del inputRefs.photo_calib_overrides_global 

799 case "input_summary": 

800 inputs["photo_calib_overrides"] = None 

801 # Load or make DeferredDatasetHandles for everything else. 

802 inputs.update(butlerQC.get(inputRefs)) 

803 deferred_dataset_types = ["input_exposures"] 

804 # Handle whether to look for background originals and overrides at all. 

805 match self.config.background_provider: 

806 case "replacement": 

807 deferred_dataset_types.append("background_originals") 

808 deferred_dataset_types.append("background_overrides") 

809 # Transform the lists of DeferredDatasetHandles for the multiple=True, 

810 # deferLoad=True connections into mappings keyed by detector ID. 

811 for name in deferred_dataset_types: 

812 handles_list = inputs[name] 

813 inputs[name] = {handle.dataId["detector"]: handle for handle in handles_list} 

814 for record in inputs["input_summary_catalog"]: 

815 detector_id = record.getId() 

816 if detector_id not in inputs[name]: 

817 raise InvalidQuantumError( 

818 f"No {name!r} with detector {detector_id} for visit " 

819 f"{butlerQC.quantum.dataId['visit']} even though this detector is present " 

820 "in the input visit summary catalog. " 

821 "This is most likely to occur when the QuantumGraph that includes this task " 

822 "was incorrectly generated with an explicit or implicit (from datasets) tract " 

823 "constraint." 

824 ) 

825 # Actually run the task and write the results. 

826 result = Struct() 

827 try: 

828 self.run(**inputs, result=result) 

829 except AlgorithmError as e: 

830 error = AnnotatedPartialOutputsError.annotate( 

831 e, self, result.output_summary_catalog, log=self.log 

832 ) 

833 butlerQC.put(result, outputRefs) 

834 raise error from e 

835 

836 butlerQC.put(result, outputRefs) 

837 

838 def run( 

839 self, 

840 *, 

841 result: Struct, 

842 input_summary_catalog: ExposureCatalog, 

843 input_exposures: Mapping[int, DeferredDatasetHandle], 

844 psf_overrides: ExposureCatalog, 

845 psf_star_catalog: astropy.table.Table, 

846 ap_corr_overrides: ExposureCatalog, 

847 camera: Camera | None = None, 

848 photo_calib_overrides: PossiblyMultipleInput | None = None, 

849 wcs_overrides: PossiblyMultipleInput | None = None, 

850 background_originals: Mapping[int, DeferredDatasetHandle] | None = None, 

851 background_overrides: Mapping[int, DeferredDatasetHandle] | None = None, 

852 ): 

853 """Build an updated version of a visit summary catalog. 

854 

855 Parameters 

856 ---------- 

857 result : `lsst.pipe.base.Struct` 

858 Output struct to modify in-place. On successful return, this will 

859 have the following attributes: 

860 

861 - ``output_summary_catalog`` (`lsst.afw.table.ExposureCatalog`) 

862 The output visit summary catalog. 

863 - ``visit_geometry`` (`lsst.obs.base.visit_geometry.VisitGeometry`) 

864 Updated visit[, detector] regions that can be used to update 

865 butler dimension records. 

866 

867 input_summary_catalog : `lsst.afw.table.ExposureCatalog` 

868 Input catalog. Each row in this catalog will be used to produce 

869 a row in the output catalog. Any override parameter that is `None` 

870 will leave the corresponding values unchanged from those in this 

871 input catalog. 

872 input_exposures : `collections.abc.Mapping` [`int`, 

873 `lsst.daf.butler.DeferredDatasetHandle`] 

874 Deferred-load objects that fetch `lsst.afw.image.Exposure` 

875 instances. Only the image, mask, and variance are used; all other 

876 components are assumed to be superceded by at least 

877 ``input_summary_catalog`` and probably some ``_overrides`` 

878 arguments as well. This usually corresponds to the ``calexp`` 

879 dataset. 

880 psf_overrides : `lsst.afw.table.ExposureCatalog` 

881 Catalog with attached `lsst.afw.detection.Psf` objects that 

882 supersede the input catalog's PSFs. 

883 psf_star_catalog : `astropy.table.Table` 

884 Table containing PSF stars for use in computing PSF summary 

885 statistics. 

886 ap_corr_overrides : `lsst.afw.table.ExposureCatalog` 

887 Catalog with attached `lsst.afw.image.ApCorrMap` objects that 

888 supersede the input catalog's aperture corrections. 

889 camera : `lsst.afw.cameraGeom.Camera`, optional 

890 Camera geometry. 

891 photo_calib_overrides : `PossiblyMultipleInput`, optional 

892 Catalog wrappers with attached `lsst.afw.image.PhotoCalib` 

893 objects that supersede the input catalog's photometric 

894 calibrations. 

895 wcs_overrides : `PossiblyMultipleInput`, optional 

896 Catalog wrappers with attached `lsst.afw.geom.SkyWcs` objects 

897 that supersede the input catalog's astrometric calibrations. 

898 background_originals : `collections.abc.Mapping` [`int`, 

899 `lsst.daf.butler.DeferredDatasetHandle`], optional 

900 Deferred-load objects that fetch `lsst.afw.math.BackgroundList` 

901 instances. These should correspond to the background already 

902 subtracted from ``input_exposures``. If not provided and 

903 ``background_overrides`` is, it is assumed that the background in 

904 ``input_exposures`` has not been subtracted. If provided, all keys 

905 in ``background_overrides`` must also be present in 

906 ``background_originals``. 

907 background_overrides : `collections.abc.Mapping` [`int`, 

908 `lsst.daf.butler.DeferredDatasetHandle`], optional 

909 Deferred-load objects that fetch `lsst.afw.math.BackgroundList` 

910 instances. These should correspond to the background that should 

911 now be subtracted from``input_exposures`` to yield the final 

912 background-subtracted image. 

913 

914 Returns 

915 ------- 

916 result : `lsst.pipe.base.Struct` 

917 The same output struct passed in. 

918 

919 Notes 

920 ----- 

921 If any override parameter is provided but does not have a value for a 

922 particular detector, that component will be set to `None` in the 

923 returned catalog for that detector and all summary statistics derived 

924 from that component will be reset (usually to ``NaN``) as well. Not 

925 passing an override parameter at all will instead pass through the 

926 original component and values from the input catalog unchanged. 

927 """ 

928 result.output_summary_catalog = ExposureCatalog(self.schema) 

929 result.output_summary_catalog.setMetadata(input_summary_catalog.getMetadata()) 

930 for input_record in input_summary_catalog: 

931 detector_id = input_record.getId() 

932 output_record = result.output_summary_catalog.addNew() 

933 

934 # Make a new ExposureSummaryStats from the input record. These 

935 # might be full of NaNs if the summary stats were not computed 

936 # originally because initial astrometry or photometry failed, but 

937 # if that's possible there should be a flag for it. 

938 summary_stats = ExposureSummaryStats.from_record(input_record) 

939 

940 # Also copy the input record values to output record; this copies 

941 # many of the same values just copied into `summary_stats` (which 

942 # will be overridden later by summary_stats.update_record), but it 

943 # also copies fields that aren't part of summary_stats, including 

944 # the actual components like Psf, Wcs, etc. 

945 output_record.assign(input_record, self.schema_mapper) 

946 

947 exposure = input_exposures[detector_id].get() 

948 bbox = exposure.getBBox() 

949 

950 if wcs_overrides: 

951 wcs_region, wcs_record = wcs_overrides.best_for_detector(detector_id, bbox=bbox) 

952 if wcs_record is not None: 

953 wcs = wcs_record.getWcs() 

954 else: 

955 wcs = None 

956 if self.config.wcs_provider == "tract": 

957 output_record["wcsTractId"] = wcs_region 

958 elif "healpix" in self.config.wcs_provider: 

959 output_record["wcsSkypixId"] = wcs_region 

960 wcs = self._finish_wcs_assignment(input_record, output_record, wcs) 

961 self.compute_summary_stats.update_wcs_stats( 

962 summary_stats, wcs, bbox, output_record.getVisitInfo() 

963 ) 

964 else: 

965 wcs = input_record.getWcs() 

966 

967 sources = None 

968 

969 if (psf_record := psf_overrides.find(detector_id)) is not None: 

970 psf = psf_record.getPsf() 

971 else: 

972 psf = None 

973 output_record.setPsf(psf) 

974 sources = psf_star_catalog[psf_star_catalog["detector"] == detector_id] 

975 if len(sources) == 0: 

976 sources = None 

977 

978 if (ap_corr_record := ap_corr_overrides.find(detector_id)) is not None: 

979 ap_corr = ap_corr_record.getApCorrMap() 

980 else: 

981 ap_corr = None 

982 output_record.setApCorrMap(ap_corr) 

983 self.compute_summary_stats.update_psf_stats( 

984 summary_stats, 

985 psf, 

986 bbox, 

987 sources, 

988 image_mask=exposure.mask, 

989 image_ap_corr_map=ap_corr, 

990 sources_is_astropy=True, 

991 ) 

992 

993 if photo_calib_overrides: 

994 center = compute_center_for_detector_record(output_record, bbox, wcs) 

995 ( 

996 photo_calib_tract, 

997 photo_calib_record, 

998 ) = photo_calib_overrides.best_for_detector(detector_id, center=center) 

999 if photo_calib_record is not None: 

1000 photo_calib = photo_calib_record.getPhotoCalib() 

1001 else: 

1002 photo_calib = None 

1003 if self.config.photo_calib_provider == "tract": 

1004 output_record["photoCalibTractId"] = photo_calib_tract 

1005 output_record.setPhotoCalib(photo_calib) 

1006 self.compute_summary_stats.update_photo_calib_stats(summary_stats, photo_calib) 

1007 

1008 if background_overrides is not None: 

1009 if (handle := background_overrides.get(detector_id)) is not None: 

1010 new_bkg = handle.get() 

1011 if background_originals is not None: 

1012 orig_bkg = background_originals[detector_id].get() 

1013 else: 

1014 orig_bkg = BackgroundList() 

1015 

1016 full_bkg = orig_bkg.clone() 

1017 for layer in new_bkg: 

1018 full_bkg.append(layer) 

1019 exposure.image -= new_bkg.getImage() 

1020 self.compute_summary_stats.update_background_stats(summary_stats, full_bkg) 

1021 self.compute_summary_stats.update_masked_image_stats( 

1022 summary_stats, exposure.getMaskedImage() 

1023 ) 

1024 

1025 # Update the estimated depth and effective exposure time. 

1026 self.compute_summary_stats.update_magnitude_limit_stats(summary_stats, exposure) 

1027 self.compute_summary_stats.update_effective_time_stats(summary_stats, exposure) 

1028 

1029 summary_stats.update_record(output_record) 

1030 del exposure 

1031 

1032 if self.config.do_refit_pointing: 

1033 refit_pointing_result = self.refit_pointing.run( 

1034 catalog=result.output_summary_catalog, 

1035 camera=camera, 

1036 ) 

1037 result.visit_geometry = refit_pointing_result.regions 

1038 

1039 if self.config.do_fit_sip_approximations: 

1040 for output_record in result.output_summary_catalog: 

1041 wcs = output_record.getWcs() 

1042 if wcs is None: 

1043 continue 

1044 sip = self.fit_sip_approximations.run(wcs=wcs, bbox=output_record.getBBox()) 

1045 output_record.setWcs(sip.wcs) 

1046 output_record["wcs_sip_delta_sky"] = sip.delta_sky 

1047 output_record["wcs_sip_delta_pixel"] = sip.delta_pixel 

1048 

1049 return result 

1050 

1051 def _finish_wcs_assignment( 

1052 self, input_record: ExposureRecord, output_record: ExposureRecord, updated: SkyWcs 

1053 ) -> SkyWcs | None: 

1054 preliminary = input_record.getWcs() 

1055 if preliminary is None: 

1056 # We can't compare to preliminary, so shortcut early. 

1057 if self._wcs_consistency_threshold is not None: 

1058 # We can't check consistency, so don't trust the updated WCS. 

1059 output_record.setWcs(None) 

1060 return None 

1061 else: 

1062 output_record.setWcs(updated) 

1063 return updated 

1064 if updated is None: 

1065 # At present we assume all downstream code is "updated WCS or 

1066 # bust", because we know some code (e.g. coaddition) is, and we 

1067 # don't have a good way to mark a WCS as fallback preliminary 

1068 # downstream. 

1069 output_record.setWcs(None) 

1070 return None 

1071 pixel_corners = Box2D(input_record.getBBox()).getCorners() 

1072 preliminary_sky_corners = preliminary.pixelToSky(pixel_corners) 

1073 updated_sky_corners = updated.pixelToSky(pixel_corners) 

1074 max_offset = max(a.separation(b) for a, b in zip(preliminary_sky_corners, updated_sky_corners)) 

1075 output_record["wcs_corner_max_offset"] = max_offset 

1076 if self._wcs_consistency_threshold is not None and max_offset > self._wcs_consistency_threshold: 

1077 output_record.setWcs(None) 

1078 return None 

1079 output_record.setWcs(updated) 

1080 return updated