Coverage for python / lsst / fgcmcal / fgcmOutputIlluminationCorrection.py: 22%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 09:32 +0000

1# See COPYRIGHT file at the top of the source tree. 

2# 

3# This file is part of fgcmcal. 

4# 

5# Developed for the LSST Data Management System. 

6# This product includes software developed by the LSST Project 

7# (https://www.lsst.org). 

8# See the COPYRIGHT file at the top-level directory of this distribution 

9# for details of code ownership. 

10# 

11# This program is free software: you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation, either version 3 of the License, or 

14# (at your option) any later version. 

15# 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20# 

21# You should have received a copy of the GNU General Public License 

22# along with this program. If not, see <https://www.gnu.org/licenses/>. 

23"""Make an illumination correction from the fgcmcal outputs. 

24""" 

25from datetime import datetime, UTC 

26import numpy as np 

27from astropy.time import Time, TimeDelta 

28import dataclasses 

29 

30from lsst.afw.image import ExposureF, FilterLabel 

31from lsst.daf.butler import Timespan 

32from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig, connectionTypes, PipelineTask, Struct 

33import lsst.pex.config 

34 

35from .fgcmOutputProducts import FgcmOutputProductsTask 

36from .utilities import computePixelAreaFieldDetector, computeReferencePixelScale 

37 

38__all__ = ["FgcmOutputIlluminationCorrectionConfig", "FgcmOutputIlluminationCorrectionTask"] 

39 

40 

41class FgcmOutputIlluminationCorrectionConnections( 

42 PipelineTaskConnections, 

43 dimensions=("instrument", "detector"), 

44 defaultTemplates={"cycleNumber": "0"}, 

45): 

46 camera = connectionTypes.PrerequisiteInput( 

47 doc="Camera instrument", 

48 name="camera", 

49 storageClass="Camera", 

50 dimensions=("instrument",), 

51 isCalibration=True, 

52 ) 

53 fgcm_visit_catalog = connectionTypes.Input( 

54 doc="Catalog of visit information for fgcm.", 

55 name="fgcmVisitCatalog", 

56 storageClass="Catalog", 

57 dimensions=("instrument",), 

58 ) 

59 fgcm_fit_parameters_catalog = connectionTypes.Input( 

60 doc="Catalog of fgcm fit parameters.", 

61 name="fgcm_Cycle{cycleNumber}_FitParameters", 

62 storageClass="Catalog", 

63 dimensions=("instrument",), 

64 ) 

65 flat_metadata = connectionTypes.PrerequisiteInput( 

66 doc="Flat field metadata associated with illumination correction epoch.", 

67 name="flat.metadata", 

68 storageClass="PropertyList", 

69 dimensions=["instrument", "detector", "physical_filter"], 

70 isCalibration=True, 

71 multiple=True, 

72 ) 

73 illumination_corrections = connectionTypes.Output( 

74 doc="Illumination corrections from fgcm fit.", 

75 name="illuminationCorrection", 

76 storageClass="ExposureF", 

77 dimensions=["instrument", "detector", "physical_filter"], 

78 isCalibration=True, 

79 multiple=True, 

80 ) 

81 

82 def __init__(self, *, config=None): 

83 super().__init__(config=config) 

84 

85 if str(int(config.connections.cycleNumber)) != config.connections.cycleNumber: 

86 raise ValueError("cycleNumber must be of integer format") 

87 

88 if not config.use_flat_metadata: 

89 del self.flat_metadata 

90 else: 

91 def lookup_flat_metadata(dataset_type, registry, data_id, collections): 

92 

93 time = Time(config.epoch_time, format=config.epoch_format) 

94 

95 return [ 

96 registry.findDataset( 

97 dataset_type.makeCompositeDatasetType(), 

98 data_id, 

99 physical_filter=physical_filter, 

100 timespan=Timespan( 

101 time - TimeDelta(1, format="sec"), 

102 time + TimeDelta(1, format="sec"), 

103 ), 

104 collections=collections, 

105 ).makeComponentRef("metadata") 

106 for physical_filter in config.physical_filters 

107 ] 

108 

109 self.flat_metadata = dataclasses.replace(self.flat_metadata, lookupFunction=lookup_flat_metadata) 

110 

111 

112class FgcmOutputIlluminationCorrectionConfig( 

113 PipelineTaskConfig, 

114 pipelineConnections=FgcmOutputIlluminationCorrectionConnections, 

115): 

116 """Configuration for FgcmOutputIlluminationCorrectionTask.""" 

117 

118 use_flat_metadata = lsst.pex.config.Field( 

119 doc="Use flat-field metadata for illumination correction metadata?", 

120 dtype=bool, 

121 default=True, 

122 ) 

123 epoch_time = lsst.pex.config.Field( 

124 doc="Time string (UTC) that corresponds to a date in the desired epoch.", 

125 dtype=str, 

126 default=None, 

127 optional=False, 

128 ) 

