Coverage for python / lsst / meas / base / compensatedGaussian / _compensatedTophat.py: 23%
90 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:01 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:01 +0000
1# This file is part of meas_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = (
25 "SingleFrameCompensatedTophatFluxConfig",
26 "SingleFrameCompensatedTophatFluxPlugin",
27)
29import numpy as np
30import math
32from lsst.pex.config import RangeField, ListField
33from lsst.geom import Point2I
34import lsst.afw.geom
36from ..sfm import SingleFramePlugin, SingleFramePluginConfig
37from ..pluginRegistry import register
39from .._measBaseLib import ApertureFluxAlgorithm, FlagHandler, FlagDefinitionList
42class SingleFrameCompensatedTophatFluxConfig(SingleFramePluginConfig):
43 apertures = ListField(
44 doc="The aperture radii (in pixels) to measure the top-hats.",
45 dtype=int,
46 minLength=1,
47 default=[12, ],
48 )
49 inner_scale = RangeField(
50 doc="Inner background annulus scale (relative to aperture).",
51 dtype=float,
52 default=1.0,
53 min=1.0,
54 )
55 outer_scale = RangeField(
56 doc="Outer background annulus scale (relative to aperture).",
57 dtype=float,
58 default=1.7,
59 min=1.0,
60 )
62 def validate(self):
63 super().validate()
65 if not (self.outer_scale > self.inner_scale):
66 raise ValueError("The outer_scale must be greater than the inner_scale")
69@register("base_CompensatedTophatFlux")
70class SingleFrameCompensatedTophatFluxPlugin(SingleFramePlugin):
71 ConfigClass = SingleFrameCompensatedTophatFluxConfig
73 @classmethod
74 def getExecutionOrder(cls):
75 return cls.FLUX_ORDER
77 def __init__(
78 self,
79 config: SingleFrameCompensatedTophatFluxConfig,
80 name: str,
81 schema,
82 metadata,
83 logName=None,
84 **kwds,
85 ):
86 super().__init__(config, name, schema, metadata, logName, **kwds)
88 flagDefs = FlagDefinitionList()
90 self.aperture_keys = {}
91 self._rads = {}
92 self._inner_scale = config.inner_scale
93 self._outer_scale = config.outer_scale
94 for aperture in config.apertures:
95 base_key = f"{name}_{aperture}"
97 # flux
98 flux_str = f"{base_key}_instFlux"
99 flux_key = schema.addField(
100 flux_str,
101 type="D",
102 doc="Compensated Tophat flux measurement.",
103 units="count",
104 )
106 # flux error
107 err_str = f"{base_key}_instFluxErr"
108 err_key = schema.addField(
109 err_str,
110 type="D",
111 doc="Compensated Tophat flux error.",
112 units="count",
113 )
115 # mask bits
116 mask_str = f"{base_key}_mask_bits"
117 mask_key = schema.addField(mask_str, type=np.int32, doc="Mask bits set within aperture.")
119 # failure flags
120 failure_flag = flagDefs.add(f"{aperture}_flag", "Compensated Tophat measurement failed")
121 oob_flag = flagDefs.add(f"{aperture}_flag_bounds", "Compensated Tophat out-of-bounds")
123 self.aperture_keys[aperture] = (flux_key, err_key, mask_key, failure_flag, oob_flag)
124 self._rads[aperture] = int(math.ceil(self._outer_scale*aperture))
126 self.flagHandler = FlagHandler.addFields(schema, name, flagDefs)
127 self._max_rad = max(self._rads)
129 def fail(self, measRecord, error=None):
130 if error is None:
131 self.flagHandler.handleFailure(measRecord)
132 else:
133 self.flagHandler.handleFailure(measRecord, error.cpp)
135 def measure(self, measRecord, exposure):
136 center = measRecord.getCentroid()
137 bbox = exposure.getBBox()
139 y = center.getY() - bbox.beginY
140 x = center.getX() - bbox.beginX
142 y_floor = math.floor(y)
143 x_floor = math.floor(x)
145 ctrl = ApertureFluxAlgorithm.Control()
147 for aperture, (flux_key, err_key, mask_key, failure_flag, oob_flag) in self.aperture_keys.items():
148 rad = self._rads[aperture]
150 # This will fail if even a single pixel is outside the bounding
151 # box.
152 if Point2I(center) not in exposure.getBBox().erodedBy(rad):
153 self.flagHandler.setValue(measRecord, failure_flag.number, True)
154 self.flagHandler.setValue(measRecord, oob_flag.number, True)
155 continue
157 # We confirmed that the bounding box is sufficient to hold these
158 # slices, so no additional range checking is needed.
159 y_slice = slice(y_floor - rad, y_floor + rad + 1, 1)
160 x_slice = slice(x_floor - rad, x_floor + rad + 1, 1)
162 # We will need the mask below, we can use this to test bounds as
163 # well.
164 sub_mask = exposure.mask.array[y_slice, x_slice]
166 if sub_mask.size == 0 or sub_mask.shape[0] != sub_mask.shape[1] or (sub_mask.shape[0] % 2) == 0:
167 self.flagHandler.setValue(measRecord, failure_flag.number, True)
168 self.flagHandler.setValue(measRecord, oob_flag.number, True)
169 continue
171 # Compute three aperture fluxes.
172 ellipse = lsst.afw.geom.Ellipse(lsst.afw.geom.ellipses.Axes(float(aperture),
173 float(aperture), 0.0),
174 center)
175 tophat = ApertureFluxAlgorithm.computeFlux(exposure.maskedImage, ellipse, ctrl)
176 ellipse.grow((self._inner_scale - 1.0)*aperture)
177 inner = ApertureFluxAlgorithm.computeFlux(exposure.maskedImage, ellipse, ctrl)
178 ellipse.grow((self._outer_scale - self._inner_scale)*aperture)
179 outer = ApertureFluxAlgorithm.computeFlux(exposure.maskedImage, ellipse, ctrl)
181 # We have flux in 3 circular apertures, a_0, a_1, a_2 with
182 # associated variances \sigma_{a_0}^2, \sigma_{a_1}^2,
183 # \sigma_{a_2)^2.
184 # We transform these to annular fluxes:
185 # b_0 = a_0
186 # \sigma_{b_0}^2 = \sigma_{a_0}^2
187 # b_1 = a_1 - a_0
188 # \sigma_{b_1}^2 = \sigma_{a_1}^2 - \sigma_{a_0}^2
189 # b_2 = a_2 - a_1
190 # \sigma_{b_2}^2 = \sigma_{a_2}^2 - \sigma_{a_1}^2
191 # Generally, the flux is then a weighted combination:
192 # f = s_0*b_0 + s_1*b_1 + s_2*b_2
193 # \sigma_f^2 = s_0^2*\sigma_{b_0}^2 + s_1^2*\sigma_{b_1}^2
194 # + s_2^2*\sigma_{b_2}^2
195 # The inner aperture we use as-is, so s_0 = 1.0
196 # We do not need the middle annulus, so s_1 = 0.0
197 # The outer annulus is scaled by s_2 = -area_0 / (area_2 - area_1)
199 a_0 = tophat.instFlux
200 var_a_0 = tophat.instFluxErr*tophat.instFluxErr
201 a_1 = inner.instFlux
202 var_a_1 = inner.instFluxErr*inner.instFluxErr
203 a_2 = outer.instFlux
204 var_a_2 = outer.instFluxErr*outer.instFluxErr
206 b_2 = a_2 - a_1
207 var_b_2 = var_a_2 - var_a_1
208 s_2 = 1.0/(self._outer_scale**2. - self._inner_scale**2.)
210 flux = a_0 - s_2*b_2
211 err = np.sqrt(var_a_0 + s_2*s_2*var_b_2)
213 measRecord.set(flux_key, flux)
214 measRecord.set(err_key, err)
215 measRecord.set(mask_key, np.bitwise_or.reduce(sub_mask, axis=None))