Coverage for python / lsst / cp / pipe / cpFlatMeasure.py: 18%

172 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-25 08:53 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21import numpy as np 

22from collections import defaultdict 

23 

24import lsst.afw.math as afwMath 

25import lsst.daf.base as dafBase 

26import lsst.pex.config as pexConfig 

27import lsst.pipe.base as pipeBase 

28import lsst.pipe.base.connectionTypes as cT 

29from lsst.ip.isr.vignette import maskVignettedRegion 

30from lsst.cp.pipe.utils import ddict2dict 

31 

32__all__ = ["CpFlatMeasureTask", "CpFlatMeasureTaskConfig", 

33 "CpFlatNormalizationTask", "CpFlatNormalizationTaskConfig"] 

34 

35 

36class CpFlatMeasureConnections(pipeBase.PipelineTaskConnections, 

37 dimensions=("instrument", "exposure", "detector")): 

38 inputExp = cT.Input( 

39 name="postISRCCD", 

40 doc="Input exposure to measure statistics from.", 

41 storageClass="Exposure", 

42 dimensions=("instrument", "exposure", "detector"), 

43 ) 

44 outputStats = cT.Output( 

45 name="flatStats", 

46 doc="Output statistics to write.", 

47 storageClass="PropertyList", 

48 dimensions=("instrument", "exposure", "detector"), 

49 ) 

50 

51 

52class CpFlatMeasureTaskConfig(pipeBase.PipelineTaskConfig, 

53 pipelineConnections=CpFlatMeasureConnections): 

54 maskNameList = pexConfig.ListField( 

55 dtype=str, 

56 doc="Mask list to exclude from statistics calculations.", 

57 default=['DETECTED', 'BAD', 'NO_DATA'], 

58 ) 

59 doVignette = pexConfig.Field( 

60 dtype=bool, 

61 doc="Mask vignetted regions?", 

62 default=True, 

63 ) 

64 numSigmaClip = pexConfig.Field( 

65 dtype=float, 

66 doc="Rejection threshold (sigma) for statistics clipping.", 

67 default=3.0, 

68 ) 

69 clipMaxIter = pexConfig.Field( 

70 dtype=int, 

71 doc="Max number of clipping iterations to apply.", 

72 default=3, 

73 ) 

74 

75 

76class CpFlatMeasureTask(pipeBase.PipelineTask): 

77 """Apply extra masking and measure image statistics. 

78 """ 

79 

80 ConfigClass = CpFlatMeasureTaskConfig 

81 _DefaultName = "cpFlatMeasure" 

82 

83 def run(self, inputExp): 

84 """Mask ISR processed FLAT exposures to ensure consistent statistics. 

85 

86 Parameters 

87 ---------- 

88 inputExp : `lsst.afw.image.Exposure` 

89 Post-ISR processed exposure to measure. 

90 

91 Returns 

92 ------- 

93 results : `lsst.pipe.base.Struct` 

94 The results struct containing: 

95 

96 ``outputStats`` 

97 List containing the statistics (`lsst.daf.base.PropertyList`). 

98 """ 

99 if self.config.doVignette: 

100 polygon = inputExp.getInfo().getValidPolygon() 

101 maskVignettedRegion(inputExp, polygon, vignetteValue=None, log=self.log) 

102 mask = inputExp.getMask() 

103 maskVal = mask.getPlaneBitMask(self.config.maskNameList) 

104 statsControl = afwMath.StatisticsControl(self.config.numSigmaClip, 

105 self.config.clipMaxIter, 

106 maskVal) 

107 statsControl.setAndMask(maskVal) 

108 

109 outputStats = dafBase.PropertyList() 

110 

111 # Detector level: 

112 stats = afwMath.makeStatistics(inputExp.getMaskedImage(), 

113 afwMath.MEANCLIP | afwMath.STDEVCLIP | afwMath.NPOINT, 

114 statsControl) 

