Coverage for python / lsst / cp / pipe / cpLinearityNormalize.py: 15%
114 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 08:52 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21#
22import esutil
23import numpy as np
24from astropy.table import Table
25import warnings
27import lsst.pipe.base as pipeBase
28import lsst.pex.config as pexConfig
30__all__ = [
31 "LinearityNormalizeConfig",
32 "LinearityNormalizeTask",
33]
36class LinearityNormalizeConnnections(
37 pipeBase.PipelineTaskConnections,
38 dimensions=("instrument",),
39):
40 inputPtcHandles = pipeBase.connectionTypes.Input(
41 name="linearizerPtc",
42 doc="Input covariances.",
43 storageClass="PhotonTransferCurveDataset",
44 dimensions=("instrument", "detector"),
45 isCalibration=True,
46 multiple=True,
47 deferLoad=True,
48 )
50 inputLinearizerHandles = pipeBase.connectionTypes.Input(
51 name="linearizerUnnormalized",
52 doc="Unnormalized linearizers.",
53 storageClass="Linearizer",
54 dimensions=("instrument", "detector"),
55 isCalibration=True,
56 multiple=True,
57 deferLoad=True,
58 )
60 camera = pipeBase.connectionTypes.PrerequisiteInput(
61 name="camera",
62 doc="Camera the input data comes from.",
63 storageClass="Camera",
64 dimensions=("instrument",),
65 isCalibration=True,
66 )
68 outputNormalization = pipeBase.connectionTypes.Output(
69 name="cpLinearizerNormalization",
70 doc="Normalization table for PTC.",
71 storageClass="ArrowAstropy",
72 dimensions=("instrument",),
73 isCalibration=True,
74 )
76 def adjustQuantum(self, inputs, outputs, label, dataId):
77 ptcRefs = []
78 foundRefDetector = False
79 for ref in inputs["inputPtcHandles"][1]:
80 if ref.dataId["detector"] in self.config.normalizeDetectors:
81 ptcRefs.append(ref)
82 if ref.dataId["detector"] == self.config.referenceDetector:
83 foundRefDetector = True
85 if len(ptcRefs) == 0:
86 raise pipeBase.NoWorkFound("No input PTCs match the normalization detectors.")
87 if not foundRefDetector:
88 raise pipeBase.NoWorkFound(
89 "LinearityNormalize reference detector not in list of PTC inputs.",
90 )
92 linearizerRefs = []
93 foundRefDetector = False
94 for ref in inputs["inputLinearizerHandles"][1]:
95 if ref.dataId["detector"] in self.config.normalizeDetectors:
96 linearizerRefs.append(ref)
97 if ref.dataId["detector"] == self.config.referenceDetector:
98 foundRefDetector = True
100 if len(linearizerRefs) == 0:
101 raise pipeBase.NoWorkFound("No input linearizers match the normalization detectors.")
102 if not foundRefDetector:
103 raise pipeBase.NoWorkFound(
104 "LinearityNormalize reference detector not in list of linearizer inputs.",
105 )
107 inputs["inputPtcHandles"] = (inputs["inputPtcHandles"][0], tuple(ptcRefs))
108 inputs["inputLinearizerHandles"] = (inputs["inputLinearizerHandles"][0], tuple(linearizerRefs))
110 return inputs, outputs
113class LinearityNormalizeConfig(
114 pipeBase.PipelineTaskConfig,
115 pipelineConnections=LinearityNormalizeConnnections,
116):
117 normalizeDetectors = pexConfig.ListField(
118 dtype=int,
119 doc="List of detector numbers to use for normalization.",
120 default=None,
121 optional=False,
122 )
123 referenceDetector = pexConfig.Field(
124 dtype=int,
125 doc="Detector to use as an overall reference for sorting/labeling "
126 "exposures. Must be in list of normalizeDetectors.",
127 default=None,
128 optional=False,
129 )
130 minValidFraction = pexConfig.Field(
131 dtype=float,
132 doc="Minimum fraction of normalization amplifiers that must have valid "
133 "residuals in order to be used to create a normalization value.",
134 default=0.5,
135 )
136 doNormalizeAbsoluteLinearizer = pexConfig.Field(
137 dtype=bool,
138 doc="Do the normalization with an ``absolute`` linearizer, which implies "
139 "both single-image (vs pairs) and only using the absolute reference "
140 "amplifier for normalization.",
141 default=True,
142 )
144 def validate(self):
145 super().validate()
146 if self.referenceDetector not in self.normalizeDetectors:
147 raise ValueError("The selected referenceDetector must be in the list of normalizeDetectors.")
150class LinearityNormalizeTask(pipeBase.PipelineTask):
151 """Class to use data to normalize linearity inputs."""
153 ConfigClass = LinearityNormalizeConfig
154 _DefaultName = "cpPtcNormalize"
156 def run(self, *, camera, inputPtcHandles, inputLinearizerHandles):
157 """Compute the focal-plane normalization.
159 Parameters
160 ----------
161 camera : `lsst.afw.cameraGeom.Camera`
162 Input camera.
163 inputPtcHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`]
164 Handles for input PTCs to do normalization.
165 inputLinearizerHandles :
166 `list` [`lsst.daf.butler.DeferredDatasetHandle`]
167 handles for input linearizers to do normalization.
169 Returns
170 -------
171 results : `lsst.pipe.base.Struct`
172 The output results structure contains:
174 ``outputNormalization``
175 Normalization table, `astropy.table.Table`.
176 """
177 ptcHandleDict = {handle.dataId["detector"]: handle for handle in inputPtcHandles}
178 linHandleDict = {handle.dataId["detector"]: handle for handle in inputLinearizerHandles}
180 ptcReferenceHandle = ptcHandleDict.get(self.config.referenceDetector, None)
181 linReferenceHandle = linHandleDict.get(self.config.referenceDetector, None)
183 if ptcReferenceHandle is None or linReferenceHandle is None:
184 raise RuntimeError("Reference detector not found in input PTCs or Linearizers.")
186 referencePtc = ptcReferenceHandle.get()
187 referenceLinearizer = linReferenceHandle.get()
189 # Get the exposure numbers and exposure times from the reference PTC.
190 # These will be matched for all amps.
191 # Note that all amps in a ptc must have the same input exposures.
193 if self.config.doNormalizeAbsoluteLinearizer:
194 nAmp = 1
195 refReferenceAmp = referenceLinearizer.absoluteReferenceAmplifier
196 ampNames = [""]
197 else:
198 nAmp = len(referencePtc.ampNames)
199 refReferenceAmp = referencePtc.ampNames[0]
200 ampNames = referencePtc.ampNames
202 if self.config.doNormalizeAbsoluteLinearizer:
203 exposures = np.asarray(referencePtc.inputExpIdPairs[refReferenceAmp]).ravel()
204 exptimes = np.repeat(referencePtc.rawExpTimes[refReferenceAmp], 2)
205 else:
206 exposures = np.asarray(referencePtc.inputExpIdPairs[refReferenceAmp])[:, 0]
207 exptimes = referencePtc.rawExpTimes[refReferenceAmp]
209 # Get the input normalization values from the reference linearizer.
210 # These will be matched for all amps.
211 inputNormalization = referenceLinearizer.inputNormalization[refReferenceAmp]
213 # The rawMeans and fitResiduals arrays will hold all the exposures
214 # and amps for the detectors that go into the global normalization.
215 rawMeans = np.zeros((len(self.config.normalizeDetectors), len(exposures), nAmp))
216 models = np.zeros_like(rawMeans)
217 fitResiduals = np.zeros_like(rawMeans)
218 rawMeans[:, :, :] = np.nan
219 models[:, :, :] = np.nan
220 fitResiduals[:, :, :] = np.nan
222 for i, detector in enumerate(self.config.normalizeDetectors):
223 ptcHandle = ptcHandleDict.get(detector, None)
224 linHandle = linHandleDict.get(detector, None)
226 if ptcHandle is None or linHandle is None:
227 self.log.warning(
228 f"Detector {detector} configured for normalization, but not found in inputs.",
229 )
231 ptc = ptcHandle.get()
232 lin = linHandle.get()
234 if self.config.doNormalizeAbsoluteLinearizer:
235 ptcExposures = np.asarray(ptc.inputExpIdPairs[lin.absoluteReferenceAmplifier]).ravel()
236 else:
237 ptcExposures = np.asarray(ptc.inputExpIdPairs[refReferenceAmp])[:, 0]
239 if len(ptcExposures) != len(exposures):
240 self.log.warning(
241 "PTC for detector %d has %d pairs, fewer than expected %d.",
242 ptcHandle.dataId["detector"],
243 len(ptcExposures),
244 len(exposures),
245 )
247 a, b = esutil.numpy_util.match(exposures, ptcExposures)
248 if len(a) == 0:
249 self.log.warning(
250 "PTC for detector %d has no exposure matches to the reference!",
251 ptcHandle.dataId["detector"],
252 )
253 continue
255 for j in range(len(ampNames)):
256 # If we are using the absolute linearizer, we need to use the
257 # specific reference amplifier for each detector.
258 if self.config.doNormalizeAbsoluteLinearizer:
259 ampName = lin.absoluteReferenceAmplifier
260 else:
261 ampName = ampNames[j]
263 rawMeans[i, a, j] = lin.inputOrdinate[ampName][b]
264 models[i, a, j] = lin.fitResidualsModel[ampName][b]
265 fitResiduals[i, a, j] = (
266 lin.fitResiduals[ampName][b] / lin.fitResidualsModel[ampName][b]
267 )
269 # Compute the median levels for the normalization detectors.
270 medianRawMeans = np.nanmedian(rawMeans, axis=[0, 2])
271 medianModel = np.nanmedian(models, axis=[0, 2])
272 with warnings.catch_warnings():
273 warnings.simplefilter("ignore")
274 medianResiduals = np.nanmedian(fitResiduals, axis=[0, 2])
275 nValid = np.sum(np.isfinite(fitResiduals), axis=0).sum(axis=1)
277 # Only use a normalization value for exposures that have a
278 # sufficient number of valid amplifiers
279 nNormalizeAmps = fitResiduals.shape[0] * fitResiduals.shape[2]
280 bad = ((nValid / nNormalizeAmps) < self.config.minValidFraction) | ~np.isfinite(medianResiduals)
281 medianResiduals[bad] = 0.0
283 # The residual is computed as:
284 # <r> = (y - m*pe)/y (1)
285 # where <r> is the median residual, y is the model linearized
286 # quantity, and m is the slope of the fit of the linearized
287 # counts vs the input photocharge or exposure time (pe).
288 #
289 # We are looking for a normalization factor k which when
290 # applied to the photocharge or exposure time (pe) gives
291 # zero residual, so we also have the constraint:
292 # 0 = (y - m*pe*k)/y (2)
293 #
294 # Substituting y = m*pe*k (2) into equation (1) and
295 # solving for k we find the slope cancels out and we get:
296 # k = 1 / (1 - <r>)
298 # The output normalization is applied on top of the input
299 # normalization (which may be all 1s for the first iteration).
300 outputNormalization = inputNormalization / (1.0 - medianResiduals)
302 # These are the per-exposure normalization values.
303 table = Table(
304 {
305 "exposure": exposures,
306 "exptime": exptimes,
307 "mean": medianRawMeans,
308 "model": medianModel,
309 "normalization": outputNormalization,
310 },
311 )
313 result = pipeBase.Struct(outputNormalization=table)
315 return result