129 epoch_format = lsst.pex.config.Field( 

130 doc="Format for time string (e.g. iso, mjd, etc.), used by astropy.time.Time()", 

131 dtype=str, 

132 default="iso", 

133 ) 

134 physical_filters = lsst.pex.config.ListField( 

135 doc="List of physical filters to produce illumination corrections.", 

136 dtype=str, 

137 default=[], 

138 ) 

139 include_wcs_jacobian = lsst.pex.config.Field( 

140 doc="Include WCS jacobian in illumination correction?", 

141 dtype=bool, 

142 default=True, 

143 ) 

144 approximate_wcs_jacobian = lsst.pex.config.Field( 

145 doc="Use a Chebyshev approximation of the WCS jacobian in illumination correction?", 

146 dtype=bool, 

147 default=True, 

148 ) 

149 

150 def validate(self): 

151 try: 

152 _ = Time(self.epoch_time, format=self.epoch_format) 

153 except Exception as e: 

154 raise ValueError( 

155 "Could not parse epoch_time/epoch_format ", e) 

156 

157 

158class FgcmOutputIlluminationCorrectionTask(PipelineTask): 

159 """Output illumination corrections from fgcm.""" 

160 ConfigClass = FgcmOutputIlluminationCorrectionConfig 

161 _DefaultName = "fgcmOutputIlluminationCorrection" 

162 

163 def runQuantum(self, butlerQC, inputRefs, outputRefs): 

164 

165 inputs = butlerQC.get(inputRefs) 

166 

167 detector_id = butlerQC.quantum.dataId["detector"] 

168 

169 filter_label_dict = {ref.dataId["physical_filter"]: 

170 FilterLabel(physical=ref.dataId["physical_filter"], band=ref.dataId["band"]) 

171 for ref in outputRefs.illumination_corrections} 

172 

173 flat_metadata_dict = {} 

174 if self.config.use_flat_metadata: 

175 for i, flat_metadata in enumerate(inputs["flat_metadata"]): 

176 ref = inputRefs.flat_metadata[i] 

177 flat_metadata_dict[ref.dataId["physical_filter"]] = (ref.id, flat_metadata) 

178 

179 retval = self.run( 

180 camera=inputs["camera"], 

181 detector_id=detector_id, 

182 fgcm_fit_parameters_catalog=inputs["fgcm_fit_parameters_catalog"], 

183 filter_label_dict=filter_label_dict, 

184 flat_metadata_dict=flat_metadata_dict, 

185 ) 

186 

187 # And put the outputs. 

188 illum_corr_ref_dict = {ref.dataId["physical_filter"]: 

189 ref for ref in outputRefs.illumination_corrections} 

190 for physical_filter in retval.illum_corr_dict: 

191 if physical_filter in illum_corr_ref_dict: 

192 self.log.debug( 

193 "Serializing illumination correction for detector %d, physical_filter %s", 

194 detector_id, 

195 physical_filter, 

196 ) 

197 butlerQC.put(retval.illum_corr_dict[physical_filter], illum_corr_ref_dict[physical_filter]) 

198 

199 def run( 

200 self, 

201 *, 

202 camera, 

203 detector_id, 

204 fgcm_fit_parameters_catalog, 

205 filter_label_dict, 

206 flat_metadata_dict={}, 

207 ): 

208 """Run the illumination correction output task. 

209 

210 Parameters 

211 ---------- 

212 camera : `lsst.afw.cameraGeom.camera` 

213 The camera with camera geometry 

214 detector_id : `int` 

215 The id of the detector. 

216 fgcm_fit_parameters_catalog : `lsst.afw.SimpleCatalog` 

217 Catalog of fgcm fit parameters. 

218 filter_label_dict : `dict` [`str`: `lsst.afw.image.FilterLabel`] 

219 Dictionary of filter labels, keyed by physical_filter. 

220 flat_metadata_dict : `dict` [`str`: (`uuid.UUID`, `lsst.pipe.base.PropertyList`] 

221 Dictionary of UUIDs and flat metadata, keyed by physical_filter. 

222 

223 Returns 

224 ------- 

225 struct : `lsst.pipe.base.Struct` 

226 Output structure with keys: 

227 

228 ``illum_corr_dict``: dictionary keyed by physical_filter, 

229 with illumination correction ExposureF. 

230 """ 

231 epoch_time = Time(self.config.epoch_time, format=self.config.epoch_format) 

232 epoch_mjd = epoch_time.mjd 

233 

234 detector_index = detector_id - camera[0].getId() 

235 

236 # This is the illumination correction array from fgcm. 

237 fgcm_star_flat = np.zeros(fgcm_fit_parameters_catalog["superstarSize"][0, :], dtype="f8") 

238 fgcm_star_flat[:, :, :, :] = fgcm_fit_parameters_catalog["superstar"][0, :].reshape( 

239 fgcm_star_flat.shape, 

240 ) 

241 

242 # These are the filter names associated with the illumination 