115 outputStats['DETECTOR_MEDIAN'] = stats.getValue(afwMath.MEANCLIP) 

116 outputStats['DETECTOR_SIGMA'] = stats.getValue(afwMath.STDEVCLIP) 

117 outputStats['DETECTOR_N'] = stats.getValue(afwMath.NPOINT) 

118 self.log.info("Stats: median=%f sigma=%f n=%d", 

119 outputStats['DETECTOR_MEDIAN'], 

120 outputStats['DETECTOR_SIGMA'], 

121 outputStats['DETECTOR_N']) 

122 

123 # AMP LEVEL: 

124 for ampIdx, amp in enumerate(inputExp.getDetector()): 

125 ampName = amp.getName() 

126 ampExp = inputExp.Factory(inputExp, amp.getBBox()) 

127 stats = afwMath.makeStatistics(ampExp.getMaskedImage(), 

128 afwMath.MEANCLIP | afwMath.STDEVCLIP | afwMath.NPOINT, 

129 statsControl) 

130 outputStats[f'AMP_NAME_{ampIdx}'] = ampName 

131 outputStats[f'AMP_MEDIAN_{ampIdx}'] = stats.getValue(afwMath.MEANCLIP) 

132 outputStats[f'AMP_SIGMA_{ampIdx}'] = stats.getValue(afwMath.STDEVCLIP) 

133 outputStats[f'AMP_N_{ampIdx}'] = stats.getValue(afwMath.NPOINT) 

134 

135 return pipeBase.Struct( 

136 outputStats=outputStats 

137 ) 

138 

139 

140class CpFlatNormalizationConnections(pipeBase.PipelineTaskConnections, 

141 dimensions=("instrument", "physical_filter")): 

142 inputMDs = cT.Input( 

143 name="cpFlatProc_metadata", 

144 doc="Input metadata for each visit/detector in input set.", 

145 storageClass="PropertyList", 

146 dimensions=("instrument", "physical_filter", "detector", "exposure"), 

147 multiple=True, 

148 ) 

149 camera = cT.PrerequisiteInput( 

150 name="camera", 

151 doc="Input camera to use for gain lookup.", 

152 storageClass="Camera", 

153 dimensions=("instrument",), 

154 isCalibration=True, 

155 ) 

156 

157 outputScales = cT.Output( 

158 name="cpFlatNormScales", 

159 doc="Output combined proposed calibration.", 

160 storageClass="StructuredDataDict", 

161 dimensions=("instrument", "physical_filter"), 

162 ) 

163 

164 def adjustQuantum(self, inputs, outputs, label, dataId): 

165 if self.config.doDownSelection: 

166 mdRefs = [] 

167 for ref in inputs["inputMDs"][1]: 

168 value = getattr(ref.dataId.exposure, self.config.downSelectionField).lower() 

169 if value == self.config.downSelectionValue.lower(): 

170 mdRefs.append(ref) 

171 

172 if len(mdRefs) == 0: 

173 raise pipeBase.NoWorkFound("No input exposures match the required down selection value.") 

174 inputs["inputMDs"] = (inputs["inputMDs"][0], tuple(mdRefs)) 

175 

176 return inputs, outputs 

177 

178 

179class CpFlatNormalizationTaskConfig(pipeBase.PipelineTaskConfig, 

180 pipelineConnections=CpFlatNormalizationConnections): 

181 level = pexConfig.ChoiceField( 

182 dtype=str, 

183 doc="Which level to apply normalizations.", 

184 default='DETECTOR', 

185 allowed={ 

186 'DETECTOR': "Correct using full detector statistics.", 

187 'AMP': "Correct using individual amplifiers.", 

188 }, 

189 ) 

190 scaleMaxIter = pexConfig.Field( 

191 dtype=int, 

192 doc="Max number of iterations to use in scale solver.", 

193 default=10, 

194 ) 

195 doDownSelection = pexConfig.Field( 

196 dtype=bool, 

197 doc="Down-select inputs based on a header keyword?", 

198 default=False, 

199 ) 

