Coverage for python / lsst / drp / tasks / update_visit_summary.py: 18%
367 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:28 +0000
1# This file is part of drp_tasks.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "UpdateVisitSummaryConnections",
26 "UpdateVisitSummaryConfig",
27 "UpdateVisitSummaryTask",
28 "PossiblyMultipleInput",
29 "PerTractInput",
30 "PerHealpixInput",
31 "GlobalInput",
32)
34import dataclasses
35import re
36from abc import ABC, abstractmethod
37from collections.abc import Iterable, Mapping
38from typing import Any
40import astropy.table
41import numpy as np
43import lsst.pipe.base.connectionTypes as cT
44from lsst.afw.cameraGeom import Camera
45from lsst.afw.geom import SkyWcs
46from lsst.afw.image import ExposureSummaryStats
47from lsst.afw.math import BackgroundList
48from lsst.afw.table import ExposureCatalog, ExposureRecord, SchemaMapper
49from lsst.daf.butler import Butler, DatasetRef, DeferredDatasetHandle
50from lsst.geom import Angle, Box2D, Box2I, SpherePoint, arcseconds, degrees
51from lsst.meas.astrom.fit_sip_approximation import FitSipApproximationTask
52from lsst.meas.astrom.refit_pointing import RefitPointingTask
53from lsst.pex.config import ChoiceField, ConfigurableField, Field, ListField
54from lsst.pipe.base import (
55 AlgorithmError,
56 AnnotatedPartialOutputsError,
57 InputQuantizedConnection,
58 InvalidQuantumError,
59 OutputQuantizedConnection,
60 PipelineTask,
61 PipelineTaskConfig,
62 PipelineTaskConnections,
63 QuantumContext,
64 Struct,
65)
66from lsst.pipe.tasks.computeExposureSummaryStats import ComputeExposureSummaryStatsTask
67from lsst.skymap import BaseSkyMap, TractInfo
68from lsst.skymap.detail import makeSkyPolygonFromBBox
69from lsst.sphgeom import HealpixPixelization
72def compute_center_for_detector_record(
73 record: ExposureRecord, bbox: Box2I | None = None, wcs: SkyWcs | None = None
74) -> SpherePoint | None:
75 """Compute the sky coordinate center for a detector to be used when
76 testing distance to tract center.
78 Parameters
79 ----------
80 record : `lsst.afw.table.ExposureRecord`
81 Exposure record to obtain WCS and bbox from if not provided.
82 bbox : `lsst.geom.Box2I`, optional
83 Bounding box for the detector in its own pixel coordinates.
84 wcs : `lsst.afw.geom.SkyWcs`, optional
85 WCS that maps the detector's pixel coordinate system to celestial
86 coordinates.
88 Returns
89 -------
90 center : `lsst.geom.SpherePoint` or `None`
91 Center of the detector in sky coordinates, or `None` if no WCS was
92 given or present in the given record.
93 """
94 if bbox is None:
95 bbox = record.getBBox()
96 if wcs is None:
97 wcs = record.getWcs()
98 if wcs is None:
99 return None
100 region = makeSkyPolygonFromBBox(bbox, wcs)
101 return SpherePoint(region.getCentroid())
104def _validate_wcs_provider(name: str) -> bool:
105 """A helper function to validate the WCS provider at config time.
107 Parameters
108 ----------
109 name : str
110 The source or dimension of the source of the WCS.
112 Returns
113 -------
114 is_valid : bool
115 Whether the provided source is valid.
116 """
117 # Check if ``name`` is a healpix dimension.
118 match_healpix = re.match(re.compile(r"healpix\d+"), name)
119 # Check if ``name`` is one of the other source options.
120 match_other = name in ["input_summary", "global", "tract"]
121 return match_healpix or match_other
124class PossiblyMultipleInput(ABC):
125 """A helper ABC for handling input `~lsst.afw.table.ExposureCatalog`
126 datasets that may be multiple (one per tract/visit combination) or
127 unique/global (one per visit).
128 """
130 @abstractmethod
131 def best_for_detector(
132 self,
133 detector_id: int,
134 center: SpherePoint | None = None,
135 bbox: Box2I | None = None,
136 ) -> tuple[int, ExposureRecord | None]:
137 """Return the exposure record for this detector that is the best match
138 for this detector.
140 Parameters
141 ----------
142 detector_id : `int`
143 Detector ID; used to find the right row in the catalog or catalogs.
144 center : `lsst.geom.SpherePoint` or `None`
145 Center of the detector in sky coordinates. If not provided, one
146 will be computed via `compute_center_for_detector_record`.
147 bbox : `lsst.geom.Box2I`, optional
148 Bounding box for the detector in its own pixel coordinates.
150 Returns
151 -------
152 tract_id : `int`
153 ID of the tract that supplied this record, or `-1` if ``record`` is
154 `None` or if the input was not per-tract.
155 record : `lsst.afw.table.ExposureRecord` or `None`
156 Best record for this detector, or `None` if there either were no
157 records for this detector or no WCS available to compute a center.
158 """
159 raise NotImplementedError()
162@dataclasses.dataclass
163class PerTractInput(PossiblyMultipleInput):
164 """Wrapper class for input `~lsst.afw.table.ExposureCatalog` datasets
165 that are per-tract.
167 This selects the best tract via the minimum average distance (on the sky)
168 from the detector's corners to the tract center.
169 """
171 catalogs_by_tract: list[tuple[TractInfo, ExposureCatalog]]
172 """List of tuples of catalogs and the tracts they correspond to
173 (`list` [`tuple` [`lsst.skymap.TractInfo`,
174 `lsst.afw.table.ExposureCatalog`]]).
175 """
177 @classmethod
178 def load(
179 cls,
180 butler: QuantumContext | Butler,
181 sky_map: BaseSkyMap,
182 refs: Iterable[DatasetRef],
183 ) -> PerTractInput:
184 """Load and wrap input catalogs.
186 Parameters
187 ----------
188 butler : `lsst.pipe.base.QuantumContext`
189 Butler proxy used in `~lsst.pipe.base.PipelineTask.runQuantum`.
190 sky_map : `lsst.skymap.BaseSkyMap`
191 Definition of tracts and patches.
192 refs : `~collections.abc.Iterable` [`lsst.daf.butler.DatasetRef`]
193 References to the catalog datasets to load.
195 Returns
196 -------
197 wrapper : `PerTractInput`
198 Wrapper object for the loaded catalogs.
199 """
200 catalogs_by_tract = []
201 for ref in refs:
202 tract_id = ref.dataId["tract"]
203 tract_info = sky_map[tract_id]
204 catalogs_by_tract.append(
205 (
206 tract_info,
207 butler.get(ref),
208 )
209 )
210 return cls(catalogs_by_tract)
212 def best_for_detector(
213 self,
214 detector_id: int,
215 center: SpherePoint | None = None,
216 bbox: Box2I | None = None,
217 ) -> tuple[int, ExposureRecord | None]:
218 # Docstring inherited.
219 best_result: tuple[int, ExposureRecord | None] = (-1, None)
220 best_distance: Angle = float("inf") * degrees
221 for tract_info, catalog in self.catalogs_by_tract:
222 record = catalog.find(detector_id)
223 if record is None:
224 continue
225 if center is None:
226 center_for_record = compute_center_for_detector_record(record, bbox=bbox)
227 if center_for_record is None:
228 continue
229 else:
230 center_for_record = center
231 center_distance = tract_info.ctr_coord.separation(center_for_record)
232 if best_distance > center_distance:
233 best_result = (tract_info.tract_id, record)
234 best_distance = center_distance
235 return best_result
238@dataclasses.dataclass
239class PerHealpixInput(PossiblyMultipleInput):
240 """Wrapper class for input `~lsst.afw.table.ExposureCatalog` datasets
241 that are per-healpix pixel.
243 This selects the best pixel based on which one contains the detector
244 centroid. Since the pixel regions are ConvexPolygons that do not perfectly
245 describe the pixel borders, in case of ambiguity, the pixel is chosen via
246 the minimum distance (on the sky) from the detector's centroid to the pixel
247 centroid.
248 """
250 catalogs_by_pixel: list[int, tuple[int, SpherePoint, ExposureCatalog]]
251 """List of tuples of catalogs and the pixels they correspond to
252 (`list` [`tuple` [`int`, `lsst.geom.SpherePoint`,
253 `lsst.afw.table.ExposureCatalog`]]).
254 """
256 @classmethod
257 def load(
258 cls,
259 butler: QuantumContext | Butler,
260 refs: Iterable[DatasetRef],
261 ) -> PerHealpixInput:
262 """Load and wrap input catalogs.
264 Parameters
265 ----------
266 butler : `lsst.pipe.base.QuantumContext`
267 Butler proxy used in `~lsst.pipe.base.PipelineTask.runQuantum`.
268 refs : `~collections.abc.Iterable` [`lsst.daf.butler.DatasetRef`]
269 References to the catalog datasets to load.
271 Returns
272 -------
273 wrapper : `PerHealpixInput`
274 Wrapper object for the loaded catalogs.
275 """
276 catalogs_by_pixel = []
277 pixelization = HealpixPixelization(3)
278 for ref in refs:
279 pixel_id = ref.dataId["healpix3"]
280 pixel_region = pixelization.pixel(pixel_id)
281 catalogs_by_pixel.append(
282 (
283 pixel_id,
284 pixel_region,
285 butler.get(ref),
286 )
287 )
288 return cls(catalogs_by_pixel)
290 def best_for_detector(
291 self,
292 detector_id: int,
293 center: SpherePoint | None = None,
294 bbox: Box2I | None = None,
295 ) -> tuple[int, ExposureRecord | None]:
296 # Docstring inherited.
297 best_result: tuple[int, ExposureRecord | None] = (-1, None)
298 inPixel = []
299 distances = []
300 results = []
301 for pixel_id, pixel_region, catalog in self.catalogs_by_pixel:
302 record = catalog.find(detector_id)
303 if record is None:
304 continue
305 if center is None:
306 center_for_record = compute_center_for_detector_record(record, bbox=bbox)
307 if center_for_record is None:
308 continue
309 else:
310 center_for_record = center
312 inPixel.append(pixel_region.contains(center_for_record.getVector()))
313 distances.append(SpherePoint(pixel_region.getCentroid()).separation(center_for_record))
314 results.append((pixel_id, record))
316 if len(results) == 0:
317 # There were no entries at all; return the null result.
318 return best_result
320 inPixel = np.array(inPixel)
321 results = np.array(results)
322 distances = np.array(distances)
323 if inPixel.sum() == 1:
324 # There was one match in the pixel; return it.
325 best_result = results[inPixel][0]
326 elif inPixel.sum() == 0:
327 # There were no entries in the pixel, but choose the closest.
328 best_result = results[distances.argmin()]
329 else:
330 # There were multiple entries in the pixel. Choose the closest.
331 best_result = results[inPixel][distances[inPixel].argmin()]
332 return best_result
335@dataclasses.dataclass
336class GlobalInput(PossiblyMultipleInput):
337 """Wrapper class for input `~lsst.afw.table.ExposureCatalog` datasets
338 that are not per-tract.
339 """
341 catalog: ExposureCatalog
342 """Loaded per-visit catalog dataset (`lsst.afw.table.ExposureCatalog`).
343 """
345 def best_for_detector(
346 self,
347 detector_id: int,
348 center: SpherePoint | None = None,
349 bbox: Box2I | None = None,
350 ) -> tuple[int, ExposureRecord | None]:
351 # Docstring inherited.
352 return -1, self.catalog.find(detector_id)
355class UpdateVisitSummaryConnections(
356 PipelineTaskConnections,
357 dimensions=("instrument", "visit"),
358 defaultTemplates={
359 "skyWcsName": "gbdesAstrometricFit",
360 "photoCalibName": "fgcm",
361 },
362):
363 camera = cT.PrerequisiteInput(
364 doc="Camera geometry.",
365 name="camera",
366 dimensions=("instrument",),
367 storageClass="Camera",
368 isCalibration=True,
369 )
370 sky_map = cT.Input(
371 doc="Description of tract/patch geometry.",
372 name=BaseSkyMap.SKYMAP_DATASET_TYPE_NAME,
373 dimensions=("skymap",),
374 storageClass="SkyMap",
375 )
376 input_summary_schema = cT.InitInput(
377 doc="Schema for input_summary_catalog.",
378 name="visitSummary_schema",
379 storageClass="ExposureCatalog",
380 )
381 input_summary_catalog = cT.Input(
382 doc="Visit summary table to load and modify.",
383 name="visitSummary",
384 dimensions=("instrument", "visit"),
385 storageClass="ExposureCatalog",
386 )
387 input_exposures = cT.Input(
388 doc=(
389 "Per-detector images to obtain image, mask, and variance from "
390 "(embedded summary stats and other components are ignored)."
391 ),
392 name="calexp",
393 dimensions=("instrument", "detector", "visit"),
394 storageClass="ExposureF",
395 multiple=True,
396 deferLoad=True,
397 deferGraphConstraint=True,
398 )
399 psf_overrides = cT.Input(
400 doc="Visit-level catalog of updated PSFs to use.",
401 name="finalized_psf_ap_corr_catalog",
402 dimensions=("instrument", "visit"),
403 storageClass="ExposureCatalog",
404 deferGraphConstraint=True,
405 )
406 psf_star_catalog = cT.Input(
407 doc="Per-visit table of PSF reserved- and used-star measurements.",
408 name="finalized_src_table",
409 dimensions=("instrument", "visit"),
410 storageClass="ArrowAstropy",
411 deferGraphConstraint=True,
412 )
413 ap_corr_overrides = cT.Input(
414 doc="Visit-level catalog of updated aperture correction maps to use.",
415 name="finalized_psf_ap_corr_catalog",
416 dimensions=("instrument", "visit"),
417 storageClass="ExposureCatalog",
418 deferGraphConstraint=True,
419 )
420 photo_calib_overrides_tract = cT.Input(
421 doc="Per-Tract visit-level catalog of updated photometric calibration objects to use.",
422 name="{photoCalibName}PhotoCalibCatalog",
423 dimensions=("instrument", "visit", "tract"),
424 storageClass="ExposureCatalog",
425 multiple=True,
426 deferGraphConstraint=True,
427 )
428 photo_calib_overrides_global = cT.Input(
429 doc="Global visit-level catalog of updated photometric calibration objects to use.",
430 name="{photoCalibName}PhotoCalibCatalog",
431 dimensions=("instrument", "visit"),
432 storageClass="ExposureCatalog",
433 deferGraphConstraint=True,
434 )
435 wcs_overrides_tract = cT.Input(
436 doc="Per-tract visit-level catalog of updated astrometric calibration objects to use.",
437 name="{skyWcsName}SkyWcsCatalog",
438 dimensions=("instrument", "visit", "tract"),
439 storageClass="ExposureCatalog",
440 multiple=True,
441 deferGraphConstraint=True,
442 )
443 wcs_overrides_skypix = cT.Input(
444 doc="Per-skypix pixel visit-level catalog of updated astrometric calibration objects to use.",
445 name="gbdesHealpix3AstrometricFitSkyWcsCatalog",
446 dimensions=("instrument", "visit", "healpix3"),
447 storageClass="ExposureCatalog",
448 multiple=True,
449 deferGraphConstraint=True,
450 )
451 wcs_overrides_global = cT.Input(
452 doc="Global visit-level catalog of updated astrometric calibration objects to use.",
453 name="{skyWcsName}SkyWcsCatalog",
454 dimensions=("instrument", "visit"),
455 storageClass="ExposureCatalog",
456 deferGraphConstraint=True,
457 )
458 background_originals = cT.Input(
459 doc="Per-detector original background that has already been subtracted from 'input_exposures'.",
460 name="calexpBackground",
461 dimensions=("instrument", "visit", "detector"),
462 storageClass="Background",
463 multiple=True,
464 deferLoad=True,
465 deferGraphConstraint=True,
466 )
467 background_overrides = cT.Input(
468 doc="Per-detector background that can be subtracted directly from 'input_exposures'.",
469 name="skyCorr",
470 dimensions=("instrument", "visit", "detector"),
471 storageClass="Background",
472 multiple=True,
473 deferLoad=True,
474 deferGraphConstraint=True,
475 )
476 output_summary_schema = cT.InitOutput(
477 doc="Schema of the output visit summary catalog.",
478 name="finalVisitSummary_schema",
479 storageClass="ExposureCatalog",
480 )
481 output_summary_catalog = cT.Output(
482 doc="Visit-level catalog summarizing all image characterizations and calibrations.",
483 name="finalVisitSummary",
484 dimensions=("instrument", "visit"),
485 storageClass="ExposureCatalog",
486 )
487 visit_geometry = cT.Output(
488 doc="Updated visit[, detector] regions that can be used to update butler dimensions records.",
489 name="visit_geometry",
490 dimensions=("instrument", "visit"),
491 storageClass="VisitGeometry",
492 )
494 def __init__(self, *, config: UpdateVisitSummaryConfig | None = None):
495 super().__init__(config=config)
496 match self.config.wcs_provider:
497 case "input_summary":
498 self.inputs.remove("wcs_overrides_tract")
499 self.inputs.remove("wcs_overrides_global")
500 self.inputs.remove("wcs_overrides_skypix")
501 case "tract":
502 self.inputs.remove("wcs_overrides_global")
503 self.inputs.remove("wcs_overrides_skypix")
504 case "global":
505 self.inputs.remove("wcs_overrides_tract")
506 self.inputs.remove("wcs_overrides_skypix")
507 case x if "healpix" in x:
508 self.inputs.remove("wcs_overrides_tract")
509 self.inputs.remove("wcs_overrides_global")
510 self.wcs_overrides_skypix = dataclasses.replace(
511 self.wcs_overrides_skypix, dimensions=("instrument", "visit", self.config.wcs_provider)
512 )
513 case bad:
514 raise ValueError(f"Invalid value wcs_provider={bad!r}; config was not validated.")
515 match self.config.photo_calib_provider:
516 case "input_summary":
517 self.inputs.remove("photo_calib_overrides_tract")
518 self.inputs.remove("photo_calib_overrides_global")
519 case "tract":
520 self.inputs.remove("photo_calib_overrides_global")
521 case "global":
522 self.inputs.remove("photo_calib_overrides_tract")
523 case bad:
524 raise ValueError(f"Invalid value photo_calib_provider={bad!r}; config was not validated.")
525 match self.config.background_provider:
526 case "input_summary":
527 self.inputs.remove("background_originals")
528 self.inputs.remove("background_overrides")
529 case "replacement":
530 pass
531 case bad:
532 raise ValueError(f"Invalid value background_provider={bad!r}; config was not validated.")
533 if self.config.do_refit_pointing:
534 self.camera = dataclasses.replace(self.camera, dimensions=config.camera_dimensions)
535 else:
536 del self.camera
537 if not self.config.do_write_visit_geometry:
538 del self.visit_geometry
541class UpdateVisitSummaryConfig(PipelineTaskConfig, pipelineConnections=UpdateVisitSummaryConnections):
542 """Configuration for UpdateVisitSummaryTask.
544 Notes
545 -----
546 The configuration defaults for this task reflect a simple or "least common
547 denominator" pipeline, not the more complete, more sophisticated pipeline
548 we run on the instruments we support best. The expectation is that the
549 various full pipeline definitions will generally import the simpler
550 definition, so making the defaults correspond to any full pipeline would
551 just lead to the simple pipeline setting them back to the simple-pipeline
552 values and the full pipeline still having to then override them to the
553 full-pipeline values.
554 """
556 compute_summary_stats = ConfigurableField(
557 doc="Subtask that computes summary statistics from Exposure components.",
558 target=ComputeExposureSummaryStatsTask,
559 )
560 wcs_provider = Field(
561 doc=(
562 "Which connection and behavior to use when applying WCS overrides."
563 "The string should be one of the following:\n"
564 "- input_summary: Propagate the WCS from the input visit summary "
565 "catalog and do not recompute WCS-based summary statistics.\n"
566 "- tract: Use the 'wcs_overrides_tract' connection to load an "
567 "`ExposureCatalog` with {visit, tract} dimensions and per-"
568 "detector rows, and recommpute WCS-based summary statistics.\n"
569 "- global: Use the 'wcs_overrides_global' connection to load an "
570 "`ExposureCatalog` with {visit} dimensions and per-detector rows,"
571 " and recommpute WCS-based summary statistics.\n"
572 "- healpixN: Use the 'wcs_overrides_skypix' connection to load an "
573 "`ExposureCatalog` with {visit, healpixN} dimensions and per-"
574 "detector rows, and recommpute WCS-based summary statistics."
575 ),
576 # If needed, we could add options here to propagate the WCS from
577 # the input exposures and/or transfer WCS-based summary statistics
578 # from them as well. Right now there's no use case for that, since
579 # the input visit summary is always produced after the last time we
580 # write a new Exposure.
581 dtype=str,
582 default="input_summary",
583 optional=False,
584 check=_validate_wcs_provider,
585 )
586 photo_calib_provider = ChoiceField(
587 doc="Which connection and behavior to use when applying photometric calibration overrides.",
588 dtype=str,
589 allowed={
590 "input_summary": (
591 "Propagate the PhotoCalib from the input visit summary catalog "
592 "and do not recompute photometric calibration summary "
593 "statistics."
594 ),
595 "tract": {
596 "Use the 'photo_calib_overrides_tract' connection to load an "
597 "`ExposureCatalog` with {visit, tract} dimensions and per-"
598 "detector rows, and recommpute photometric calibration summary "
599 "statistics."
600 },
601 "global": {
602 "Use the 'photo_calib_overrides_global' connection to load an "
603 "`ExposureCatalog` with {visit} dimensions and per-"
604 "detector rows, and recommpute photometric calibration summary "
605 "statistics."
606 },
607 # If needed, we could add options here to propagate the PhotoCalib
608 # from the input exposures and/or transfer photometric calibration
609 # summary statistics them as well. Right now there's no use case
610 # for that, since the input visit summary is always produced after
611 # the last time we write a new Exposure.
612 },
613 default="input_summary",
614 optional=False,
615 )
616 background_provider = ChoiceField(
617 doc="Which connection(s) and behavior to use when applying background overrides.",
618 dtype=str,
619 allowed={
620 "input_summary": (
621 "The input visit summary catalog already includes summary "
622 "statistics for the final backgrounds that can be used as-is."
623 ),
624 "replacement": {
625 "The 'background_originals' connection refers to a background "
626 "model that has been superseded by the model referred to by "
627 "the 'background_overrides' connection."
628 },
629 # Could also imagine an option in which there is no original
630 # background and the new one stands alone; can add later if needed.
631 },
632 default="input_summary",
633 optional=False,
634 )
635 # Could imagine an option here to say that the original background has not
636 # been subtracted from the input exposures, allowing postISRCCD to be used
637 # as input exposures. Can add later if needed.
638 wcs_consistency_threshold = Field(
639 doc=(
640 "Maximum distance (in arcseconds) between the on-sky corners of a ``wcs_provider`` WCS "
641 "and the ``input_summary`` WCS. When this threshold is exceeded, the WCS is set to None. "
642 "When this is not `None` and there is no input summary WCS, the WCS is also nulled."
643 ),
644 dtype=float,
645 optional=True,
646 default=None,
647 )
648 refit_pointing = ConfigurableField(
649 "A subtask for refitting the boresight pointing and orientation, "
650 "and using those to produce new regions for butler dimensions.",
651 target=RefitPointingTask,
652 )
653 camera_dimensions = ListField(
654 "The dimensions of the 'camera' prerequisite input connection.",
655 dtype=str,
656 default=["instrument"],
657 )
658 do_refit_pointing = Field(
659 "Whether to re-fit the pointing model.",
660 dtype=bool,
661 default=True,
662 )
663 do_write_visit_geometry = Field(
664 "Whether to write refit-pointing regions that can be used to update butler dimension records.",
665 dtype=bool,
666 default=False,
667 )
668 do_fit_sip_approximations = Field(
669 "Whether to fit TAN-SIP WCS approximations for use in FITS headers. "
670 "This also causes the 'wcs_sip_delta_sky' and 'wcs_sip_delta_pixel' fields to be added "
671 "to track the precision of these approximations.",
672 dtype=bool,
673 default=True,
674 )
675 fit_sip_approximations = ConfigurableField(
676 "Configuration for fitting TAN-SIP WCS approximations for use in FITS headers.",
677 target=FitSipApproximationTask,
678 )
680 def validate(self):
681 super().validate()
682 if self.wcs_consistency_threshold is not None and self.wcs_provider == "input_summary":
683 raise ValueError("Cannot check for WCS consistency if there is only one WCS to consider.")
684 if self.do_write_visit_geometry and not self.do_refit_pointing:
685 raise ValueError("Cannot write visit_geometry without refitting the pointing.")
688class UpdateVisitSummaryTask(PipelineTask):
689 """A pipeline task that creates a new visit-summary table after all
690 `lsst.afw.image.Exposure` components have been finalized.
692 Notes
693 -----
694 This task is designed to be run just prior to making warps for coaddition,
695 as it aggregates all inputs other than the images and backgrounds into a
696 single ``ExposureCatalog`` dataset and recomputes summary statistics that
697 are useful in selecting which images should go into a coadd. Its output
698 can also be used to reconstruct a final processed visit image when combined
699 with a post-ISR image, the background model, and the final mask.
700 """
702 # The `run` method of this task can conditionally apply overrides for PSFs
703 # and aperture corrections, but its `PipelineTask` interface always applies
704 # them. We can always add the config options to make them optional later,
705 # if that turns out to be useful.
707 _DefaultName = "updateVisitSummary"
708 ConfigClass = UpdateVisitSummaryConfig
710 compute_summary_stats: ComputeExposureSummaryStatsTask
712 def __init__(self, *, initInputs: dict[str, Any] | None = None, **kwargs: Any):
713 super().__init__(initInputs=initInputs, **kwargs)
714 self.makeSubtask("compute_summary_stats")
715 if initInputs is None or "input_summary_schema" not in initInputs:
716 raise RuntimeError("Task requires 'input_summary_schema' in initInputs.")
717 input_summary_schema = initInputs["input_summary_schema"].schema
718 self.schema_mapper = SchemaMapper(input_summary_schema)
719 self.schema_mapper.addMinimalSchema(input_summary_schema)
720 self.schema = self.schema_mapper.getOutputSchema()
721 if self.config.wcs_provider == "tract":
722 self.schema.addField("wcsTractId", type="L", doc="ID of the tract that provided the WCS.")
723 elif "healpix" in self.config.wcs_provider:
724 self.schema.addField("wcsSkypixId", type="L", doc="ID of the skypix pixel that provided the WCS.")
725 if self.config.wcs_provider != "input_summary":
726 self.schema.addField(
727 "wcs_corner_max_offset",
728 type="Angle",
729 doc=(
730 "Maximum on-sky difference between the input-summary WCS and the wcs_provider WCS "
731 "at detector corners."
732 ),
733 )
734 if self.config.photo_calib_provider == "tract":
735 self.schema.addField(
736 "photoCalibTractId",
737 type="L",
738 doc="ID of the tract that provided the PhotoCalib.",
739 )
740 if self.config.do_refit_pointing:
741 self.makeSubtask("refit_pointing", schema=self.schema)
742 self.output_summary_schema = ExposureCatalog(self.schema)
743 self._wcs_consistency_threshold = (
744 self.config.wcs_consistency_threshold * arcseconds
745 if self.config.wcs_consistency_threshold is not None
746 else None
747 )
748 if self.config.do_fit_sip_approximations:
749 self.makeSubtask("fit_sip_approximations")
750 self.schema.addField(
751 "wcs_sip_delta_sky",
752 type="Angle",
753 doc="Maximum pixel-to-sky difference between the WCS and its FITS TAN-SIP approximation.",
754 )
755 self.schema.addField(
756 "wcs_sip_delta_pixel",
757 type=np.float64,
758 units="pixel",
759 doc="Maximum sky-to-pixl difference between the WCS and its FITS TAN-SIP approximation.",
760 )
762 def runQuantum(
763 self,
764 butlerQC: QuantumContext,
765 inputRefs: InputQuantizedConnection,
766 outputRefs: OutputQuantizedConnection,
767 ) -> None:
768 # Docstring inherited.
769 sky_map = butlerQC.get(inputRefs.sky_map)
770 del inputRefs.sky_map
771 inputs = {}
772 # Collapse the wcs_override_ and photo_calib_override_ connection pairs
773 # into individual inputs (either ExposureCatalog or PerTractInput
774 # objects).
775 match self.config.wcs_provider:
776 case "tract":
777 inputs["wcs_overrides"] = PerTractInput.load(butlerQC, sky_map, inputRefs.wcs_overrides_tract)
778 del inputRefs.wcs_overrides_tract
779 case "global":
780 inputs["wcs_overrides"] = GlobalInput(butlerQC.get(inputRefs.wcs_overrides_global))
781 del inputRefs.wcs_overrides_global
782 case "input_summary":
783 inputs["wcs_overrides"] = None
784 case x if "healpix" in x:
785 # TODO: match healpix of arbitrary int?
786 inputs["wcs_overrides"] = PerHealpixInput.load(butlerQC, inputRefs.wcs_overrides_skypix)
787 del inputRefs.wcs_overrides_skypix
788 match self.config.photo_calib_provider:
789 case "tract":
790 inputs["photo_calib_overrides"] = PerTractInput.load(
791 butlerQC, sky_map, inputRefs.photo_calib_overrides_tract
792 )
793 del inputRefs.photo_calib_overrides_tract
794 case "global":
795 inputs["photo_calib_overrides"] = GlobalInput(
796 butlerQC.get(inputRefs.photo_calib_overrides_global)
797 )
798 del inputRefs.photo_calib_overrides_global
799 case "input_summary":
800 inputs["photo_calib_overrides"] = None
801 # Load or make DeferredDatasetHandles for everything else.
802 inputs.update(butlerQC.get(inputRefs))
803 deferred_dataset_types = ["input_exposures"]
804 # Handle whether to look for background originals and overrides at all.
805 match self.config.background_provider:
806 case "replacement":
807 deferred_dataset_types.append("background_originals")
808 deferred_dataset_types.append("background_overrides")
809 # Transform the lists of DeferredDatasetHandles for the multiple=True,
810 # deferLoad=True connections into mappings keyed by detector ID.
811 for name in deferred_dataset_types:
812 handles_list = inputs[name]
813 inputs[name] = {handle.dataId["detector"]: handle for handle in handles_list}
814 for record in inputs["input_summary_catalog"]:
815 detector_id = record.getId()
816 if detector_id not in inputs[name]:
817 raise InvalidQuantumError(
818 f"No {name!r} with detector {detector_id} for visit "
819 f"{butlerQC.quantum.dataId['visit']} even though this detector is present "
820 "in the input visit summary catalog. "
821 "This is most likely to occur when the QuantumGraph that includes this task "
822 "was incorrectly generated with an explicit or implicit (from datasets) tract "
823 "constraint."
824 )
825 # Actually run the task and write the results.
826 result = Struct()
827 try:
828 self.run(**inputs, result=result)
829 except AlgorithmError as e:
830 error = AnnotatedPartialOutputsError.annotate(
831 e, self, result.output_summary_catalog, log=self.log
832 )
833 butlerQC.put(result, outputRefs)
834 raise error from e
836 butlerQC.put(result, outputRefs)
838 def run(
839 self,
840 *,
841 result: Struct,
842 input_summary_catalog: ExposureCatalog,
843 input_exposures: Mapping[int, DeferredDatasetHandle],
844 psf_overrides: ExposureCatalog,
845 psf_star_catalog: astropy.table.Table,
846 ap_corr_overrides: ExposureCatalog,
847 camera: Camera | None = None,
848 photo_calib_overrides: PossiblyMultipleInput | None = None,
849 wcs_overrides: PossiblyMultipleInput | None = None,
850 background_originals: Mapping[int, DeferredDatasetHandle] | None = None,
851 background_overrides: Mapping[int, DeferredDatasetHandle] | None = None,
852 ):
853 """Build an updated version of a visit summary catalog.
855 Parameters
856 ----------
857 result : `lsst.pipe.base.Struct`
858 Output struct to modify in-place. On successful return, this will
859 have the following attributes:
861 - ``output_summary_catalog`` (`lsst.afw.table.ExposureCatalog`)
862 The output visit summary catalog.
863 - ``visit_geometry`` (`lsst.obs.base.visit_geometry.VisitGeometry`)
864 Updated visit[, detector] regions that can be used to update
865 butler dimension records.
867 input_summary_catalog : `lsst.afw.table.ExposureCatalog`
868 Input catalog. Each row in this catalog will be used to produce
869 a row in the output catalog. Any override parameter that is `None`
870 will leave the corresponding values unchanged from those in this
871 input catalog.
872 input_exposures : `collections.abc.Mapping` [`int`,
873 `lsst.daf.butler.DeferredDatasetHandle`]
874 Deferred-load objects that fetch `lsst.afw.image.Exposure`
875 instances. Only the image, mask, and variance are used; all other
876 components are assumed to be superceded by at least
877 ``input_summary_catalog`` and probably some ``_overrides``
878 arguments as well. This usually corresponds to the ``calexp``
879 dataset.
880 psf_overrides : `lsst.afw.table.ExposureCatalog`
881 Catalog with attached `lsst.afw.detection.Psf` objects that
882 supersede the input catalog's PSFs.
883 psf_star_catalog : `astropy.table.Table`
884 Table containing PSF stars for use in computing PSF summary
885 statistics.
886 ap_corr_overrides : `lsst.afw.table.ExposureCatalog`
887 Catalog with attached `lsst.afw.image.ApCorrMap` objects that
888 supersede the input catalog's aperture corrections.
889 camera : `lsst.afw.cameraGeom.Camera`, optional
890 Camera geometry.
891 photo_calib_overrides : `PossiblyMultipleInput`, optional
892 Catalog wrappers with attached `lsst.afw.image.PhotoCalib`
893 objects that supersede the input catalog's photometric
894 calibrations.
895 wcs_overrides : `PossiblyMultipleInput`, optional
896 Catalog wrappers with attached `lsst.afw.geom.SkyWcs` objects
897 that supersede the input catalog's astrometric calibrations.
898 background_originals : `collections.abc.Mapping` [`int`,
899 `lsst.daf.butler.DeferredDatasetHandle`], optional
900 Deferred-load objects that fetch `lsst.afw.math.BackgroundList`
901 instances. These should correspond to the background already
902 subtracted from ``input_exposures``. If not provided and
903 ``background_overrides`` is, it is assumed that the background in
904 ``input_exposures`` has not been subtracted. If provided, all keys
905 in ``background_overrides`` must also be present in
906 ``background_originals``.
907 background_overrides : `collections.abc.Mapping` [`int`,
908 `lsst.daf.butler.DeferredDatasetHandle`], optional
909 Deferred-load objects that fetch `lsst.afw.math.BackgroundList`
910 instances. These should correspond to the background that should
911 now be subtracted from``input_exposures`` to yield the final
912 background-subtracted image.
914 Returns
915 -------
916 result : `lsst.pipe.base.Struct`
917 The same output struct passed in.
919 Notes
920 -----
921 If any override parameter is provided but does not have a value for a
922 particular detector, that component will be set to `None` in the
923 returned catalog for that detector and all summary statistics derived
924 from that component will be reset (usually to ``NaN``) as well. Not
925 passing an override parameter at all will instead pass through the
926 original component and values from the input catalog unchanged.
927 """
928 result.output_summary_catalog = ExposureCatalog(self.schema)
929 result.output_summary_catalog.setMetadata(input_summary_catalog.getMetadata())
930 for input_record in input_summary_catalog:
931 detector_id = input_record.getId()
932 output_record = result.output_summary_catalog.addNew()
934 # Make a new ExposureSummaryStats from the input record. These
935 # might be full of NaNs if the summary stats were not computed
936 # originally because initial astrometry or photometry failed, but
937 # if that's possible there should be a flag for it.
938 summary_stats = ExposureSummaryStats.from_record(input_record)
940 # Also copy the input record values to output record; this copies
941 # many of the same values just copied into `summary_stats` (which
942 # will be overridden later by summary_stats.update_record), but it
943 # also copies fields that aren't part of summary_stats, including
944 # the actual components like Psf, Wcs, etc.
945 output_record.assign(input_record, self.schema_mapper)
947 exposure = input_exposures[detector_id].get()
948 bbox = exposure.getBBox()
950 if wcs_overrides:
951 wcs_region, wcs_record = wcs_overrides.best_for_detector(detector_id, bbox=bbox)
952 if wcs_record is not None:
953 wcs = wcs_record.getWcs()
954 else:
955 wcs = None
956 if self.config.wcs_provider == "tract":
957 output_record["wcsTractId"] = wcs_region
958 elif "healpix" in self.config.wcs_provider:
959 output_record["wcsSkypixId"] = wcs_region
960 wcs = self._finish_wcs_assignment(input_record, output_record, wcs)
961 self.compute_summary_stats.update_wcs_stats(
962 summary_stats, wcs, bbox, output_record.getVisitInfo()
963 )
964 else:
965 wcs = input_record.getWcs()
967 sources = None
969 if (psf_record := psf_overrides.find(detector_id)) is not None:
970 psf = psf_record.getPsf()
971 else:
972 psf = None
973 output_record.setPsf(psf)
974 sources = psf_star_catalog[psf_star_catalog["detector"] == detector_id]
975 if len(sources) == 0:
976 sources = None
978 if (ap_corr_record := ap_corr_overrides.find(detector_id)) is not None:
979 ap_corr = ap_corr_record.getApCorrMap()
980 else:
981 ap_corr = None
982 output_record.setApCorrMap(ap_corr)
983 self.compute_summary_stats.update_psf_stats(
984 summary_stats,
985 psf,
986 bbox,
987 sources,
988 image_mask=exposure.mask,
989 image_ap_corr_map=ap_corr,
990 sources_is_astropy=True,
991 )
993 if photo_calib_overrides:
994 center = compute_center_for_detector_record(output_record, bbox, wcs)
995 (
996 photo_calib_tract,
997 photo_calib_record,
998 ) = photo_calib_overrides.best_for_detector(detector_id, center=center)
999 if photo_calib_record is not None:
1000 photo_calib = photo_calib_record.getPhotoCalib()
1001 else:
1002 photo_calib = None
1003 if self.config.photo_calib_provider == "tract":
1004 output_record["photoCalibTractId"] = photo_calib_tract
1005 output_record.setPhotoCalib(photo_calib)
1006 self.compute_summary_stats.update_photo_calib_stats(summary_stats, photo_calib)
1008 if background_overrides is not None:
1009 if (handle := background_overrides.get(detector_id)) is not None:
1010 new_bkg = handle.get()
1011 if background_originals is not None:
1012 orig_bkg = background_originals[detector_id].get()
1013 else:
1014 orig_bkg = BackgroundList()
1016 full_bkg = orig_bkg.clone()
1017 for layer in new_bkg:
1018 full_bkg.append(layer)
1019 exposure.image -= new_bkg.getImage()
1020 self.compute_summary_stats.update_background_stats(summary_stats, full_bkg)
1021 self.compute_summary_stats.update_masked_image_stats(
1022 summary_stats, exposure.getMaskedImage()
1023 )
1025 # Update the estimated depth and effective exposure time.
1026 self.compute_summary_stats.update_magnitude_limit_stats(summary_stats, exposure)
1027 self.compute_summary_stats.update_effective_time_stats(summary_stats, exposure)
1029 summary_stats.update_record(output_record)
1030 del exposure
1032 if self.config.do_refit_pointing:
1033 refit_pointing_result = self.refit_pointing.run(
1034 catalog=result.output_summary_catalog,
1035 camera=camera,
1036 )
1037 result.visit_geometry = refit_pointing_result.regions
1039 if self.config.do_fit_sip_approximations:
1040 for output_record in result.output_summary_catalog:
1041 wcs = output_record.getWcs()
1042 if wcs is None:
1043 continue
1044 sip = self.fit_sip_approximations.run(wcs=wcs, bbox=output_record.getBBox())
1045 output_record.setWcs(sip.wcs)
1046 output_record["wcs_sip_delta_sky"] = sip.delta_sky
1047 output_record["wcs_sip_delta_pixel"] = sip.delta_pixel
1049 return result
1051 def _finish_wcs_assignment(
1052 self, input_record: ExposureRecord, output_record: ExposureRecord, updated: SkyWcs
1053 ) -> SkyWcs | None:
1054 preliminary = input_record.getWcs()
1055 if preliminary is None:
1056 # We can't compare to preliminary, so shortcut early.
1057 if self._wcs_consistency_threshold is not None:
1058 # We can't check consistency, so don't trust the updated WCS.
1059 output_record.setWcs(None)
1060 return None
1061 else:
1062 output_record.setWcs(updated)
1063 return updated
1064 if updated is None:
1065 # At present we assume all downstream code is "updated WCS or
1066 # bust", because we know some code (e.g. coaddition) is, and we
1067 # don't have a good way to mark a WCS as fallback preliminary
1068 # downstream.
1069 output_record.setWcs(None)
1070 return None
1071 pixel_corners = Box2D(input_record.getBBox()).getCorners()
1072 preliminary_sky_corners = preliminary.pixelToSky(pixel_corners)
1073 updated_sky_corners = updated.pixelToSky(pixel_corners)
1074 max_offset = max(a.separation(b) for a, b in zip(preliminary_sky_corners, updated_sky_corners))
1075 output_record["wcs_corner_max_offset"] = max_offset
1076 if self._wcs_consistency_threshold is not None and max_offset > self._wcs_consistency_threshold:
1077 output_record.setWcs(None)
1078 return None
1079 output_record.setWcs(updated)
1080 return updated