Coverage for python / lsst / cp / pipe / cpLinearityNormalize.py: 15%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-25 08:53 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21# 

22import esutil 

23import numpy as np 

24from astropy.table import Table 

25import warnings 

26 

27import lsst.pipe.base as pipeBase 

28import lsst.pex.config as pexConfig 

29 

30__all__ = [ 

31 "LinearityNormalizeConfig", 

32 "LinearityNormalizeTask", 

33] 

34 

35 

36class LinearityNormalizeConnnections( 

37 pipeBase.PipelineTaskConnections, 

38 dimensions=("instrument",), 

39): 

40 inputPtcHandles = pipeBase.connectionTypes.Input( 

41 name="linearizerPtc", 

42 doc="Input covariances.", 

43 storageClass="PhotonTransferCurveDataset", 

44 dimensions=("instrument", "detector"), 

45 isCalibration=True, 

46 multiple=True, 

47 deferLoad=True, 

48 ) 

49 

50 inputLinearizerHandles = pipeBase.connectionTypes.Input( 

51 name="linearizerUnnormalized", 

52 doc="Unnormalized linearizers.", 

53 storageClass="Linearizer", 

54 dimensions=("instrument", "detector"), 

55 isCalibration=True, 

56 multiple=True, 

57 deferLoad=True, 

58 ) 

59 

60 camera = pipeBase.connectionTypes.PrerequisiteInput( 

61 name="camera", 

62 doc="Camera the input data comes from.", 

63 storageClass="Camera", 

64 dimensions=("instrument",), 

65 isCalibration=True, 

66 ) 

67 

68 outputNormalization = pipeBase.connectionTypes.Output( 

69 name="cpLinearizerNormalization", 

70 doc="Normalization table for PTC.", 

71 storageClass="ArrowAstropy", 

72 dimensions=("instrument",), 

73 isCalibration=True, 

74 ) 

75 

76 def adjustQuantum(self, inputs, outputs, label, dataId): 

77 ptcRefs = [] 

78 foundRefDetector = False 

79 for ref in inputs["inputPtcHandles"][1]: 

80 if ref.dataId["detector"] in self.config.normalizeDetectors: 

81 ptcRefs.append(ref) 

82 if ref.dataId["detector"] == self.config.referenceDetector: 

83 foundRefDetector = True 

84 

85 if len(ptcRefs) == 0: 

86 raise pipeBase.NoWorkFound("No input PTCs match the normalization detectors.") 

87 if not foundRefDetector: 

88 raise pipeBase.NoWorkFound( 

89 "LinearityNormalize reference detector not in list of PTC inputs.", 

90 ) 

91 

92 linearizerRefs = [] 

93 foundRefDetector = False 

94 for ref in inputs["inputLinearizerHandles"][1]: 

95 if ref.dataId["detector"] in self.config.normalizeDetectors: 

96 linearizerRefs.append(ref) 

97 if ref.dataId["detector"] == self.config.referenceDetector: 

98 foundRefDetector = True 

99 

100 if len(linearizerRefs) == 0: 

101 raise pipeBase.NoWorkFound("No input linearizers match the normalization detectors.") 

102 if not foundRefDetector: 

103 raise pipeBase.NoWorkFound( 

104 "LinearityNormalize reference detector not in list of linearizer inputs.", 

105 ) 

106 

107 inputs["inputPtcHandles"] = (inputs["inputPtcHandles"][0], tuple(ptcRefs)) 

108 inputs["inputLinearizerHandles"] = (inputs["inputLinearizerHandles"][0], tuple(linearizerRefs)) 

109 

110 return inputs, outputs 

111 

112 

113class LinearityNormalizeConfig( 

114 pipeBase.PipelineTaskConfig, 

115 pipelineConnections=LinearityNormalizeConnnections, 

116): 

117 normalizeDetectors = pexConfig.ListField( 

118 dtype=int, 

119 doc="List of detector numbers to use for normalization.", 

120 default=None, 

121 optional=False, 

122 ) 

123 referenceDetector = pexConfig.Field( 

124 dtype=int, 

125 doc="Detector to use as an overall reference for sorting/labeling " 

126 "exposures. Must be in list of normalizeDetectors.", 

127 default=None, 

128 optional=False, 

129 ) 

