Coverage for python / lsst / cp / pipe / cpFlatMeasure.py: 18%
172 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:07 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21import numpy as np
22from collections import defaultdict
24import lsst.afw.math as afwMath
25import lsst.daf.base as dafBase
26import lsst.pex.config as pexConfig
27import lsst.pipe.base as pipeBase
28import lsst.pipe.base.connectionTypes as cT
29from lsst.ip.isr.vignette import maskVignettedRegion
30from lsst.cp.pipe.utils import ddict2dict
# Public API of this module.
__all__ = ["CpFlatMeasureTask", "CpFlatMeasureTaskConfig",
           "CpFlatNormalizationTask", "CpFlatNormalizationTaskConfig"]
class CpFlatMeasureConnections(pipeBase.PipelineTaskConnections,
                               dimensions=("instrument", "exposure", "detector")):
    """Connections for CpFlatMeasureTask: one post-ISR exposure in, one
    PropertyList of measured statistics out, keyed per exposure/detector.
    """

    inputExp = cT.Input(
        name="postISRCCD",
        doc="Input exposure to measure statistics from.",
        storageClass="Exposure",
        dimensions=("instrument", "exposure", "detector"),
    )
    outputStats = cT.Output(
        name="flatStats",
        doc="Output statistics to write.",
        storageClass="PropertyList",
        dimensions=("instrument", "exposure", "detector"),
    )
class CpFlatMeasureTaskConfig(pipeBase.PipelineTaskConfig,
                              pipelineConnections=CpFlatMeasureConnections):
    """Configuration for CpFlatMeasureTask statistics measurement."""

    maskNameList = pexConfig.ListField(
        dtype=str,
        doc="Mask list to exclude from statistics calculations.",
        default=['DETECTED', 'BAD', 'NO_DATA'],
    )
    doVignette = pexConfig.Field(
        dtype=bool,
        doc="Mask vignetted regions?",
        default=True,
    )
    numSigmaClip = pexConfig.Field(
        dtype=float,
        doc="Rejection threshold (sigma) for statistics clipping.",
        default=3.0,
    )
    clipMaxIter = pexConfig.Field(
        dtype=int,
        doc="Max number of clipping iterations to apply.",
        default=3,
    )
class CpFlatMeasureTask(pipeBase.PipelineTask):
    """Apply extra masking and measure image statistics.
    """

    ConfigClass = CpFlatMeasureTaskConfig
    _DefaultName = "cpFlatMeasure"

    @staticmethod
    def _clippedStats(maskedImage, statsCtrl):
        """Return (clipped mean, clipped stdev, npoint) for ``maskedImage``."""
        result = afwMath.makeStatistics(
            maskedImage,
            afwMath.MEANCLIP | afwMath.STDEVCLIP | afwMath.NPOINT,
            statsCtrl,
        )
        return (result.getValue(afwMath.MEANCLIP),
                result.getValue(afwMath.STDEVCLIP),
                result.getValue(afwMath.NPOINT))

    def run(self, inputExp):
        """Mask ISR processed FLAT exposures to ensure consistent statistics.

        Parameters
        ----------
        inputExp : `lsst.afw.image.Exposure`
            Post-ISR processed exposure to measure.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            The results struct containing:

            ``outputStats``
                List containing the statistics (`lsst.daf.base.PropertyList`).
        """
        if self.config.doVignette:
            maskVignettedRegion(inputExp,
                                inputExp.getInfo().getValidPolygon(),
                                vignetteValue=None, log=self.log)

        badBits = inputExp.getMask().getPlaneBitMask(self.config.maskNameList)
        statsCtrl = afwMath.StatisticsControl(self.config.numSigmaClip,
                                              self.config.clipMaxIter,
                                              badBits)
        statsCtrl.setAndMask(badBits)

        outputStats = dafBase.PropertyList()

        # Detector-level statistics.  NOTE: the "MEDIAN" keys hold the
        # sigma-clipped MEAN (afwMath.MEANCLIP); downstream consumers read
        # these key names, so they are kept as-is.
        mean, sigma, npoint = self._clippedStats(inputExp.getMaskedImage(), statsCtrl)
        outputStats['DETECTOR_MEDIAN'] = mean
        outputStats['DETECTOR_SIGMA'] = sigma
        outputStats['DETECTOR_N'] = npoint
        self.log.info("Stats: median=%f sigma=%f n=%d", mean, sigma, npoint)

        # Per-amplifier statistics, keyed by amplifier index.
        for ampIdx, amp in enumerate(inputExp.getDetector()):
            ampExp = inputExp.Factory(inputExp, amp.getBBox())
            mean, sigma, npoint = self._clippedStats(ampExp.getMaskedImage(), statsCtrl)
            outputStats[f'AMP_NAME_{ampIdx}'] = amp.getName()
            outputStats[f'AMP_MEDIAN_{ampIdx}'] = mean
            outputStats[f'AMP_SIGMA_{ampIdx}'] = sigma
            outputStats[f'AMP_N_{ampIdx}'] = npoint

        return pipeBase.Struct(
            outputStats=outputStats
        )
