Coverage for python / lsst / cell_coadds / _coadd_ap_corr_map.py: 23%
78 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:41 +0000
1# This file is part of cell_coadds.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "CoaddApCorrMapStacker",
26 "EMPTY_AP_CORR_MAP",
27)
30from collections.abc import Iterable
32import numpy as np
33from frozendict import frozendict
35from lsst.afw.image import ApCorrMap
36from lsst.afw.math import BoundedField
37from lsst.geom import Point2D
39from .typing_helpers import SingleCellCoaddApCorrMap
41EMPTY_AP_CORR_MAP: SingleCellCoaddApCorrMap = frozendict()
42"""Default empty aperture correction map for a single cell coadd."""
45class CoaddApCorrMapStacker:
46 """Online aperture correction map for a cell-based coadd.
48 This class is responsible for implementing the logic to coadd the
49 aperture correction values and their uncertainties.
51 Parameters
52 ----------
53 evaluation_point : `~lsst.geom.Point2D`
54 The point at which the input aperture correction is evaluated.
55 do_coadd_inverse_ap_corr : `bool`, optional
56 If True, the inverse aperture correction is applied to the coadd.
58 Notes
59 -----
60 At least one of class variables are set dynamically the first time the
61 ``add`` method is called on any instance of this class. This behavior
62 is based on the practical assumption that all ``ApCorrMap`` instances will
63 have the same set of field names during the entire processing. A schema
64 is therefore not expected at the time of initialization.
65 """
67 def __init__(self, evaluation_point: Point2D, do_coadd_inverse_ap_corr: bool = True) -> None:
68 # Initialize frozen attributes.
69 self._evaluation_point = evaluation_point
70 self._do_coadd_inverse_ap_corr = do_coadd_inverse_ap_corr
72 # Initialize mutable attributes.
73 self._total_weight = 0.0
74 self._intermediate_ap_corr_map: dict[str, float] = {}
75 self._ap_corr_names: Iterable[str] = ()
76 # An iterable of algorithm names that have aperture correction values.
77 # This is set when the first time the add method is called on any
78 # instance.
80 def _setup_ap_corr_names(self, ap_corr_map: ApCorrMap) -> None:
81 """Set up the aperture correction name set.
83 Parameters
84 ----------
85 ap_corr_map : `~lsst.meas.base.ApCorrMap`
86 The aperture correction map to add.
88 Raises
89 ------
90 RuntimeError
91 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or
92 "_instFluxErr".
93 """
94 ap_corr_name_set = set()
95 for field_name in ap_corr_map:
96 algorithm_name, suffix = field_name.split("_instFlux")
97 if suffix not in ("", "Err"):
98 raise RuntimeError(f"Invalid field name {field_name} in aperture correction map.")
100 ap_corr_name_set.add(algorithm_name)
102 self._ap_corr_names = tuple(sorted(ap_corr_name_set))
104 @property
105 def evaluation_point(self) -> Point2D:
106 """The point at which the aperture correction is evaluated."""
107 return self._evaluation_point
109 @property
110 def do_coadd_inverse_ap_corr(self) -> bool:
111 """If True, the inverse aperture correction is applied to the coadd."""
112 return self._do_coadd_inverse_ap_corr
114 @property
115 def ap_corr_names(self) -> Iterable[str]:
116 """Iterable of algorithm names that have aperture correction values."""
117 return self._ap_corr_names
119 @property
120 def total_weight(self) -> float:
121 """The total weight of the aperture correction map."""
122 return self._total_weight
124 def add(self, ap_corr_map: ApCorrMap, weight: float) -> None:
125 """Add an aperture correction map to the coadd.
127 Parameters
128 ----------
129 ap_corr_map : `~lsst.meas.base.ApCorrMap`
130 The aperture correction map to add.
131 weight : `float`
132 The weight to apply to the aperture correction map.
134 Raises
135 ------
136 RuntimeError
137 Raised if the keys in `ap_corr_map` do not end in "_instFlux" or
138 "_instFluxErr".
139 ValueError
140 Raised if the aperture correction value or its error is missing.
141 """
142 if not self.ap_corr_names:
143 # Lazily initialize the aperture correction name set.
144 self._setup_ap_corr_names(ap_corr_map)
146 if not self._intermediate_ap_corr_map:
147 self._intermediate_ap_corr_map = dict.fromkeys(
148 [f"{algorithm_name}_instFlux" for algorithm_name in self.ap_corr_names]
149 + [f"{algorithm_name}_instFluxErr" for algorithm_name in self.ap_corr_names],
150 0.0,
151 )
153 # Accumulate the aperture correction values in a temporary dict.
154 # This is so that if we error out in the middle, we don't leave the
155 # aperture correction map in an inconsistent state.
156 temp_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0)
158 for algorithm_name in self.ap_corr_names:
159 # Accumulate the aperture correction values.
160 ap_corr_field: BoundedField | None
161 if (ap_corr_field := ap_corr_map.get(f"{algorithm_name}_instFlux")) is None:
162 ap_corr_value = np.nan
163 else:
164 ap_corr_value = ap_corr_field.evaluate(self.evaluation_point)
166 # Calculate the term to accumulate depending on the boolean.
167 if self.do_coadd_inverse_ap_corr:
168 if ap_corr_value == 0:
169 raise ValueError("This should not have happened. ap_corr_value is zero.")
170 else:
171 term = weight / ap_corr_value
172 else:
173 term = weight * ap_corr_value
175 temp_ap_corr_map[f"{algorithm_name}_instFlux"] = term
177 # Accumulate the aperture correction error values.
178 ap_corr_err_field: BoundedField | None
179 if (ap_corr_err_field := ap_corr_map.get(f"{algorithm_name}_instFluxErr")) is None:
180 ap_corr_err_value = np.nan
181 else:
182 ap_corr_err_value = ap_corr_err_field.evaluate(self.evaluation_point)
184 # Calculate the term to accumulate depending on the boolean.
185 if self.do_coadd_inverse_ap_corr:
186 term = (weight * ap_corr_err_value) ** 2 / ap_corr_value**4
187 else:
188 term = (weight * ap_corr_err_value) ** 2
190 temp_ap_corr_map[f"{algorithm_name}_instFluxErr"] = term
192 # Update the intermediate aperture correction map.
193 for key in self._intermediate_ap_corr_map:
194 self._intermediate_ap_corr_map[key] += temp_ap_corr_map[key]
196 # Add the weight to the total weight.
197 self._total_weight += weight
199 @property
200 def final_ap_corr_map(self) -> SingleCellCoaddApCorrMap:
201 """Final coadded aperture correction map.
203 This should be called after all aperture correction maps have been
204 added.
206 Raises
207 ------
208 RuntimeError
209 Raised if the total weight is zero.
210 """
211 if self.total_weight == 0 or not self.ap_corr_names:
212 raise RuntimeError("Cannot get an empty aperture correction map.")
214 final_ap_corr_map = dict.fromkeys(self._intermediate_ap_corr_map, 0.0)
216 # The transformation equation is different for the aperture correction
217 # values and their uncertainties and it also depends on whether we
218 # accumulate the aperture corrections or their inverse.
220 if self.do_coadd_inverse_ap_corr:
221 for algorithm_name in self.ap_corr_names:
222 if (
223 inverse_ap_corr_value := self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"]
224 ) > 0:
225 final_ap_corr_map[f"{algorithm_name}_instFlux"] = (
226 self.total_weight / inverse_ap_corr_value
227 )
228 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = (
229 final_ap_corr_map[f"{algorithm_name}_instFlux"] ** 2
230 * np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"])
231 / self.total_weight
232 )
233 else:
234 for algorithm_name in self.ap_corr_names:
235 final_ap_corr_map[f"{algorithm_name}_instFlux"] = (
236 self._intermediate_ap_corr_map[f"{algorithm_name}_instFlux"] / self.total_weight
237 )
238 final_ap_corr_map[f"{algorithm_name}_instFluxErr"] = (
239 np.sqrt(self._intermediate_ap_corr_map[f"{algorithm_name}_instFluxErr"])
240 / self.total_weight
241 )
243 # Return the finalized (immutable) aperture correction map.
244 return frozendict(final_ap_corr_map)