200 downSelectionField = pexConfig.ChoiceField( 

201 dtype=str, 

202 doc="Which exposure record field to use for down-selection. Only used " 

203 "if ``doDownSelection`` is True.", 

204 default="observation_reason", 

205 allowed={ 

206 "observation_reason": "Select on exposure.observation_reason.", 

207 "observation_type": "Select on exposure.observation_type.", 

208 "science_program": "Select on exposure.science_program.", 

209 "group": "Select on exposure.group", 

210 }, 

211 ) 

212 downSelectionValue = pexConfig.Field( 

213 dtype=str, 

214 doc="Exposure record value (corresponding to ``downSelectionField``) to match " 

215 "for down-selection. Only used if ``doDownSelection`` is True, " 

216 "in which case this must be set.", 

217 optional=True, 

218 default=None, 

219 ) 

220 

221 def validate(self): 

222 super().validate() 

223 

224 if self.doDownSelection: 

225 if self.downSelectionValue is None or self.downSelectionValue == "": 

226 msg = "downSelectionValue must be overridden if doDownSelection is True." 

227 raise pexConfig.FieldValidationError( 

228 CpFlatNormalizationTaskConfig.downSelectionValue, 

229 self, 

230 msg, 

231 ) 

232 

233 

234class CpFlatNormalizationTask(pipeBase.PipelineTask): 

235 """Rescale merged flat frames to remove unequal screen illumination. 

236 """ 

237 

238 ConfigClass = CpFlatNormalizationTaskConfig 

239 _DefaultName = "cpFlatNorm" 

240 

241 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

242 inputs = butlerQC.get(inputRefs) 

243 

244 # Use the dimensions of the inputs for generating 

245 # output scales. 

246 dimensions = [dict(ref.dataId.required) for ref in inputRefs.inputMDs] 

247 inputs['inputDims'] = dimensions 

248 

249 outputs = self.run(**inputs) 

250 butlerQC.put(outputs, outputRefs) 

251 

252 def run(self, inputMDs, inputDims, camera): 

253 """Normalize FLAT exposures to a consistent level. 

254 

255 Parameters 

256 ---------- 

257 inputMDs : `list` [`lsst.daf.base.PropertyList`] 

258 Amplifier-level metadata used to construct scales. 

259 inputDims : `list` [`dict`] 

260 List of dictionaries of input data dimensions/values. 

261 Each list entry should contain: 

262 

263 ``"exposure"`` 

264 exposure id value (`int`) 

265 ``"detector"`` 

266 detector id value (`int`) 

267 

268 Returns 

269 ------- 

270 results : `lsst.pipe.base.Struct` 

271 The results struct containing: 

272 

273 ``outputScales`` 

274 Dictionary of scales, indexed by detector (`int`), 

275 amplifier (`int`), and exposure (`int`) (`dict` 

276 [`dict` [`dict` [`float`]]]). 

277 

278 Raises 

279 ------ 

280 KeyError 

281 Raised if the input dimensions do not contain detector and 

282 exposure, or if the metadata does not contain the expected 

283 statistic entry. 

284 """ 

285 expSet = sorted(set([d['exposure'] for d in inputDims])) 

286 detSet = sorted(set([d['detector'] for d in inputDims])) 

287 

288 expMap = {exposureId: idx for idx, exposureId in enumerate(expSet)} 

289 detMap = {detectorId: idx for idx, detectorId in enumerate(detSet)} 

290 

291 nExp = len(expSet) 

292 nDet = len(detSet) 

293 if self.config.level == 'DETECTOR': 

294 bgMatrix = np.zeros((nDet, nExp)) 

295 bgCounts = np.ones((nDet, nExp)) 

296 elif self.config.level == 'AMP': 

297 nAmp = len(camera[detSet[0]]) 

298 bgMatrix = np.zeros((nDet * nAmp, nExp)) 

299 bgCounts = np.ones((nDet * nAmp, nExp)) 

