Coverage for python / lsst / cp / pipe / cpSpectroFlat.py: 22%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:25 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["CpSpectroFlatTask", "CpSpectroFlatTaskConfig"] 

23 

24import numpy as np 

25from scipy.ndimage import gaussian_filter, uniform_filter, median_filter 

26from astropy.stats import SigmaClip 

27from photutils.background import Background2D, MedianBackground 

28 

29import lsst.afw.math as afwMath 

30import lsst.pex.config as pexConfig 

31import lsst.pipe.base as pipeBase 

32import lsst.pipe.base.connectionTypes as cT 

33 

34 

35class CpSpectroFlatTaskConnections(pipeBase.PipelineTaskConnections, 

36 dimensions=("instrument", "detector", "physical_filter")): 

37 dummyExpRef = cT.Input( 

38 name="raw", 

39 storageClass="Exposure", 

40 dimensions=("instrument", "detector", "exposure"), 

41 doc="Dummy exposure reference to constrain butler calibration lookups.", 

42 deferLoad=True 

43 ) 

44 inputFlat = cT.PrerequisiteInput( 

45 name="flat", 

46 doc="Input white-light flat to use as basis of spectroFlat.", 

47 storageClass="Exposure", 

48 dimensions=("instrument", "detector", "physical_filter"), 

49 multiple=False, 

50 isCalibration=True, 

51 ) 

52 inputPtc = cT.PrerequisiteInput( 

53 name="ptc", 

54 doc="Input PTC containing gains to scale spectroFlat by.", 

55 storageClass="PhotonTransferCurveDataset", 

56 dimensions=("instrument", "detector"), 

57 multiple=False, 

58 isCalibration=True, 

59 ) 

60 

61 outputFlat = cT.Output( 

62 name="spectroFlat", 

63 doc="Scaled and filtered spectroscopic flat.", 

64 storageClass="Exposure", 

65 dimensions=("instrument", "detector"), 

66 isCalibration=True, 

67 ) 

68 

69 

70class CpSpectroFlatTaskConfig(pipeBase.PipelineTaskConfig, 

71 pipelineConnections=CpSpectroFlatTaskConnections): 

72 """Configuration parameters for constructing spectroFlat from 

73 a white-band flat. 

74 """ 

75 

76 inputFlatPhysicalFilter = pexConfig.Field( 

77 dtype=str, 

78 doc="Physical filter to expect in inputFlat.", 

79 default="empty~empty", 

80 ) 

81 

82 # makeGainFlat 

83 applyGainDirection = pexConfig.Field( 

84 dtype=bool, 

85 doc="Should flat be divided by the gains?", 

86 default=True, 

87 ) 

88 scaleGainsByMean = pexConfig.Field( 

89 dtype=bool, 

90 doc="Scale gains by the mean?", 

91 default=True, 

92 ) 

93 

94 # makeOpticFlat 

95 smoothingAlgorithm = pexConfig.ChoiceField( 

96 dtype=str, 

97 doc="Method to use to smooth amplifiers.", 

98 default="mean", 

99 allowed={"mean": "Use scipy uniform_filter", 

100 "gauss": "Use scipy gaussian_filter", 

101 "median": "Use scipy median_filter"}, 

102 ) 

103 smoothingWindowSize = pexConfig.Field( 

104 dtype=int, 

105 default=40, 

106 doc="Size of smoothing window.", 

107 ) 

108 smoothingMode = pexConfig.Field( 

109 dtype=str, 

110 default="mirror", 

111 doc="Filter mode/downstream.", 

112 ) 

113 smoothingOutlierPercentile = pexConfig.Field( 

114 dtype=float, 

115 default=1.0, 

116 doc="Percentile of extreme values to clip from top and bottom of distribution.", 

117 ) 

118 

119 # makePixelFlat 

120 doBackgroundRemoval = pexConfig.Field( 

121 dtype=bool, 

122 default=True, 

123 doc="Should amplifier backgrounds be measured and divided out?", 

124 ) 

125 backgroundSigmaClipThreshold = pexConfig.Field( 

126 dtype=float, 

127 default=3.0, 

128 doc="Backgroud sigma clip threshold.", 

129 ) 

130 backgroundBoxSize = pexConfig.Field( 

131 dtype=int, 

132 default=20, 

133 doc="Size of background super-pixel.", 

134 ) 

