Coverage for python / lsst / cp / pipe / cpSpectroFlat.py: 22%
108 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:53 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:53 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22__all__ = ["CpSpectroFlatTask", "CpSpectroFlatTaskConfig"]
24import numpy as np
25from scipy.ndimage import gaussian_filter, uniform_filter, median_filter
26from astropy.stats import SigmaClip
27from photutils.background import Background2D, MedianBackground
29import lsst.afw.math as afwMath
30import lsst.pex.config as pexConfig
31import lsst.pipe.base as pipeBase
32import lsst.pipe.base.connectionTypes as cT
35class CpSpectroFlatTaskConnections(pipeBase.PipelineTaskConnections,
36 dimensions=("instrument", "detector", "physical_filter")):
37 dummyExpRef = cT.Input(
38 name="raw",
39 storageClass="Exposure",
40 dimensions=("instrument", "detector", "exposure"),
41 doc="Dummy exposure reference to constrain butler calibration lookups.",
42 deferLoad=True
43 )
44 inputFlat = cT.PrerequisiteInput(
45 name="flat",
46 doc="Input white-light flat to use as basis of spectroFlat.",
47 storageClass="Exposure",
48 dimensions=("instrument", "detector", "physical_filter"),
49 multiple=False,
50 isCalibration=True,
51 )
52 inputPtc = cT.PrerequisiteInput(
53 name="ptc",
54 doc="Input PTC containing gains to scale spectroFlat by.",
55 storageClass="PhotonTransferCurveDataset",
56 dimensions=("instrument", "detector"),
57 multiple=False,
58 isCalibration=True,
59 )
61 outputFlat = cT.Output(
62 name="spectroFlat",
63 doc="Scaled and filtered spectroscopic flat.",
64 storageClass="Exposure",
65 dimensions=("instrument", "detector"),
66 isCalibration=True,
67 )
70class CpSpectroFlatTaskConfig(pipeBase.PipelineTaskConfig,
71 pipelineConnections=CpSpectroFlatTaskConnections):
72 """Configuration parameters for constructing spectroFlat from
73 a white-band flat.
74 """
76 inputFlatPhysicalFilter = pexConfig.Field(
77 dtype=str,
78 doc="Physical filter to expect in inputFlat.",
79 default="empty~empty",
80 )
82 # makeGainFlat
83 applyGainDirection = pexConfig.Field(
84 dtype=bool,
85 doc="Should flat be divided by the gains?",
86 default=True,
87 )
88 scaleGainsByMean = pexConfig.Field(
89 dtype=bool,
90 doc="Scale gains by the mean?",
91 default=True,
92 )
94 # makeOpticFlat
95 smoothingAlgorithm = pexConfig.ChoiceField(
96 dtype=str,
97 doc="Method to use to smooth amplifiers.",
98 default="mean",
99 allowed={"mean": "Use scipy uniform_filter",
100 "gauss": "Use scipy gaussian_filter",
101 "median": "Use scipy median_filter"},
102 )
103 smoothingWindowSize = pexConfig.Field(
104 dtype=int,
105 default=40,
106 doc="Size of smoothing window.",
107 )
108 smoothingMode = pexConfig.Field(
109 dtype=str,
110 default="mirror",
111 doc="Filter mode/downstream.",
112 )
113 smoothingOutlierPercentile = pexConfig.Field(
114 dtype=float,
115 default=1.0,
116 doc="Percentile of extreme values to clip from top and bottom of distribution.",
117 )
119 # makePixelFlat
120 doBackgroundRemoval = pexConfig.Field(
121 dtype=bool,
122 default=True,
123 doc="Should amplifier backgrounds be measured and divided out?",
124 )
125 backgroundSigmaClipThreshold = pexConfig.Field(
126 dtype=float,
127 default=3.0,
128 doc="Backgroud sigma clip threshold.",
129 )
130 backgroundBoxSize = pexConfig.Field(
131 dtype=int,
132 default=20,
133 doc="Size of background super-pixel.",
134 )
135 backgroundFilterSize = pexConfig.Field(
136 dtype=int,
137 default=3,
138 doc="Size of smoothing filter for backgrounds.",
139 )
142class CpSpectroFlatTask(pipeBase.PipelineTask):
143 """Construct a spectro-flat from a white-band flat."""
145 ConfigClass = CpSpectroFlatTaskConfig
146 _DefaultName = "cpSpectroFlat"
148 def runQuantum(self, butlerQC, inputRefs, outputRefs):
149 inputs = butlerQC.get(inputRefs)
151 # Get provenance. This is essential for debugging.
152 inputMD = inputs['inputFlat'].getMetadata()
153 for inputName in sorted(list(inputs.keys())):
154 reference = getattr(inputRefs, inputName, None)
155 if reference is not None and hasattr(reference, "run"):
156 runKey = f"LSST CALIB RUN {inputName.upper()}"
157 runValue = reference.run
158 idKey = f"LSST CALIB UUID {inputName.upper()}"
159 idValue = str(reference.id)
161 inputMD[runKey] = runValue
162 inputMD[idKey] = idValue
163 inputs['inputMD'] = inputMD
165 outputs = self.run(**inputs)
166 butlerQC.put(outputs, outputRefs)
168 def run(self, dummyExpRef, inputFlat, inputPtc, inputMD):
169 """Create spectro flat from an input flat and PTC.
171 Parameters
172 ----------
173 dummyExpRef : `lsst.daf.butler.DeferredDatasetHandle`
174 Exposure used to ensure the correct flat is chosen.
175 inputFlat : `lsst.afw.image.Exposure`
176 Input flat to convert to spectro-flat.
177 inputPtc : `lsst.ip.isr.PhotoTransferCurveDataset`
178 Input PTC containing the gains to apply to spectro-flat.
179 inputMD : `lsst.daf.base.PropertyList`
180 Provenance metadata derived from the butler references.
182 Returns
183 -------
184 results : `lsst.pipe.base.Struct`
185 The results struct containing:
187 ``outputData``
188 Final modified flat generated from the inputs
189 (`lsst.afw.image.Exposure`).
190 """
191 # Verify input
192 exposureFilter = dummyExpRef.get(component="filter")
193 if exposureFilter.physicalLabel != self.config.inputFlatPhysicalFilter:
194 raise RuntimeError("Did not receive expected physical_filter: "
195 f"{exposureFilter.physicalLabel} != "
196 f"{self.config.inputFlatPhysicalFilter}")
198 detector = inputFlat.getDetector()
199 statsControl = afwMath.StatisticsControl(self.config.backgroundSigmaClipThreshold,
200 5,
201 0x0)
203 # Make gain flat
204 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L85
205 # TODO: DM-46080 Ensure cpSpectroFlat can distinguish between
206 # new and old flats.
207 gains = inputPtc.gain
208 scaledGains = {}
209 gainFlat = inputFlat.clone()
211 # Todo: Get temperature dependence terms.
212 if self.config.scaleGainsByMean:
213 gainMean = np.mean(list(gains.values()))
214 else:
215 gainMean = 1.0
217 for amp in detector:
218 bbox = amp.getBBox()
219 scaledGains[amp.getName()] = gains[amp.getName()] / gainMean
220 if self.config.applyGainDirection:
221 gainFlat[bbox].image.array[:, :] = 1 / scaledGains[amp.getName()]
222 else:
223 gainFlat[bbox].image.array[:, :] = scaledGains[amp.getName()]
225 # gainFlat is now a realization of the scaled gains into a
226 # full image. scaledGains are the gains optionally (default
227 # True) scaled by mean value.
229 # Make optic flat
230 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L130
231 opticFlat = inputFlat.clone()
232 for amp in detector:
233 # Divide amp by median
234 ampExp = opticFlat.Factory(opticFlat, amp.getBBox())
235 stats = afwMath.makeStatistics(ampExp.getMaskedImage(),
236 afwMath.MEDIAN, statsControl)
237 ampExp.image.array[:, :] /= stats.getValue(afwMath.MEDIAN)
239 inputData = ampExp.image.array.copy()
240 match self.config.smoothingAlgorithm:
241 case "gauss":
242 ampExp.image.array[:, :] = gaussian_filter(inputData,
243 sigma=self.config.smoothingWindowSize,
244 mode=self.config.smoothingMode)
245 case "mean":
246 # This method is easily skewed by outliers.
247 # Filter those.
248 mask1 = (inputData >= np.percentile(inputData.ravel(),
249 self.config.smoothingOutlierPercentile))
250 mask2 = (inputData <= np.percentile(inputData.ravel(),
251 100 - self.config.smoothingOutlierPercentile))
252 mask = mask1 * mask2
253 # We've already divided the image by the median,
254 # so the median image value is 1.0
255 inputData[~mask] = 1.0
256 ampExp.image.array[:, :] = uniform_filter(inputData,
257 size=self.config.smoothingWindowSize,
258 mode=self.config.smoothingMode)
259 case "median":
260 ampExp.image.array[:, :] = median_filter(inputData,
261 size=(self.config.smoothingWindowSize,
262 self.config.smoothingWindowSize),
263 mode=self.config.smoothingMode)
264 case _:
265 raise RuntimeError(f"Unknown smoothing method: {self.config.smoothingAlgorithm}")
266 ampExp.variance.array[:, :] = 1.0
267 ampExp.mask.array[:, :] = 0x0
269 # Make pixel flat
270 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L18
271 # TODO: DM-46079: Determine appropriate DM Background
272 # replacement for cpSpectroFlat
273 pixelFlat = inputFlat.clone()
274 clip_function = SigmaClip(sigma=self.config.backgroundSigmaClipThreshold)
275 bkg_function = MedianBackground()
277 for amp in detector:
278 # Divide amp by median, as above.
279 ampExp = pixelFlat.Factory(pixelFlat, amp.getBBox())
280 opticAmpExp = opticFlat.Factory(opticFlat, amp.getBBox())
281 stats = afwMath.makeStatistics(ampExp.getMaskedImage(),
282 afwMath.MEDIAN, statsControl)
283 ampExp.image.array[:, :] /= stats.getValue(afwMath.MEDIAN)
285 # Remove the smoothed version
286 ampExp.image.array[:, :] /= opticAmpExp.image.array[:, :]
288 if self.config.doBackgroundRemoval:
289 ampBg = Background2D(ampExp.image.array[:, :],
290 box_size=(self.config.backgroundBoxSize,
291 self.config.backgroundBoxSize),
292 filter_size=(self.config.backgroundFilterSize,
293 self.config.backgroundFilterSize),
294 sigma_clip=clip_function,
295 bkg_estimator=bkg_function)
296 ampExp.image.array[:, :] /= ampBg.background
298 # Make sensor flat
299 # https://github.com/lsst/atmospec/blob/u/jneveu/doFlat/python/lsst/atmospec/spectroFlat.py#L230
300 sensorFlat = pixelFlat.clone()
301 for amp in detector:
302 sensorFlatAmp = sensorFlat.Factory(sensorFlat, amp.getBBox())
303 gainFlatAmp = gainFlat.Factory(gainFlat, amp.getBBox())
304 sensorFlatAmp.maskedImage.scaledMultiplies(1.0, gainFlatAmp.maskedImage)
306 # Add PTC/gain information to outputFlat header. This allows
307 # SpectractorShim.spectractorImageFromLsstExposure to populate
308 # image.gain and image.flat from our outputFlat.
309 md = sensorFlat.metadata
310 md["LSST CP SPECTROFLAT GAIN MEAN"] = gainMean
311 for amp in detector:
312 md[f"LSST CP SPECTROFLAT GAIN {amp.getName()}"] = scaledGains[amp.getName()]
314 # Provenance info:
315 for key, value in inputMD.items():
316 if "LSST CALIB" in key:
317 md[key] = value
319 # Debug "prints":
320 # gainFlat.writeFits("./gainFlat.fits")
321 # opticFlat.writeFits("./opticFlat.fits")
322 # pixelFlat.writeFits("./pixelFlat.fits")
323 # sensorFlat.writeFits("./sensorFlat.fits")
324 return pipeBase.Struct(
325 outputFlat=sensorFlat,
326 )