class CpFlatNormalizationConnections(pipeBase.PipelineTaskConnections,
                                     dimensions=("instrument", "physical_filter")):
    """Connections for CpFlatNormalizationTask: per-exposure/detector
    statistics metadata in, one dictionary of scales out per filter.
    """

    inputMDs = cT.Input(
        name="cpFlatProc_metadata",
        doc="Input metadata for each visit/detector in input set.",
        storageClass="PropertyList",
        dimensions=("instrument", "physical_filter", "detector", "exposure"),
        multiple=True,
    )
    camera = cT.PrerequisiteInput(
        name="camera",
        doc="Input camera to use for gain lookup.",
        storageClass="Camera",
        dimensions=("instrument",),
        isCalibration=True,
    )

    outputScales = cT.Output(
        name="cpFlatNormScales",
        doc="Output combined proposed calibration.",
        storageClass="StructuredDataDict",
        dimensions=("instrument", "physical_filter"),
    )

    def adjustQuantum(self, inputs, outputs, label, dataId):
        # Optionally drop metadata inputs whose exposure record field
        # (config.downSelectionField) does not match the configured value;
        # the comparison is case-insensitive on both sides.
        # NOTE(review): this does not chain to super().adjustQuantum() —
        # confirm that is intentional for this pipeline.
        if self.config.doDownSelection:
            mdRefs = []
            for ref in inputs["inputMDs"][1]:
                value = getattr(ref.dataId.exposure, self.config.downSelectionField).lower()
                if value == self.config.downSelectionValue.lower():
                    mdRefs.append(ref)

            # Nothing survived down-selection: no quantum to run.
            if len(mdRefs) == 0:
                raise pipeBase.NoWorkFound("No input exposures match the required down selection value.")
            inputs["inputMDs"] = (inputs["inputMDs"][0], tuple(mdRefs))

        return inputs, outputs
class CpFlatNormalizationTaskConfig(pipeBase.PipelineTaskConfig,
                                    pipelineConnections=CpFlatNormalizationConnections):
    """Configuration for CpFlatNormalizationTask."""

    level = pexConfig.ChoiceField(
        dtype=str,
        doc="Which level to apply normalizations.",
        default='DETECTOR',
        allowed={
            'DETECTOR': "Correct using full detector statistics.",
            'AMP': "Correct using individual amplifiers.",
        },
    )
    scaleMaxIter = pexConfig.Field(
        dtype=int,
        doc="Max number of iterations to use in scale solver.",
        default=10,
    )
    doDownSelection = pexConfig.Field(
        dtype=bool,
        doc="Down-select inputs based on a header keyword?",
        default=False,
    )
    downSelectionField = pexConfig.ChoiceField(
        dtype=str,
        doc="Which exposure record field to use for down-selection. Only used "
            "if ``doDownSelection`` is True.",
        default="observation_reason",
        allowed={
            "observation_reason": "Select on exposure.observation_reason.",
            "observation_type": "Select on exposure.observation_type.",
            "science_program": "Select on exposure.science_program.",
            "group": "Select on exposure.group",
        },
    )
    downSelectionValue = pexConfig.Field(
        dtype=str,
        doc="Exposure record value (corresponding to ``downSelectionField``) to match "
            "for down-selection. Only used if ``doDownSelection`` is True, "
            "in which case this must be set.",
        optional=True,
        default=None,
    )

    def validate(self):
        # Ensure a down-selection value is supplied whenever down-selection
        # is enabled; adjustQuantum calls .lower() on it unconditionally.
        super().validate()

        if self.doDownSelection:
            if self.downSelectionValue is None or self.downSelectionValue == "":
                msg = "downSelectionValue must be overridden if doDownSelection is True."
                raise pexConfig.FieldValidationError(
                    CpFlatNormalizationTaskConfig.downSelectionValue,
                    self,
                    msg,
                )
