Coverage for python / lsst / cp / pipe / cpFlatGradients.py: 25%
194 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 18:52 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21import numpy as np
23import lsst.afw.cameraGeom
24import lsst.pipe.base as pipeBase
25from lsst.ip.isr import FlatGradient
26import lsst.pex.config as pexConfig
27from lsst.utils.plotting import make_figure
29from .utils import FlatGradientFitter, bin_focal_plane
31__all__ = [
32 "CpFlatFitGradientsTask",
33 "CpFlatFitGradientsConfig",
34 "CpFlatApplyGradientsTask",
35 "CpFlatApplyGradientsConfig",
36]
39class CpFlatFitGradientsConnections(
40 pipeBase.PipelineTaskConnections,
41 dimensions=("instrument", "physical_filter"),
42):
43 camera = pipeBase.connectionTypes.PrerequisiteInput(
44 name="camera",
45 doc="Camera Geometry definition.",
46 storageClass="Camera",
47 dimensions=("instrument", ),
48 isCalibration=True,
49 )
50 input_flats = pipeBase.connectionTypes.Input(
51 name="flat_uncorrected",
52 doc="Input flats to fit gradients.",
53 storageClass="ExposureF",
54 dimensions=("instrument", "detector", "physical_filter"),
55 multiple=True,
56 deferLoad=True,
57 isCalibration=True,
58 )
59 input_defects = pipeBase.connectionTypes.PrerequisiteInput(
60 name="defects",
61 doc="Input defect tables.",
62 storageClass="Defects",
63 dimensions=("instrument", "detector"),
64 isCalibration=True,
65 multiple=True,
66 deferLoad=True,
67 )
68 output_reference_gradient = pipeBase.connectionTypes.Output(
69 name="flat_gradient_reference",
70 doc="Reference flat gradient calibration.",
71 storageClass="IsrCalib",
72 dimensions=("instrument", "physical_filter"),
73 isCalibration=True,
74 )
75 output_gradient = pipeBase.connectionTypes.Output(
76 name="flat_gradient",
77 doc="Flat gradient fit.",
78 storageClass="IsrCalib",
79 dimensions=("instrument", "physical_filter"),
80 )
81 model_residual_plot = pipeBase.connectionTypes.Output(
82 name="gradient_model_residual_plot",
83 doc="Residual plot from flat gradient model.",
84 storageClass="Plot",
85 dimensions=("instrument", "physical_filter"),
86 )
87 radial_model_plot = pipeBase.connectionTypes.Output(
88 name="gradient_radial_model_plot",
89 doc="Radial model plot for flat-field gradient.",
90 storageClass="Plot",
91 dimensions=("instrument", "physical_filter"),
92 )
94 def __init__(self, *, config=None):
95 super().__init__(config=config)
97 if config.do_reference_gradient:
98 del self.output_gradient
99 else:
100 del self.output_reference_gradient
103class CpFlatFitGradientsConfig(
104 pipeBase.PipelineTaskConfig,
105 pipelineConnections=CpFlatFitGradientsConnections,
106):
107 do_reference_gradient = pexConfig.Field(
108 dtype=bool,
109 doc="Use task to produce a reference gradient (from sky)? This is used to "
110 "control the dataset type of the output.",
111 default=False,
112 )
113 bin_factor = pexConfig.Field(
114 dtype=int,
115 doc="Binning factor for flats going into the focal plane.",
116 default=128,
117 )
118 do_constrain_zero = pexConfig.Field(
119 dtype=bool,
120 doc="Constrain the outermost radial spline value to zero?",
121 default=False,
122 )
123 do_fit_centroid = pexConfig.Field(
124 dtype=bool,
125 doc="Fit a centroid offset from the focal plane centroid?",
126 default=False,
127 )
128 do_fit_gradient = pexConfig.Field(
129 dtype=bool,
130 doc="Fit a linear gradient over the focal plane?",
131 default=False,
132 )
133 do_normalize_center = pexConfig.Field(
134 dtype=bool,
135 doc="Normalize center of focal plane to 1.0?",
136 default=True,
137 )
138 normalize_center_radius = pexConfig.Field(
139 dtype=float,
140 doc="Center normalization will be done using the average within this radius (mm).",
141 default=25.0,
142 )
143 fp_centroid_x = pexConfig.Field(
144 dtype=float,
145 doc="Focal plane centroid x (mm).",
146 default=0.0,
147 )
148 fp_centroid_y = pexConfig.Field(
149 dtype=float,
150 doc="Focal plane centroid y (mm).",
151 default=0.0,
152 )
153 radial_spline_nodes_initial = pexConfig.ListField(
154 dtype=float,
155 doc="Spline nodes for use in initial fit (mm). Initial fit will only be performed "
156 "within the maximum radius for this list. The maximum radius should be chosen "
157 "to make ITL/E2V ratio fitting more stable.",
158 default=[0.0, 100.0, 200.0, 250.0, 275.0, 317.0],
159 )
160 radial_spline_nodes = pexConfig.ListField(
161 dtype=float,
162 doc="Spline nodes to use for full radial fit (mm).",
163 default=[
164 0.0,
165 100.0,
166 200.0,
167 250.0,
168 275.0,
169 290.0,
170 300.0,
171 310.0,
172 315.0,
173 317.0,
174 320.0,
175 325.0,
176 333.0,
177 340.0,
178 350.0,
179 ],
180 )
181 min_flat_value = pexConfig.Field(
182 dtype=float,
183 doc="Minimum (relative) flat value to use in fit.",
184 default=0.05,
185 )
186 max_flat_value = pexConfig.Field(
187 dtype=float,
188 doc="Maximum (relative) flat value to use in fit.",
189 default=1.5,
190 )
191 detector_boundary = pexConfig.Field(
192 dtype=int,
193 doc="Do not use pixels within detector_boundary of the edge for fitting.",
194 default=10,
195 )
196 fit_eps = pexConfig.Field(
197 dtype=float,
198 doc="Minimizer epsilon parameter.",
199 default=1e-8,
200 )
201 fit_gtol = pexConfig.Field(
202 dtype=float,
203 doc="Minimizer gtol parameter.",
204 default=1e-10,
205 )
206 do_use_non_science_detectors = pexConfig.Field(
207 dtype=bool,
208 doc="Use non-science detectors in addition to science detectors?",
209 default=False,
210 )
211 fixed_model_residual_plot_scale = pexConfig.Field(
212 dtype=float,
213 doc="Fixed scale for making residual plots.",
214 optional=True,
215 default=None,
216 )
219class CpFlatFitGradientsTask(pipeBase.PipelineTask):
220 """Task to measure gradients on sky/dome flats.
221 """
223 ConfigClass = CpFlatFitGradientsConfig
224 _DefaultName = "cpFlatFitGradients"
226 def runQuantum(self, butlerQC, inputRefs, outputRefs):
227 inputs = butlerQC.get(inputRefs)
229 input_flats = inputs["input_flats"]
230 input_defects = inputs["input_defects"]
232 input_flat_handle_dict = {
233 handle.dataId["detector"]: handle for handle in input_flats
234 }
235 input_defect_handle_dict = {
236 handle.dataId["detector"]: handle for handle in input_defects
237 }
239 struct = self.run(
240 camera=inputs["camera"],
241 input_flat_handle_dict=input_flat_handle_dict,
242 input_defect_handle_dict=input_defect_handle_dict,
243 )
245 butlerQC.put(struct, outputRefs)
247 def run(self, *, camera, input_flat_handle_dict, input_defect_handle_dict, binned_image=None):
248 """Run the CpFlatFitGradientsTask.
250 This task will fit full focal-plane gradients. See
251 `lsst.cp.pipe.utils.FlatGradientFitter` for details.
253 The return struct will contain ``output_gradient`` if
254 do_reference_gradient is False, or ``output_reference_gradient``
255 otherwise.
257 Parameters
258 ----------
259 camera : `lsst.afw.cameraGeom.Camera`
260 Camera object.
261 input_flat_handle_dict : `dict` [`int`,
262 `lsst.daf.butler.DeferredDatasetHandle`]
263 Dictionary of input flat handles, keyed by detector.
264 input_defect_handle_dict : `dict` [`int`,
265 `lsst.daf.butler.DeferredDatasetHandle`]
266 Dictionary of input defect handles, keyed by detector.
267 binned_image : `np.ndarray`, optional
268 Binned image from a previous run of the task. This will take
269 precedence over the input handles, for easy re-use. This may
270 be used for debugging, or if CpFlatFitGradientsTask is used
271 as a sub-task.
273 Returns
274 -------
275 struct : `lsst.pipe.base.Struct`
276 Output structure with:
277 ``output_gradient``: `lsst.ip.isr.FlatGradient` or
278 ``output_reference_gradient``: `lsst.ip.isr.FlatGradient`
279 ``model_residual_plot``: `matplotlib.Figure`
280 ``radial_model_plot``: `matplotlib.Figure`
281 ``binned_image``: `np.ndarray`
282 """
283 # Load in and bin the data.
284 if binned_image is None:
285 self.log.info("Loading and binning %d flats.", len(input_flat_handle_dict))
286 binned = bin_focal_plane(
287 input_flat_handle_dict,
288 self.config.detector_boundary,
289 self.config.bin_factor,
290 defect_handle_dict=input_defect_handle_dict,
291 include_itl_flag=True,
292 )
293 else:
294 self.log.info("Using provided binned flat image.")
295 binned = binned_image
297 binned_unaltered = binned.copy()
299 # Renormalize and filter out bad data.
300 fp_radius = np.sqrt(
301 (binned["xf"] - self.config.fp_centroid_x)**2.
302 + (binned["yf"] - self.config.fp_centroid_y)**2.
303 )
304 use = (fp_radius < self.config.normalize_center_radius)
305 central_value = np.median(binned["value"][use])
307 if self.config.do_normalize_center:
308 normalization = central_value
309 binned["value"] /= normalization
310 value_min = self.config.min_flat_value
311 value_max = self.config.max_flat_value
312 else:
313 normalization = 1.0
314 value_min = self.config.min_flat_value * central_value
315 value_max = self.config.max_flat_value * central_value
317 good = np.ones(len(binned), dtype=np.bool_)
318 if not self.config.do_use_non_science_detectors:
319 # Remove science detectors.
320 for detector in camera:
321 if detector.getType() != lsst.afw.cameraGeom.DetectorType.SCIENCE:
322 good[binned["detector"] == detector.getId()] = False
324 good &= (
325 np.isfinite(binned["value"])
326 & (binned["value"] >= value_min)
327 & (binned["value"] <= value_max)
328 & (fp_radius <= self.config.radial_spline_nodes[-1])
329 )
330 binned = binned[good]
331 fp_radius = fp_radius[good]
333 # Do the fit.
334 self.log.info("Fitting gradient to binned flat data.")
336 # First pass, within a smaller radius.
337 # This first pass is to get a robust measurement of the
338 # ITL/E2V ratio which can be then used in the second pass
339 # (below).
340 nodes_initial = np.asarray(self.config.radial_spline_nodes_initial)
341 initial_fit_radius = np.max(nodes_initial)
342 use_initial = (fp_radius < initial_fit_radius)
343 self.log.info(
344 "Initial fit will be performed with an outer radius cut of %.2f mm.",
345 initial_fit_radius,
346 )
348 fitter_initial = FlatGradientFitter(
349 nodes_initial,
350 binned["xf"][use_initial],
351 binned["yf"][use_initial],
352 binned["value"][use_initial],
353 np.where(binned["itl"][use_initial])[0],
354 constrain_zero=self.config.do_constrain_zero,
355 fit_centroid=self.config.do_fit_centroid,
356 fit_gradient=self.config.do_fit_gradient,
357 fp_centroid_x=self.config.fp_centroid_x,
358 fp_centroid_y=self.config.fp_centroid_y,
359 )
360 p0_initial = fitter_initial.compute_p0()
361 pars_initial = fitter_initial.fit(
362 p0_initial,
363 fit_eps=self.config.fit_eps,
364 fit_gtol=self.config.fit_gtol,
365 )
366 if "itl_ratio" in fitter_initial.indices:
367 itl_ratio_initial = pars_initial[fitter_initial.indices["itl_ratio"]]
368 else:
369 itl_ratio_initial = 1.0
371 # Second pass, full radial range.
372 # This second pass uses the ITL/E2V ratio from the first
373 # pass (above) to avoid degeneracies with the strongly varying
374 # radial function in the outer radii.
376 nodes = np.asarray(self.config.radial_spline_nodes)
377 self.log.info("Final fit will be performed with an outer radius cut of %.2f mm.", np.max(nodes))
379 fitter = FlatGradientFitter(
380 nodes,
381 binned["xf"],
382 binned["yf"],
383 binned["value"],
384 np.where(binned["itl"])[0],
385 constrain_zero=self.config.do_constrain_zero,
386 fit_centroid=self.config.do_fit_centroid,
387 fit_gradient=self.config.do_fit_gradient,
388 fp_centroid_x=self.config.fp_centroid_x,
389 fp_centroid_y=self.config.fp_centroid_y,
390 )
391 p0 = fitter.compute_p0(itl_ratio=itl_ratio_initial)
392 pars = fitter.fit(
393 p0,
394 freeze_itl_ratio=True,
395 fit_eps=self.config.fit_eps,
396 fit_gtol=self.config.fit_gtol,
397 )
399 # Create the output FlatGradient calibration.
400 gradient = FlatGradient()
402 if "itl_ratio" in fitter.indices:
403 itl_ratio = pars[fitter.indices["itl_ratio"]]
404 else:
405 itl_ratio = 1.0
407 if self.config.do_fit_centroid:
408 centroid_delta_x, centroid_delta_y = pars[fitter.indices["centroid_delta"]]
409 else:
410 centroid_delta_x, centroid_delta_y = 0.0, 0.0
412 if self.config.do_fit_gradient:
413 gradient_x, gradient_y = pars[fitter.indices["gradient"]]
414 else:
415 gradient_x, gradient_y = 0.0, 0.0
417 gradient.setParameters(
418 radialSplineNodes=nodes,
419 radialSplineValues=pars[fitter.indices["spline"]],
420 itlRatio=itl_ratio,
421 centroidX=self.config.fp_centroid_x,
422 centroidY=self.config.fp_centroid_y,
423 centroidDeltaX=centroid_delta_x,
424 centroidDeltaY=centroid_delta_y,
425 gradientX=gradient_x,
426 gradientY=gradient_y,
427 normalizationFactor=normalization,
428 )
430 flat = input_flat_handle_dict[list(input_flat_handle_dict.keys())[0]].get()
432 self.log.info("Making QA plots.")
433 plot_dict = self._make_qa_plots(binned, gradient, flat.getFilter())
435 # Set the calib metadata.
436 filter_label = flat.getFilter()
438 gradient.updateMetadata(camera=camera, filterName=filter_label.physicalLabel)
439 gradient.updateMetadata(setDate=True, setCalibId=True)
441 struct = pipeBase.Struct(
442 model_residual_plot=plot_dict["model_residuals"],
443 radial_model_plot=plot_dict["radial"],
444 binned_image=binned_unaltered,
445 )
446 if self.config.do_reference_gradient:
447 struct.output_reference_gradient = gradient
448 else:
449 struct.output_gradient = gradient
451 return struct
453 def _make_qa_plots(self, binned, gradient, filter_label):
454 """Make QA plots for the binned data.
456 Parameters
457 ----------
458 binned : `np.ndarray`
459 Array with binned flat data.
460 gradient : `lsst.ip.isr.FlatGradient`
461 Flat gradient parameters.
462 filter_label : `lsst.afw.image.FilterLabel`
463 Filter label for labeling the plot.
465 Returns
466 -------
467 plot_dict : `dict` [`str`, `matplotlib.Figure`]
468 Dictionary of plot figures, keyed by name.
469 """
470 vmin, vmax = np.nanpercentile(binned["value"], [5, 95])
471 model = gradient.computeFullModel(binned["xf"], binned["yf"], binned["itl"])
473 # Fig1 is a 3 panel plot of data, model, and data/model.
474 fig1 = make_figure(figsize=(16, 6))
476 ax1 = fig1.add_subplot(131)
478 im1 = ax1.hexbin(binned["xf"], binned["yf"], C=binned["value"], vmin=vmin, vmax=vmax)
480 ax1.set_xlabel("Focal Plane x (mm)")
481 ax1.set_ylabel("Focal Plane y (mm)")
482 ax1.set_aspect("equal")
483 ax1.set_title("Data")
485 fig1.colorbar(im1, ax=ax1)
487 ax2 = fig1.add_subplot(132)
489 im2 = ax2.hexbin(binned["xf"], binned["yf"], C=model, vmin=vmin, vmax=vmax)
491 ax2.set_xlabel("Focal Plane x (mm)")
492 ax2.set_ylabel("Focal Plane y (mm)")
493 ax2.set_aspect("equal")
494 ax2.set_title("Model")
496 fig1.colorbar(im2, ax=ax2)
498 ax3 = fig1.add_subplot(133)
500 ratio = binned["value"] / model
501 if self.config.fixed_model_residual_plot_scale is not None:
502 vmin = 1.0 - self.config.fixed_model_residual_plot_scale
503 vmax = 1.0 + self.config.fixed_model_residual_plot_scale
504 else:
505 vmin, vmax = np.nanpercentile(ratio, [1, 99])
507 im3 = ax3.hexbin(binned["xf"], binned["yf"], C=ratio, vmin=vmin, vmax=vmax)
509 ax3.set_xlabel("Focal Plane x (mm)")
510 ax3.set_ylabel("Focal Plane y (mm)")
511 ax3.set_aspect("equal")
512 ax3.set_title("Data / Model")
514 fig1.colorbar(im3, ax=ax3)
516 fig1.suptitle(f"{filter_label.physicalLabel}: {self.config.connections.input_flats}")
518 # Fig2 is a plot of the adjusted radial plot.
519 fig2 = make_figure(figsize=(8, 6))
520 ax = fig2.add_subplot(111)
522 centroid_x = gradient.centroidX + gradient.centroidDeltaX
523 centroid_y = gradient.centroidY + gradient.centroidDeltaY
525 radius = np.sqrt((binned["xf"] - centroid_x)**2. + (binned["yf"] - centroid_y)**2.)
526 value_adjusted = binned["value"].copy()
528 value_adjusted *= gradient.computeGradientModel(binned["xf"], binned["yf"])
529 value_adjusted[binned["itl"]] /= gradient.itlRatio
531 ax.hexbin(radius, value_adjusted, bins="log")
532 xvals = np.linspace(gradient.radialSplineNodes[0], gradient.radialSplineNodes[-1], 1000)
533 yvals = gradient.computeRadialSplineModel(xvals)
534 ax.plot(xvals, yvals, "r-")
535 ax.set_xlabel("Radius (mm)")
536 ax.set_ylabel("Value (adjusted)")
537 ax.set_title(f"{filter_label.physicalLabel}: {self.config.connections.input_flats}")
539 plot_dict = {
540 "model_residuals": fig1,
541 "radial": fig2,
542 }
544 return plot_dict
547class CpFlatApplyGradientsConnections(
548 pipeBase.PipelineTaskConnections,
549 dimensions=("instrument", "physical_filter", "detector"),
550):
551 camera = pipeBase.connectionTypes.PrerequisiteInput(
552 name="camera",
553 doc="Camera Geometry definition.",
554 storageClass="Camera",
555 dimensions=("instrument", ),
556 isCalibration=True,
557 )
558 input_flat = pipeBase.connectionTypes.Input(
559 name="flat_uncorrected",
560 doc="Input flat to apply gradient correction.",
561 storageClass="ExposureF",
562 dimensions=("instrument", "detector", "physical_filter"),
563 isCalibration=True,
564 )
565 reference_gradient = pipeBase.connectionTypes.PrerequisiteInput(
566 name="flat_gradient_reference",
567 doc="Reference flat gradient.",
568 storageClass="IsrCalib",
569 dimensions=("instrument", "physical_filter"),
570 isCalibration=True,
571 )
572 gradient = pipeBase.connectionTypes.Input(
573 name="flat_gradient",
574 doc="Flat gradient fit to full focal plane.",
575 storageClass="IsrCalib",
576 dimensions=("instrument", "physical_filter"),
577 )
578 output_flat = pipeBase.connectionTypes.Output(
579 name="flat",
580 doc="Output gradient-corrected flat.",
581 storageClass="ExposureF",
582 dimensions=("instrument", "detector", "physical_filter"),
583 isCalibration=True,
584 )
587class CpFlatApplyGradientsConfig(
588 pipeBase.PipelineTaskConfig,
589 pipelineConnections=CpFlatApplyGradientsConnections,
590):
591 pass
594class CpFlatApplyGradientsTask(pipeBase.PipelineTask):
595 """Task to apply/remove gradients for dome flats.
596 """
598 ConfigClass = CpFlatApplyGradientsConfig
599 _DefaultName = "cpFlatApplyGradients"
601 def run(self, *, camera, input_flat, reference_gradient, gradient):
602 """Run the CpFlatApplyGradientsTask.
604 This will apply (remove) any gradients from a flat.
606 Parameters
607 ----------
608 camera : `lsst.afw.cameraGeom.Camera`
609 Camera object.
610 input_flat : `lsst.afw.Exposure`
611 Input flat to apply/remove gradients.
612 reference_gradient : `lsst.ip.isr.FlatGradient`
613 Reference gradient with target radial function.
614 gradient : `lsst.ip.isr.FlatGradient`
615 Gradient fit to the full focal plane.
617 Returns
618 -------
619 struct : `lsst.pipe.base.Struct`
620 Output structure with:
621 ``output_flat``: `lsst.afw.image.Exposure`
622 """
623 output_flat = input_flat.clone()
625 detector = output_flat.getDetector()
627 # Convert pixels to focal plane coordinates.
628 xx = np.arange(output_flat.image.array.shape[1], dtype=np.int64)
629 yy = np.arange(output_flat.image.array.shape[0], dtype=np.int64)
630 x, y = np.meshgrid(xx, yy)
631 x = x.ravel()
632 y = y.ravel()
634 transform = detector.getTransform(lsst.afw.cameraGeom.PIXELS, lsst.afw.cameraGeom.FOCAL_PLANE)
635 xy = np.vstack((x, y))
636 xf, yf = np.vsplit(transform.getMapping().applyForward(xy.astype(np.float64)), 2)
637 xf = xf.ravel()
638 yf = yf.ravel()
640 # First we want to divide out the planar gradients.
641 gradient_values = gradient.computeGradientModel(xf, yf)
642 output_flat.image.array[y, x] *= gradient_values
644 # Next we need the relative radial scaling.
645 radial_values = (
646 gradient.computeRadialSplineModelXY(xf, yf)
647 / reference_gradient.computeRadialSplineModelXY(xf, yf)
648 )
649 output_flat.image.array[y, x] /= radial_values
651 # And apply the normalization
652 output_flat.image.array[:, :] /= gradient.normalizationFactor
654 struct = pipeBase.Struct(output_flat=output_flat)
656 return struct