130 minValidFraction = pexConfig.Field( 

131 dtype=float, 

132 doc="Minimum fraction of normalization amplifiers that must have valid " 

133 "residuals in order to be used to create a normalization value.", 

134 default=0.5, 

135 ) 

136 doNormalizeAbsoluteLinearizer = pexConfig.Field( 

137 dtype=bool, 

138 doc="Do the normalization with an ``absolute`` linearizer, which implies " 

139 "both single-image (vs pairs) and only using the absolute reference " 

140 "amplifier for normalization.", 

141 default=True, 

142 ) 

143 

144 def validate(self): 

145 super().validate() 

146 if self.referenceDetector not in self.normalizeDetectors: 

147 raise ValueError("The selected referenceDetector must be in the list of normalizeDetectors.") 

148 

149 

150class LinearityNormalizeTask(pipeBase.PipelineTask): 

151 """Class to use data to normalize linearity inputs.""" 

152 

153 ConfigClass = LinearityNormalizeConfig 

154 _DefaultName = "cpPtcNormalize" 

155 

156 def run(self, *, camera, inputPtcHandles, inputLinearizerHandles): 

157 """Compute the focal-plane normalization. 

158 

159 Parameters 

160 ---------- 

161 camera : `lsst.afw.cameraGeom.Camera` 

162 Input camera. 

163 inputPtcHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`] 

164 Handles for input PTCs to do normalization. 

165 inputLinearizerHandles : 

166 `list` [`lsst.daf.butler.DeferredDatasetHandle`] 

167 handles for input linearizers to do normalization. 

168 

169 Returns 

170 ------- 

171 results : `lsst.pipe.base.Struct` 

172 The output results structure contains: 

173 

174 ``outputNormalization`` 

175 Normalization table, `astropy.table.Table`. 

176 """ 

177 ptcHandleDict = {handle.dataId["detector"]: handle for handle in inputPtcHandles} 

178 linHandleDict = {handle.dataId["detector"]: handle for handle in inputLinearizerHandles} 

179 

180 ptcReferenceHandle = ptcHandleDict.get(self.config.referenceDetector, None) 

181 linReferenceHandle = linHandleDict.get(self.config.referenceDetector, None) 

182 

183 if ptcReferenceHandle is None or linReferenceHandle is None: 

184 raise RuntimeError("Reference detector not found in input PTCs or Linearizers.") 

185 

186 referencePtc = ptcReferenceHandle.get() 

187 referenceLinearizer = linReferenceHandle.get() 

188 

189 # Get the exposure numbers and exposure times from the reference PTC. 

190 # These will be matched for all amps. 

191 # Note that all amps in a ptc must have the same input exposures. 

192 

193 if self.config.doNormalizeAbsoluteLinearizer: 

194 nAmp = 1 

195 refReferenceAmp = referenceLinearizer.absoluteReferenceAmplifier 

196 ampNames = [""] 

197 else: 

198 nAmp = len(referencePtc.ampNames) 

199 refReferenceAmp = referencePtc.ampNames[0] 

200 ampNames = referencePtc.ampNames 

201 

202 if self.config.doNormalizeAbsoluteLinearizer: 

203 exposures = np.asarray(referencePtc.inputExpIdPairs[refReferenceAmp]).ravel() 

204 exptimes = np.repeat(referencePtc.rawExpTimes[refReferenceAmp], 2) 

205 else: 

206 exposures = np.asarray(referencePtc.inputExpIdPairs[refReferenceAmp])[:, 0] 

207 exptimes = referencePtc.rawExpTimes[refReferenceAmp] 

208 

209 # Get the input normalization values from the reference linearizer. 

210 # These will be matched for all amps. 

211 inputNormalization = referenceLinearizer.inputNormalization[refReferenceAmp] 

212 

213 # The rawMeans and fitResiduals arrays will hold all the exposures 

214 # and amps for the detectors that go into the global normalization. 

215 rawMeans = np.zeros((len(self.config.normalizeDetectors), len(exposures), nAmp)) 

216 models = np.zeros_like(rawMeans) 

217 fitResiduals = np.zeros_like(rawMeans) 

218 rawMeans[:, :, :] = np.nan 

219 models[:, :, :] = np.nan 