300 

301 for inMetadata, inDimensions in zip(inputMDs, inputDims): 

302 try: 

303 exposureId = inDimensions['exposure'] 

304 detectorId = inDimensions['detector'] 

305 except Exception as e: 

306 raise KeyError(f"Cannot find expected dimensions in {inDimensions}") from e 

307 

308 if self.config.level == 'DETECTOR': 

309 detIdx = detMap[detectorId] 

310 expIdx = expMap[exposureId] 

311 try: 

312 value = inMetadata.get('DETECTOR_MEDIAN') 

313 count = inMetadata.get('DETECTOR_N') 

314 except Exception as e: 

315 raise KeyError("Cannot read expected metadata string.") from e 

316 

317 if np.isfinite(value): 

318 bgMatrix[detIdx][expIdx] = value 

319 bgCounts[detIdx][expIdx] = count 

320 else: 

321 bgMatrix[detIdx][expIdx] = np.nan 

322 bgCounts[detIdx][expIdx] = 1 

323 

324 elif self.config.level == 'AMP': 

325 detector = camera[detectorId] 

326 nAmp = len(detector) 

327 

328 detIdx = detMap[detectorId] * nAmp 

329 expIdx = expMap[exposureId] 

330 

331 for ampIdx, amp in enumerate(detector): 

332 try: 

333 value = inMetadata.get(f'AMP_MEDIAN_{ampIdx}') 

334 count = inMetadata.get(f'AMP_N_{ampIdx}') 

335 except Exception as e: 

336 raise KeyError("cannot read expected metadata string.") from e 

337 

338 detAmpIdx = detIdx + ampIdx 

339 if np.isfinite(value): 

340 bgMatrix[detAmpIdx][expIdx] = value 

341 bgCounts[detAmpIdx][expIdx] = count 

342 else: 

343 bgMatrix[detAmpIdx][expIdx] = np.nan 

344 bgMatrix[detAmpIdx][expIdx] = 1 

345 

346 scaleResult = self.measureScales(bgMatrix, bgCounts, iterations=self.config.scaleMaxIter) 

347 expScales = scaleResult.expScales 

348 detScales = scaleResult.detScales 

349 

350 outputScales = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))) 

351 

352 # Note that the enumerated "detId"/"expId" here index the 

353 # "detScales" and "expScales" arrays. 

354 if self.config.level == 'DETECTOR': 

355 for detIdx, det in enumerate(detSet): 

356 for amp in camera[det]: 

357 for expIdx, exp in enumerate(expSet): 

358 outputScales['expScale'][det][amp.getName()][exp] = expScales[expIdx].tolist() 

359 outputScales['detScale'][det] = detScales[detIdx].tolist() 

360 elif self.config.level == 'AMP': 

361 for detIdx, det in enumerate(detSet): 

362 for ampIdx, amp in enumerate(camera[det]): 

363 for expIdx, exp in enumerate(expSet): 

364 outputScales['expScale'][det][amp.getName()][exp] = expScales[expIdx].tolist() 

365 detAmpIdx = detIdx + ampIdx 

366 outputScales['detScale'][det][amp.getName()] = detScales[detAmpIdx].tolist() 

367 

368 return pipeBase.Struct( 

369 outputScales=ddict2dict(outputScales), 

370 ) 

371 

372 def measureScales(self, bgMatrix, bgCounts=None, iterations=10): 

