Coverage for python / lsst / fgcmcal / fgcmOutputIlluminationCorrection.py: 22%
113 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 09:32 +0000
1# See COPYRIGHT file at the top of the source tree.
2#
3# This file is part of fgcmcal.
4#
5# Developed for the LSST Data Management System.
6# This product includes software developed by the LSST Project
7# (https://www.lsst.org).
8# See the COPYRIGHT file at the top-level directory of this distribution
9# for details of code ownership.
10#
11# This program is free software: you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation, either version 3 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program. If not, see <https://www.gnu.org/licenses/>.
23"""Make an illumination correction from the fgcmcal outputs.
24"""
25from datetime import datetime, UTC
26import numpy as np
27from astropy.time import Time, TimeDelta
28import dataclasses
30from lsst.afw.image import ExposureF, FilterLabel
31from lsst.daf.butler import Timespan
32from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig, connectionTypes, PipelineTask, Struct
33import lsst.pex.config
35from .fgcmOutputProducts import FgcmOutputProductsTask
36from .utilities import computePixelAreaFieldDetector, computeReferencePixelScale
38__all__ = ["FgcmOutputIlluminationCorrectionConfig", "FgcmOutputIlluminationCorrectionTask"]
41class FgcmOutputIlluminationCorrectionConnections(
42 PipelineTaskConnections,
43 dimensions=("instrument", "detector"),
44 defaultTemplates={"cycleNumber": "0"},
45):
46 camera = connectionTypes.PrerequisiteInput(
47 doc="Camera instrument",
48 name="camera",
49 storageClass="Camera",
50 dimensions=("instrument",),
51 isCalibration=True,
52 )
53 fgcm_visit_catalog = connectionTypes.Input(
54 doc="Catalog of visit information for fgcm.",
55 name="fgcmVisitCatalog",
56 storageClass="Catalog",
57 dimensions=("instrument",),
58 )
59 fgcm_fit_parameters_catalog = connectionTypes.Input(
60 doc="Catalog of fgcm fit parameters.",
61 name="fgcm_Cycle{cycleNumber}_FitParameters",
62 storageClass="Catalog",
63 dimensions=("instrument",),
64 )
65 flat_metadata = connectionTypes.PrerequisiteInput(
66 doc="Flat field metadata associated with illumination correction epoch.",
67 name="flat.metadata",
68 storageClass="PropertyList",
69 dimensions=["instrument", "detector", "physical_filter"],
70 isCalibration=True,
71 multiple=True,
72 )
73 illumination_corrections = connectionTypes.Output(
74 doc="Illumination corrections from fgcm fit.",
75 name="illuminationCorrection",
76 storageClass="ExposureF",
77 dimensions=["instrument", "detector", "physical_filter"],
78 isCalibration=True,
79 multiple=True,
80 )
82 def __init__(self, *, config=None):
83 super().__init__(config=config)
85 if str(int(config.connections.cycleNumber)) != config.connections.cycleNumber:
86 raise ValueError("cycleNumber must be of integer format")
88 if not config.use_flat_metadata:
89 del self.flat_metadata
90 else:
91 def lookup_flat_metadata(dataset_type, registry, data_id, collections):
93 time = Time(config.epoch_time, format=config.epoch_format)
95 return [
96 registry.findDataset(
97 dataset_type.makeCompositeDatasetType(),
98 data_id,
99 physical_filter=physical_filter,
100 timespan=Timespan(
101 time - TimeDelta(1, format="sec"),
102 time + TimeDelta(1, format="sec"),
103 ),
104 collections=collections,
105 ).makeComponentRef("metadata")
106 for physical_filter in config.physical_filters
107 ]
109 self.flat_metadata = dataclasses.replace(self.flat_metadata, lookupFunction=lookup_flat_metadata)
112class FgcmOutputIlluminationCorrectionConfig(
113 PipelineTaskConfig,
114 pipelineConnections=FgcmOutputIlluminationCorrectionConnections,
115):
116 """Configuration for FgcmOutputIlluminationCorrectionTask."""
118 use_flat_metadata = lsst.pex.config.Field(
119 doc="Use flat-field metadata for illumination correction metadata?",
120 dtype=bool,
121 default=True,
122 )
123 epoch_time = lsst.pex.config.Field(
124 doc="Time string (UTC) that corresponds to a date in the desired epoch.",
125 dtype=str,
126 default=None,
127 optional=False,
128 )
129 epoch_format = lsst.pex.config.Field(
130 doc="Format for time string (e.g. iso, mjd, etc.), used by astropy.time.Time()",
131 dtype=str,
132 default="iso",
133 )
134 physical_filters = lsst.pex.config.ListField(
135 doc="List of physical filters to produce illumination corrections.",
136 dtype=str,
137 default=[],
138 )
139 include_wcs_jacobian = lsst.pex.config.Field(
140 doc="Include WCS jacobian in illumination correction?",
141 dtype=bool,
142 default=True,
143 )
144 approximate_wcs_jacobian = lsst.pex.config.Field(
145 doc="Use a Chebyshev approximation of the WCS jacobian in illumination correction?",
146 dtype=bool,
147 default=True,
148 )
150 def validate(self):
151 try:
152 _ = Time(self.epoch_time, format=self.epoch_format)
153 except Exception as e:
154 raise ValueError(
155 "Could not parse epoch_time/epoch_format ", e)
158class FgcmOutputIlluminationCorrectionTask(PipelineTask):
159 """Output illumination corrections from fgcm."""
160 ConfigClass = FgcmOutputIlluminationCorrectionConfig
161 _DefaultName = "fgcmOutputIlluminationCorrection"
163 def runQuantum(self, butlerQC, inputRefs, outputRefs):
165 inputs = butlerQC.get(inputRefs)
167 detector_id = butlerQC.quantum.dataId["detector"]
169 filter_label_dict = {ref.dataId["physical_filter"]:
170 FilterLabel(physical=ref.dataId["physical_filter"], band=ref.dataId["band"])
171 for ref in outputRefs.illumination_corrections}
173 flat_metadata_dict = {}
174 if self.config.use_flat_metadata:
175 for i, flat_metadata in enumerate(inputs["flat_metadata"]):
176 ref = inputRefs.flat_metadata[i]
177 flat_metadata_dict[ref.dataId["physical_filter"]] = (ref.id, flat_metadata)
179 retval = self.run(
180 camera=inputs["camera"],
181 detector_id=detector_id,
182 fgcm_fit_parameters_catalog=inputs["fgcm_fit_parameters_catalog"],
183 filter_label_dict=filter_label_dict,
184 flat_metadata_dict=flat_metadata_dict,
185 )
187 # And put the outputs.
188 illum_corr_ref_dict = {ref.dataId["physical_filter"]:
189 ref for ref in outputRefs.illumination_corrections}
190 for physical_filter in retval.illum_corr_dict:
191 if physical_filter in illum_corr_ref_dict:
192 self.log.debug(
193 "Serializing illumination correction for detector %d, physical_filter %s",
194 detector_id,
195 physical_filter,
196 )
197 butlerQC.put(retval.illum_corr_dict[physical_filter], illum_corr_ref_dict[physical_filter])
199 def run(
200 self,
201 *,
202 camera,
203 detector_id,
204 fgcm_fit_parameters_catalog,
205 filter_label_dict,
206 flat_metadata_dict={},
207 ):
208 """Run the illumination correction output task.
210 Parameters
211 ----------
212 camera : `lsst.afw.cameraGeom.camera`
213 The camera with camera geometry
214 detector_id : `int`
215 The id of the detector.
216 fgcm_fit_parameters_catalog : `lsst.afw.SimpleCatalog`
217 Catalog of fgcm fit parameters.
218 filter_label_dict : `dict` [`str`: `lsst.afw.image.FilterLabel`]
219 Dictionary of filter labels, keyed by physical_filter.
220 flat_metadata_dict : `dict` [`str`: (`uuid.UUID`, `lsst.pipe.base.PropertyList`]
221 Dictionary of UUIDs and flat metadata, keyed by physical_filter.
223 Returns
224 -------
225 struct : `lsst.pipe.base.Struct`
226 Output structure with keys:
228 ``illum_corr_dict``: dictionary keyed by physical_filter,
229 with illumination correction ExposureF.
230 """
231 epoch_time = Time(self.config.epoch_time, format=self.config.epoch_format)
232 epoch_mjd = epoch_time.mjd
234 detector_index = detector_id - camera[0].getId()
236 # This is the illumination correction array from fgcm.
237 fgcm_star_flat = np.zeros(fgcm_fit_parameters_catalog["superstarSize"][0, :], dtype="f8")
238 fgcm_star_flat[:, :, :, :] = fgcm_fit_parameters_catalog["superstar"][0, :].reshape(
239 fgcm_star_flat.shape,
240 )
242 # These are the filter names associated with the illumination
243 # corrections.
244 fgcm_filter_names = np.asarray(fgcm_fit_parameters_catalog[0]["lutFilterNames"].split(","))
246 epoch_mjd_start = fgcm_fit_parameters_catalog[0]["epochMjdStart"]
247 epoch_mjd_end = fgcm_fit_parameters_catalog[0]["epochMjdEnd"]
249 epoch_index, = np.where((epoch_mjd > epoch_mjd_start) & (epoch_mjd < epoch_mjd_end))
251 if len(epoch_index) == 0:
252 raise RuntimeError(f"Could not find epoch at {epoch_time} in fgcm epochs.")
254 detector = camera[detector_id]
255 xymax = np.array([detector.getBBox().getMaxX(), detector.getBBox().getMaxY()])
256 area_scaling = 1. / computeReferencePixelScale(camera)**2.
258 illum_corr_dict = {}
260 count = 0
261 for physical_filter in self.config.physical_filters:
262 if physical_filter not in filter_label_dict:
263 # There are no data associated, so we do not make an
264 # illumination correction.
265 continue
267 if physical_filter not in fgcm_filter_names:
268 self.log.warning(
269 "FgcmOutputIlluminationCorrectionTask configured to generate correction for "
270 f"physical filter {physical_filter} but that filter was not calibrated.",
271 )
272 continue
274 filter_index, = np.where(fgcm_filter_names == physical_filter)
276 star_flat_pars = fgcm_star_flat[epoch_index, filter_index, detector_index, :].ravel()
278 illum_corr = ExposureF(detector.getBBox())
279 illum_corr.image.array[:, :] = 1.0
280 illum_corr.setDetector(detector)
282 # Get the flat uuid and units if available.
283 if physical_filter in flat_metadata_dict:
284 flat_uuid = str(flat_metadata_dict[physical_filter][0])
285 flat_md = flat_metadata_dict[physical_filter][1]
286 units = flat_md["LSST ISR UNITS"]
287 else:
288 # This is used for testdata_jointcal testing only.
289 flat_uuid = "Unknown"
290 # Assume adu unless otherwise specified.
291 units = "adu"
293 illum_corr.info.setFilter(filter_label_dict[physical_filter])
294 illum_corr.metadata["LSST CALIB UUID FLAT"] = flat_uuid
295 illum_corr.metadata["LSST ISR UNITS"] = units
297 # Creation date. Calibration team standard is for local time to be
298 # available. Also form UTC (not TAI) version for easier comparisons
299 # across multiple processing sites.
300 now = datetime.now(tz=UTC)
301 illum_corr.metadata.set(
302 "CALIB_CREATION_DATETIME",
303 now.strftime("%Y-%m-%dT%T"),
304 comment="UTC of processing",
305 )
306 local_time = now.astimezone()
307 calib_date = local_time.strftime("%Y-%m-%d")
308 calib_time = local_time.strftime("%X %Z")
309 illum_corr.metadata.set(
310 "CALIB_CREATION_DATE",
311 calib_date,
312 comment="Local time day of creation",
313 )
314 illum_corr.metadata.set(
315 "CALIB_CREATION_TIME",
316 calib_time,
317 comment="Local time in day of creation",
318 )
320 # Make sure this is a legal illumination correction; fgcm
321 # uses 100.0 as a sentinel for unfit detectors.
322 if star_flat_pars[0] < 100.0:
323 star_flat_field = FgcmOutputProductsTask._getChebyshevBoundedField(
324 star_flat_pars,
325 xymax,
326 )
328 star_flat_field.divideImage(illum_corr.image)
329 # fgcm includes an additional clipping for strongly vignetted regions.
330 illum_corr.image.array[:, :] = np.clip(illum_corr.image.array[:, :], None, 10.0)
332 else:
333 self.log.warning(
334 f"Invalid star flat found for detector {physical_filter} {detector_id}; "
335 "setting to all 1.0s.",
336 )
338 if self.config.include_wcs_jacobian:
339 # This code is here (per-filter) because in the future
340 # we plan to allow for a different field per band.
341 pixel_area_field = computePixelAreaFieldDetector(
342 camera[detector_id],
343 areaScaling=area_scaling,
344 approximate=self.config.approximate_wcs_jacobian,
345 )
347 pixel_area_field.divideImage(illum_corr.image)
349 count += 1
350 illum_corr_dict[physical_filter] = illum_corr
352 self.log.info("Successfully created %d illumination corrections for detector %d", count, detector_id)
354 return Struct(illum_corr_dict=illum_corr_dict)