220 fitResiduals[:, :, :] = np.nan 

221 

222 for i, detector in enumerate(self.config.normalizeDetectors): 

223 ptcHandle = ptcHandleDict.get(detector, None) 

224 linHandle = linHandleDict.get(detector, None) 

225 

226 if ptcHandle is None or linHandle is None: 

227 self.log.warning( 

228 f"Detector {detector} configured for normalization, but not found in inputs.", 

229 ) 

230 

231 ptc = ptcHandle.get() 

232 lin = linHandle.get() 

233 

234 if self.config.doNormalizeAbsoluteLinearizer: 

235 ptcExposures = np.asarray(ptc.inputExpIdPairs[lin.absoluteReferenceAmplifier]).ravel() 

236 else: 

237 ptcExposures = np.asarray(ptc.inputExpIdPairs[refReferenceAmp])[:, 0] 

238 

239 if len(ptcExposures) != len(exposures): 

240 self.log.warning( 

241 "PTC for detector %d has %d pairs, fewer than expected %d.", 

242 ptcHandle.dataId["detector"], 

243 len(ptcExposures), 

244 len(exposures), 

245 ) 

246 

247 a, b = esutil.numpy_util.match(exposures, ptcExposures) 

248 if len(a) == 0: 

249 self.log.warning( 

250 "PTC for detector %d has no exposure matches to the reference!", 

251 ptcHandle.dataId["detector"], 

252 ) 

253 continue 

254 

255 for j in range(len(ampNames)): 

256 # If we are using the absolute linearizer, we need to use the 

257 # specific reference amplifier for each detector. 

258 if self.config.doNormalizeAbsoluteLinearizer: 

259 ampName = lin.absoluteReferenceAmplifier 

260 else: 

261 ampName = ampNames[j] 

262 

263 rawMeans[i, a, j] = lin.inputOrdinate[ampName][b] 

264 models[i, a, j] = lin.fitResidualsModel[ampName][b] 

265 fitResiduals[i, a, j] = ( 

266 lin.fitResiduals[ampName][b] / lin.fitResidualsModel[ampName][b] 

267 ) 

268 

269 # Compute the median levels for the normalization detectors. 

270 medianRawMeans = np.nanmedian(rawMeans, axis=[0, 2]) 

271 medianModel = np.nanmedian(models, axis=[0, 2]) 

272 with warnings.catch_warnings(): 

273 warnings.simplefilter("ignore") 

274 medianResiduals = np.nanmedian(fitResiduals, axis=[0, 2]) 

275 nValid = np.sum(np.isfinite(fitResiduals), axis=0).sum(axis=1) 

276 

277 # Only use a normalization value for exposures that have a 

278 # sufficient number of valid amplifiers 

279 nNormalizeAmps = fitResiduals.shape[0] * fitResiduals.shape[2] 

280 bad = ((nValid / nNormalizeAmps) < self.config.minValidFraction) | ~np.isfinite(medianResiduals) 

281 medianResiduals[bad] = 0.0 

282 

283 # The residual is computed as: 

284 # <r> = (y - m*pe)/y (1) 

285 # where <r> is the median residual, y is the model linearized 

286 # quantity, and m is the slope of the fit of the linearized 

287 # counts vs the input photocharge or exposure time (pe). 

288 # 

289 # We are looking for a normalization factor k which when 

290 # applied to the photocharge or exposure time (pe) gives 

291 # zero residual, so we also have the constraint: 

292 # 0 = (y - m*pe*k)/y (2) 

293 # 

294 # Substituting y = m*pe*k (2) into equation (1) and 

295 # solving for k we find the slope cancels out and we get: 

296 # k = 1 / (1 - <r>) 

297 

298 # The output normalization is applied on top of the input 

299 # normalization (which may be all 1s for the first iteration). 

300 outputNormalization = inputNormalization / (1.0 - medianResiduals) 

301 

302 # These are the per-exposure normalization values. 

303 table = Table( 

304 { 

305 "exposure": exposures, 

306 "exptime": exptimes, 

307 "mean": medianRawMeans, 

308 "model": medianModel, 

309 "normalization": outputNormalization, 

310 }, 

311 ) 

312 

313 result = pipeBase.Struct(outputNormalization=table) 

314 

315 return result