373 """Convert backgrounds to exposure and detector components. 

374 

375 Parameters 

376 ---------- 

377 bgMatrix : `np.ndarray`, (nDetectors, nExposures) 

378 Input backgrounds indexed by exposure (axis=0) and 

379 detector (axis=1). 

380 bgCounts : `np.ndarray`, (nDetectors, nExposures), optional 

381 Input pixel counts used to in measuring bgMatrix, indexed 

382 identically. 

383 iterations : `int`, optional 

384 Number of iterations to use in decomposition. 

385 

386 Returns 

387 ------- 

388 scaleResult : `lsst.pipe.base.Struct` 

389 Result struct containing fields: 

390 

391 ``vectorE`` 

392 Output E vector of exposure level scalings 

393 (`np.array`, (nExposures)). 

394 ``vectorG`` 

395 Output G vector of detector level scalings 

396 (`np.array`, (nExposures)). 

397 ``bgModel`` 

398 Expected model bgMatrix values, calculated from E and G 

399 (`np.ndarray`, (nDetectors, nExposures)). 

400 

401 Notes 

402 ----- 

403 

404 The set of background measurements B[exposure, detector] of 

405 flat frame data should be defined by a "Cartesian" product of 

406 two vectors, E[exposure] and G[detector]. The E vector 

407 represents the total flux incident on the focal plane. In a 

408 perfect camera, this is simply the sum along the columns of B 

409 (np.sum(B, axis=0)). 

410 

411 However, this simple model ignores differences in detector 

412 gains, the vignetting of the detectors, and the illumination 

413 pattern of the source lamp. The G vector describes these 

414 detector dependent differences, which should be identical over 

415 different exposures. For a perfect lamp of unit total 

416 intensity, this is simply the sum along the rows of B 

417 (np.sum(B, axis=1)). This algorithm divides G by the total 

418 flux level, to provide the relative (not absolute) scales 

419 between detectors. 

420 

421 The algorithm here, from pipe_drivers/constructCalibs.py and 

422 from there from Eugene Magnier/PanSTARRS [1]_, attempts to 

423 iteratively solve this decomposition from initial "perfect" E 

424 and G vectors. The operation is performed in log space to 

425 reduce the multiply and divides to linear additions and 

426 subtractions. 

427 

428 References 

429 ---------- 

430 .. [1] https://svn.pan-starrs.ifa.hawaii.edu/trac/ipp/browser/trunk/psModules/src/detrend/pmFlatNormalize.c # noqa: W505, E501 

431 """ 

432 numExps = bgMatrix.shape[1] 

433 numChips = bgMatrix.shape[0] 

434 if bgCounts is None: 

435 bgCounts = np.ones_like(bgMatrix) 

436 

437 logMeas = np.log(bgMatrix) 

438 logMeas = np.ma.masked_array(logMeas, ~np.isfinite(logMeas)) 

439 logG = np.zeros(numChips) 

440 logE = np.array([np.average(logMeas[:, iexp] - logG, 

441 weights=bgCounts[:, iexp]) for iexp in range(numExps)]) 

442 

443 for iter in range(iterations): 

444 logG = np.array([np.average(logMeas[ichip, :] - logE, 

445 weights=bgCounts[ichip, :]) for ichip in range(numChips)]) 

446 

447 bad = np.isnan(logG) 

448 if np.any(bad): 

449 logG[bad] = logG[~bad].mean() 

450 

451 logE = np.array([np.average(logMeas[:, iexp] - logG, 

452 weights=bgCounts[:, iexp]) for iexp in range(numExps)]) 

453 fluxLevel = np.average(np.exp(logG), weights=np.sum(bgCounts, axis=1)) 

454 

455 logG -= np.log(fluxLevel) 

456 if self.log.isEnabledFor(self.log.DEBUG): 

457 # Only calculate these values if debug log is enabled. 

458 self.log.debug("ITER %d: Flux: %f", iter, fluxLevel) 

459 self.log.debug("Exps: %s", np.exp(logE)) 

460 self.log.debug("%f", np.mean(logG)) 

461 

462 logE = np.array([np.average(logMeas[:, iexp] - logG, 

463 weights=bgCounts[:, iexp]) for iexp in range(numExps)]) 

464 

465 bgModel = np.exp(logE[np.newaxis, :] - logG[:, np.newaxis]) 

466 return pipeBase.Struct( 

467 expScales=np.exp(logE), 

468 detScales=np.exp(logG), 

469 bgModel=bgModel, 

470 )