class CpFlatNormalizationTask(pipeBase.PipelineTask):
    """Rescale merged flat frames to remove unequal screen illumination.
    """

    ConfigClass = CpFlatNormalizationTaskConfig
    _DefaultName = "cpFlatNorm"

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)

        # Use the dimensions of the inputs for generating
        # output scales.
        dimensions = [dict(ref.dataId.required) for ref in inputRefs.inputMDs]
        inputs['inputDims'] = dimensions

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputMDs, inputDims, camera):
        """Normalize FLAT exposures to a consistent level.

        Parameters
        ----------
        inputMDs : `list` [`lsst.daf.base.PropertyList`]
            Amplifier-level metadata used to construct scales.
        inputDims : `list` [`dict`]
            List of dictionaries of input data dimensions/values.
            Each list entry should contain:

            ``"exposure"``
                exposure id value (`int`)
            ``"detector"``
                detector id value (`int`)
        camera : `lsst.afw.cameraGeom.Camera`
            Camera used to look up detectors and their amplifiers.

        Returns
        -------
        results : `lsst.pipe.base.Struct`
            The results struct containing:

            ``outputScales``
                Dictionary of scales, indexed by detector (`int`),
                amplifier (`int`), and exposure (`int`) (`dict`
                [`dict` [`dict` [`float`]]]).

        Raises
        ------
        KeyError
            Raised if the input dimensions do not contain detector and
            exposure, or if the metadata does not contain the expected
            statistic entry.
        """
        expSet = sorted({d['exposure'] for d in inputDims})
        detSet = sorted({d['detector'] for d in inputDims})

        expMap = {exposureId: idx for idx, exposureId in enumerate(expSet)}
        detMap = {detectorId: idx for idx, detectorId in enumerate(detSet)}

        nExp = len(expSet)
        nDet = len(detSet)
        if self.config.level == 'DETECTOR':
            bgMatrix = np.zeros((nDet, nExp))
            bgCounts = np.ones((nDet, nExp))
        elif self.config.level == 'AMP':
            # Assumes every detector has the same amplifier count as the
            # first detector in the set; detectors are packed as
            # contiguous blocks of nAmp rows.
            nAmp = len(camera[detSet[0]])
            bgMatrix = np.zeros((nDet * nAmp, nExp))
            bgCounts = np.ones((nDet * nAmp, nExp))

        for inMetadata, inDimensions in zip(inputMDs, inputDims):
            try:
                exposureId = inDimensions['exposure']
                detectorId = inDimensions['detector']
            except Exception as e:
                raise KeyError(f"Cannot find expected dimensions in {inDimensions}") from e

            if self.config.level == 'DETECTOR':
                detIdx = detMap[detectorId]
                expIdx = expMap[exposureId]
                try:
                    value = inMetadata.get('DETECTOR_MEDIAN')
                    count = inMetadata.get('DETECTOR_N')
                except Exception as e:
                    raise KeyError("Cannot read expected metadata string.") from e

                if np.isfinite(value):
                    bgMatrix[detIdx][expIdx] = value
                    bgCounts[detIdx][expIdx] = count
                else:
                    # Mark as missing; measureScales masks non-finite
                    # entries, and a weight of 1 avoids zero weight sums.
                    bgMatrix[detIdx][expIdx] = np.nan
                    bgCounts[detIdx][expIdx] = 1

            elif self.config.level == 'AMP':
                detector = camera[detectorId]
                nAmp = len(detector)

                # Starting row of this detector's amplifier block.
                detIdx = detMap[detectorId] * nAmp
                expIdx = expMap[exposureId]

                for ampIdx, amp in enumerate(detector):
                    try:
                        value = inMetadata.get(f'AMP_MEDIAN_{ampIdx}')
                        count = inMetadata.get(f'AMP_N_{ampIdx}')
                    except Exception as e:
                        raise KeyError("cannot read expected metadata string.") from e

                    detAmpIdx = detIdx + ampIdx
                    if np.isfinite(value):
                        bgMatrix[detAmpIdx][expIdx] = value
                        bgCounts[detAmpIdx][expIdx] = count
                    else:
                        bgMatrix[detAmpIdx][expIdx] = np.nan
                        # BUGFIX: previously assigned bgMatrix here a
                        # second time (clobbering the NaN marker) and
                        # never set bgCounts; mirror the DETECTOR branch.
                        bgCounts[detAmpIdx][expIdx] = 1

        scaleResult = self.measureScales(bgMatrix, bgCounts, iterations=self.config.scaleMaxIter)
        expScales = scaleResult.expScales
        detScales = scaleResult.detScales

        outputScales = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float))))

        # Note that the enumerated "detIdx"/"expIdx" here index the
        # "detScales" and "expScales" arrays.
        if self.config.level == 'DETECTOR':
            for detIdx, det in enumerate(detSet):
                for amp in camera[det]:
                    for expIdx, exp in enumerate(expSet):
                        outputScales['expScale'][det][amp.getName()][exp] = expScales[expIdx].tolist()
                outputScales['detScale'][det] = detScales[detIdx].tolist()
        elif self.config.level == 'AMP':
            for detIdx, det in enumerate(detSet):
                nAmp = len(camera[det])
                for ampIdx, amp in enumerate(camera[det]):
                    for expIdx, exp in enumerate(expSet):
                        outputScales['expScale'][det][amp.getName()][exp] = expScales[expIdx].tolist()
                    # BUGFIX: the row index must use the same nAmp-strided
                    # packing used to fill bgMatrix above; the previous
                    # ``detIdx + ampIdx`` mis-indexed detScales for every
                    # detector after the first.
                    detAmpIdx = detIdx * nAmp + ampIdx
                    outputScales['detScale'][det][amp.getName()] = detScales[detAmpIdx].tolist()

        return pipeBase.Struct(
            outputScales=ddict2dict(outputScales),
        )

    def measureScales(self, bgMatrix, bgCounts=None, iterations=10):
        """Convert backgrounds to exposure and detector components.

        Parameters
        ----------
        bgMatrix : `np.ndarray`, (nDetectors, nExposures)
            Input backgrounds indexed by detector (axis=0) and
            exposure (axis=1).
        bgCounts : `np.ndarray`, (nDetectors, nExposures), optional
            Input pixel counts used in measuring bgMatrix, indexed
            identically.
        iterations : `int`, optional
            Number of iterations to use in decomposition.

        Returns
        -------
        scaleResult : `lsst.pipe.base.Struct`
            Result struct containing fields:

            ``expScales``
                Output E vector of exposure level scalings
                (`np.array`, (nExposures)).
            ``detScales``
                Output G vector of detector level scalings
                (`np.array`, (nDetectors)).
            ``bgModel``
                Expected model bgMatrix values, calculated from E and G
                (`np.ndarray`, (nDetectors, nExposures)).

        Notes
        -----
        The set of background measurements B[exposure, detector] of
        flat frame data should be defined by a "Cartesian" product of
        two vectors, E[exposure] and G[detector]. The E vector
        represents the total flux incident on the focal plane. In a
        perfect camera, this is simply the sum along the columns of B
        (np.sum(B, axis=0)).

        However, this simple model ignores differences in detector
        gains, the vignetting of the detectors, and the illumination
        pattern of the source lamp. The G vector describes these
        detector dependent differences, which should be identical over
        different exposures. For a perfect lamp of unit total
        intensity, this is simply the sum along the rows of B
        (np.sum(B, axis=1)). This algorithm divides G by the total
        flux level, to provide the relative (not absolute) scales
        between detectors.

        The algorithm here, from pipe_drivers/constructCalibs.py and
        from there from Eugene Magnier/PanSTARRS [1]_, attempts to
        iteratively solve this decomposition from initial "perfect" E
        and G vectors. The operation is performed in log space to
        reduce the multiply and divides to linear additions and
        subtractions.

        References
        ----------
        .. [1] https://svn.pan-starrs.ifa.hawaii.edu/trac/ipp/browser/trunk/psModules/src/detrend/pmFlatNormalize.c # noqa: W505, E501
        """
        numExps = bgMatrix.shape[1]
        numChips = bgMatrix.shape[0]
        if bgCounts is None:
            bgCounts = np.ones_like(bgMatrix)

        # Work in log space so the multiplicative model B = E x G becomes
        # additive; mask non-finite entries (zeros/NaNs in bgMatrix).
        logMeas = np.log(bgMatrix)
        logMeas = np.ma.masked_array(logMeas, ~np.isfinite(logMeas))
        logG = np.zeros(numChips)
        logE = np.array([np.average(logMeas[:, iexp] - logG,
                                    weights=bgCounts[:, iexp]) for iexp in range(numExps)])

        # ``iterNum`` renamed from ``iter`` to avoid shadowing the builtin.
        for iterNum in range(iterations):
            logG = np.array([np.average(logMeas[ichip, :] - logE,
                                        weights=bgCounts[ichip, :]) for ichip in range(numChips)])
            # Detectors with no valid measurements get the mean of the rest.
            bad = np.isnan(logG)
            if np.any(bad):
                logG[bad] = logG[~bad].mean()

            logE = np.array([np.average(logMeas[:, iexp] - logG,
                                        weights=bgCounts[:, iexp]) for iexp in range(numExps)])
            # Normalize G so it expresses relative detector scales.
            fluxLevel = np.average(np.exp(logG), weights=np.sum(bgCounts, axis=1))
            logG -= np.log(fluxLevel)

            if self.log.isEnabledFor(self.log.DEBUG):
                # Only calculate these values if debug log is enabled.
                self.log.debug("ITER %d: Flux: %f", iterNum, fluxLevel)
                self.log.debug("Exps: %s", np.exp(logE))
                self.log.debug("%f", np.mean(logG))

        # Final E solve against the converged, normalized G.
        logE = np.array([np.average(logMeas[:, iexp] - logG,
                                    weights=bgCounts[:, iexp]) for iexp in range(numExps)])

        bgModel = np.exp(logE[np.newaxis, :] - logG[:, np.newaxis])
        return pipeBase.Struct(
            expScales=np.exp(logE),
            detScales=np.exp(logG),
            bgModel=bgModel,
        )