243 # corrections. 

244 fgcm_filter_names = np.asarray(fgcm_fit_parameters_catalog[0]["lutFilterNames"].split(",")) 

245 

246 epoch_mjd_start = fgcm_fit_parameters_catalog[0]["epochMjdStart"] 

247 epoch_mjd_end = fgcm_fit_parameters_catalog[0]["epochMjdEnd"] 

248 

249 epoch_index, = np.where((epoch_mjd > epoch_mjd_start) & (epoch_mjd < epoch_mjd_end)) 

250 

251 if len(epoch_index) == 0: 

252 raise RuntimeError(f"Could not find epoch at {epoch_time} in fgcm epochs.") 

253 

254 detector = camera[detector_id] 

255 xymax = np.array([detector.getBBox().getMaxX(), detector.getBBox().getMaxY()]) 

256 area_scaling = 1. / computeReferencePixelScale(camera)**2. 

257 

258 illum_corr_dict = {} 

259 

260 count = 0 

261 for physical_filter in self.config.physical_filters: 

262 if physical_filter not in filter_label_dict: 

263 # There are no data associated, so we do not make an 

264 # illumination correction. 

265 continue 

266 

267 if physical_filter not in fgcm_filter_names: 

268 self.log.warning( 

269 "FgcmOutputIlluminationCorrectionTask configured to generate correction for " 

270 f"physical filter {physical_filter} but that filter was not calibrated.", 

271 ) 

272 continue 

273 

274 filter_index, = np.where(fgcm_filter_names == physical_filter) 

275 

276 star_flat_pars = fgcm_star_flat[epoch_index, filter_index, detector_index, :].ravel() 

277 

278 illum_corr = ExposureF(detector.getBBox()) 

279 illum_corr.image.array[:, :] = 1.0 

280 illum_corr.setDetector(detector) 

281 

282 # Get the flat uuid and units if available. 

283 if physical_filter in flat_metadata_dict: 

284 flat_uuid = str(flat_metadata_dict[physical_filter][0]) 

285 flat_md = flat_metadata_dict[physical_filter][1] 

286 units = flat_md["LSST ISR UNITS"] 

287 else: 

288 # This is used for testdata_jointcal testing only. 

289 flat_uuid = "Unknown" 

290 # Assume adu unless otherwise specified. 

291 units = "adu" 

292 

293 illum_corr.info.setFilter(filter_label_dict[physical_filter]) 

294 illum_corr.metadata["LSST CALIB UUID FLAT"] = flat_uuid 

295 illum_corr.metadata["LSST ISR UNITS"] = units 

296 

297 # Creation date. Calibration team standard is for local time to be 

298 # available. Also form UTC (not TAI) version for easier comparisons 

299 # across multiple processing sites. 

300 now = datetime.now(tz=UTC) 

301 illum_corr.metadata.set( 

302 "CALIB_CREATION_DATETIME", 

303 now.strftime("%Y-%m-%dT%T"), 

304 comment="UTC of processing", 

305 ) 

306 local_time = now.astimezone() 

307 calib_date = local_time.strftime("%Y-%m-%d") 

308 calib_time = local_time.strftime("%X %Z") 

309 illum_corr.metadata.set( 

310 "CALIB_CREATION_DATE", 

311 calib_date, 

312 comment="Local time day of creation", 

313 ) 

314 illum_corr.metadata.set( 

315 "CALIB_CREATION_TIME", 

316 calib_time, 

317 comment="Local time in day of creation", 

318 ) 

319 

320 # Make sure this is a legal illumination correction; fgcm 

321 # uses 100.0 as a sentinel for unfit detectors. 

322 if star_flat_pars[0] < 100.0: 

323 star_flat_field = FgcmOutputProductsTask._getChebyshevBoundedField( 

324 star_flat_pars, 

325 xymax, 

326 ) 

327 

328 star_flat_field.divideImage(illum_corr.image) 

329 # fgcm includes an additional clipping for strongly vignetted regions. 

330 illum_corr.image.array[:, :] = np.clip(illum_corr.image.array[:, :], None, 10.0) 

331 

332 else: 

333 self.log.warning( 

334 f"Invalid star flat found for detector {physical_filter} {detector_id}; " 

335 "setting to all 1.0s.", 

336 ) 

337 

338 if self.config.include_wcs_jacobian: 

339 # This code is here (per-filter) because in the future 

340 # we plan to allow for a different field per band. 

341 pixel_area_field = computePixelAreaFieldDetector( 

342 camera[detector_id], 

343 areaScaling=area_scaling, 

344 approximate=self.config.approximate_wcs_jacobian, 

345 ) 

346 

347 pixel_area_field.divideImage(illum_corr.image) 

348 

349 count += 1 

350 illum_corr_dict[physical_filter] = illum_corr 

351 

352 self.log.info("Successfully created %d illumination corrections for detector %d", count, detector_id) 

353 

354 return Struct(illum_corr_dict=illum_corr_dict)