135 backgroundFilterSize = pexConfig.Field( 

136 dtype=int, 

137 default=3, 

138 doc="Size of smoothing filter for backgrounds.", 

139 ) 

140 

141 

142class CpSpectroFlatTask(pipeBase.PipelineTask): 

143 """Construct a spectro-flat from a white-band flat.""" 

144 

145 ConfigClass = CpSpectroFlatTaskConfig 

146 _DefaultName = "cpSpectroFlat" 

147 

148 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

149 inputs = butlerQC.get(inputRefs) 

150 

151 # Get provenance. This is essential for debugging. 

152 inputMD = inputs['inputFlat'].getMetadata() 

153 for inputName in sorted(list(inputs.keys())): 

154 reference = getattr(inputRefs, inputName, None) 

155 if reference is not None and hasattr(reference, "run"): 

156 runKey = f"LSST CALIB RUN {inputName.upper()}" 

157 runValue = reference.run 

158 idKey = f"LSST CALIB UUID {inputName.upper()}" 

159 idValue = str(reference.id) 

160 

161 inputMD[runKey] = runValue 

162 inputMD[idKey] = idValue 

163 inputs['inputMD'] = inputMD 

164 

165 outputs = self.run(**inputs) 

166 butlerQC.put(outputs, outputRefs) 

167 

168 def run(self, dummyExpRef, inputFlat, inputPtc, inputMD): 

169 """Create spectro flat from an input flat and PTC. 

170 

171 Parameters 

172 ---------- 

173 dummyExpRef : `lsst.daf.butler.DeferredDatasetHandle` 

174 Exposure used to ensure the correct flat is chosen. 

175 inputFlat : `lsst.afw.image.Exposure` 

176 Input flat to convert to spectro-flat. 

177 inputPtc : `lsst.ip.isr.PhotoTransferCurveDataset` 

178 Input PTC containing the gains to apply to spectro-flat. 

179 inputMD : `lsst.daf.base.PropertyList` 

180 Provenance metadata derived from the butler references. 

181 

182 Returns 

183 ------- 

184 results : `lsst.pipe.base.Struct` 

185 The results struct containing: 

186 

187 ``outputData`` 

188 Final modified flat generated from the inputs 

189 (`lsst.afw.image.Exposure`). 

190 """ 

191 # Verify input 

192 exposureFilter = dummyExpRef.get(component="filter") 

193 if exposureFilter.physicalLabel != self.config.inputFlatPhysicalFilter: 

194 raise RuntimeError("Did not receive expected physical_filter: " 

195 f"{exposureFilter.physicalLabel} != " 

196 f"{self.config.inputFlatPhysicalFilter}") 

197 

198 detector = inputFlat.getDetector() 

199 statsControl = afwMath.StatisticsControl(self.config.backgroundSigmaClipThreshold, 

200 5, 

201 0x0) 

202 

203 # Make gain flat 

204 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L85 

205 # TODO: DM-46080 Ensure cpSpectroFlat can distinguish between 

206 # new and old flats. 

207 gains = inputPtc.gain 

208 scaledGains = {} 

209 gainFlat = inputFlat.clone() 

210 

211 # Todo: Get temperature dependence terms. 

212 if self.config.scaleGainsByMean: 

213 gainMean = np.mean(list(gains.values())) 

214 else: 

215 gainMean = 1.0 

216 

217 for amp in detector: 

218 bbox = amp.getBBox() 

219 scaledGains[amp.getName()] = gains[amp.getName()] / gainMean 

220 if self.config.applyGainDirection: 

221 gainFlat[bbox].image.array[:, :] = 1 / scaledGains[amp.getName()] 

222 else: 

223 gainFlat[bbox].image.array[:, :] = scaledGains[amp.getName()] 

224 

225 # gainFlat is now a realization of the scaled gains into a 

226 # full image. scaledGains are the gains optionally (default 

227 # True) scaled by mean value. 

228 

229 # Make optic flat 

230 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L130 

231 opticFlat = inputFlat.clone() 

232 for amp in detector: 

233 # Divide amp by median 

234 ampExp = opticFlat.Factory(opticFlat, amp.getBBox()) 

235 stats = afwMath.makeStatistics(ampExp.getMaskedImage(), 

236 afwMath.MEDIAN, statsControl) 

237 ampExp.image.array[:, :] /= stats.getValue(afwMath.MEDIAN) 

238 

239 inputData = ampExp.image.array.copy() 

240 match self.config.smoothingAlgorithm: 

241 case "gauss": 

242 ampExp.image.array[:, :] = gaussian_filter(inputData, 

243 sigma=self.config.smoothingWindowSize, 

244 mode=self.config.smoothingMode) 

245 case "mean": 

246 # This method is easily skewed by outliers. 

247 # Filter those. 

248 mask1 = (inputData >= np.percentile(inputData.ravel(), 

249 self.config.smoothingOutlierPercentile)) 

250 mask2 = (inputData <= np.percentile(inputData.ravel(), 

251 100 - self.config.smoothingOutlierPercentile)) 

252 mask = mask1 * mask2 

253 # We've already divided the image by the median, 

254 # so the median image value is 1.0 

255 inputData[~mask] = 1.0 

256 ampExp.image.array[:, :] = uniform_filter(inputData, 

257 size=self.config.smoothingWindowSize, 

258 mode=self.config.smoothingMode) 

259 case "median": 

260 ampExp.image.array[:, :] = median_filter(inputData, 

261 size=(self.config.smoothingWindowSize, 

262 self.config.smoothingWindowSize), 

263 mode=self.config.smoothingMode) 

264 case _: 

265 raise RuntimeError(f"Unknown smoothing method: {self.config.smoothingAlgorithm}") 

266 ampExp.variance.array[:, :] = 1.0 

267 ampExp.mask.array[:, :] = 0x0 

268 

269 # Make pixel flat 

270 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L18 

271 # TODO: DM-46079: Determine appropriate DM Background 

272 # replacement for cpSpectroFlat 

273 pixelFlat = inputFlat.clone() 

274 clip_function = SigmaClip(sigma=self.config.backgroundSigmaClipThreshold) 

275 bkg_function = MedianBackground() 

276 

277 for amp in detector: 

278 # Divide amp by median, as above. 

279 ampExp = pixelFlat.Factory(pixelFlat, amp.getBBox()) 

280 opticAmpExp = opticFlat.Factory(opticFlat, amp.getBBox()) 

281 stats = afwMath.makeStatistics(ampExp.getMaskedImage(), 

282 afwMath.MEDIAN, statsControl) 

283 ampExp.image.array[:, :] /= stats.getValue(afwMath.MEDIAN) 

284 

285 # Remove the smoothed version 

286 ampExp.image.array[:, :] /= opticAmpExp.image.array[:, :] 

287 

288 if self.config.doBackgroundRemoval: 

289 ampBg = Background2D(ampExp.image.array[:, :], 

290 box_size=(self.config.backgroundBoxSize, 

291 self.config.backgroundBoxSize), 

292 filter_size=(self.config.backgroundFilterSize, 

293 self.config.backgroundFilterSize), 

294 sigma_clip=clip_function, 

295 bkg_estimator=bkg_function) 

296 ampExp.image.array[:, :] /= ampBg.background 

297 

298 # Make sensor flat 

299 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L230 

300 sensorFlat = pixelFlat.clone() 

301 for amp in detector: 

302 sensorFlatAmp = sensorFlat.Factory(sensorFlat, amp.getBBox()) 

303 gainFlatAmp = gainFlat.Factory(gainFlat, amp.getBBox()) 

304 sensorFlatAmp.maskedImage.scaledMultiplies(1.0, gainFlatAmp.maskedImage) 

305 

306 # Add PTC/gain information to outputFlat header. This allows 

307 # SpectractorShim.spectractorImageFromLsstExposure to populate 

308 # image.gain and image.flat from our outputFlat. 

309 md = sensorFlat.metadata 

310 md["LSST CP SPECTROFLAT GAIN MEAN"] = gainMean 

311 for amp in detector: 

312 md[f"LSST CP SPECTROFLAT GAIN {amp.getName()}"] = scaledGains[amp.getName()] 

313 

314 # Provenance info: 

315 for key, value in inputMD.items(): 

316 if "LSST CALIB" in key: 

317 md[key] = value 

318 

319 # Debug "prints": 

320 # gainFlat.writeFits("./gainFlat.fits") 

321 # opticFlat.writeFits("./opticFlat.fits") 

322 # pixelFlat.writeFits("./pixelFlat.fits") 

323 # sensorFlat.writeFits("./sensorFlat.fits") 

324 return pipeBase.Struct( 

325 outputFlat=sensorFlat, 

326 )