Coverage for python / lsst / cp / pipe / cpFilterScan.py: 23%

133 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-15 00:22 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21import numpy as np 

22import copy 

23 

24from astropy.table import Table 

25 

26import lsst.pex.config as pexConfig 

27import lsst.pipe.base as pipeBase 

28import lsst.pipe.base.connectionTypes as cT 

29 

30from .cpCombine import CalibCombineTask 

31from .utilsEfd import CpEfdClient 

32from lsst.daf.base import PropertyList, DateTime 

33 

34__all__ = [ 

35 "CpFilterScanTask", "CpFilterScanTaskConfig", 

36 "CpMonochromatorScanTask", "CpMonochromatorScanConfig" 

37] 

38 

39 

40class CpFilterScanConnections(pipeBase.PipelineTaskConnections, 

41 dimensions=("instrument", "detector")): 

42 inputExpHandles = cT.Input( 

43 name="postISRCCD", 

44 doc="Input exposures to measure statistics from.", 

45 storageClass="Exposure", 

46 dimensions=("instrument", "physical_filter", "exposure", "detector"), 

47 multiple=True, 

48 deferLoad=True, 

49 ) 

50 inputElectrometerHandles = cT.Input( 

51 name="electrometer", 

52 doc="Input electrometer data.", 

53 storageClass="IsrCalib", 

54 dimensions=("instrument", "exposure"), 

55 multiple=True, 

56 deferLoad=True, 

57 ) 

58 outputData = cT.Output( 

59 name="cpFilterScan", 

60 doc="Output table to write.", 

61 storageClass="ArrowAstropy", 

62 dimensions=("instrument", "detector"), 

63 ) 

64 

65 def __init__(self, *, config=None): 

66 super().__init__(config=config) 

67 

68 if config and not config.useElectrometer: 

69 self.inputs.discard("inputElectrometerHandles") 

70 

71 

72class CpFilterScanTaskConfig(pipeBase.PipelineTaskConfig, 

73 pipelineConnections=CpFilterScanConnections): 

74 maskNameList = pexConfig.ListField( 

75 dtype=str, 

76 doc="Mask list to exclude from statistics calculations.", 

77 default=["DETECTED", "BAD", "CR", "NO_DATA", "INTRP"] 

78 ) 

79 useElectrometer = pexConfig.Field( 

80 dtype=bool, 

81 doc="Use electrometer data?", 

82 default=False, 

83 ) 

84 referenceFilter = pexConfig.Field( 

85 dtype=str, 

86 doc="Filter to use as baseline reference.", 

87 default="empty", 

88 ) 

89 combine = pexConfig.ConfigurableField( 

90 target=CalibCombineTask, 

91 doc="Combine task to use for header merging.", 

92 ) 

93 efdClientInstance = pexConfig.Field( 

94 dtype=str, 

95 doc="EFD instance to use for monochromator results.", 

96 default="usdf_efd", 

97 ) 

98 

99 

100class CpFilterScanTask(pipeBase.PipelineTask): 

101 r"""Create filter scan from appropriate data. 

102 

103 This task constructs a filter scan from a sequence of flat 

104 exposures taken in the following manner: 

105 

106 - A monochromator is set to a target wavelength. 

107 - An optional spectrum may be taken with the fiber spectrograph to 

108 provide an independent measure of the peak wavelength and 

109 bandpass. 

110 - A flat exposure is taken with a "reference filter," usually a 

111 white-band or empty filter, that provides a baseline source 

112 brightness at the monochromator's target wavelength. 

113 - A flat exposure is taken with the filter to be scanned. 

114 - Optional electrometer/photodiode data may also be taken during 

115 the two flat exposures to correct for source brightness 

116 variations. 

117 

118 From these pairs of exposures, we can determine the filter 

119 throughput by calculating the flux per second with the filter: 

120 :math:`F_filter(\lambda0) = median(f_amplifiers) / t_exposure` 

121 And without: 

122 :math:`F_reference(\lambda0) = median(f_amplifiers) / t_exposure` 

123 where the f_amplifiers are the per-amplifier statistics calculated 

124 by IsrTask. If the illumination source was perfectly stable, the 

125 filter throughput at that wavelength would simply be: 

126 :math:`throughput_raw(\lambda0) = F_filter / F_reference` 

127 

128 We can correct for any illumination changes with the optional 

129 the electrometer measurements, E, which provide an independent 

130 measure of the incident flux for the two exposures, such that: 

131 :math:`throughput(\lambda0) = throughput_raw * E_reference / E_filter` 

132 

133 Repeating this procedure at multiple monochromator settings builds 

134 up a catalog of throughput measurements across the filter 

135 bandpass. Additional differences can exist between the 

136 monochromator setting (retrieved here from the EFD) and the actual 

137 wavelengths of light that are permitted, so a matching 

138 CpMonochromatorScan can be generated to determine what the actual 

139 values of :math:`\lambda0` observed were. 

140 """ 

141 ConfigClass = CpFilterScanTaskConfig 

142 _DefaultName = "cpFilterScan" 

143 

144 def __init__(self, **kwargs): 

145 super().__init__(**kwargs) 

146 self.makeSubtask("combine") 

147 

148 def run(self, inputExpHandles, inputElectrometerHandles=None): 

149 """Construct filter scan from the header and visit info of processed 

150 exposures. 

151 

152 Parameters 

153 ---------- 

154 inputExpHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`] 

155 Input list of exposure handles to combine. 

156 inputElectrometerHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`], optional # noqa W505 

157 Input list of electrometer/photodiode measurement handles 

158 to combine. 

159 

160 Returns 

161 ------- 

162 results : `lsst.pipe.base.Struct` 

163 The results struct containing: 

164 

165 ``outputData`` 

166 Final combined filter scan, with a single table 

167 containing the measured throughput for all input 

168 filters at the various wavelength values indicated in 

169 the exposure's observationReason 

170 (`astropy.table.Table`). 

171 """ 

172 filterScanResults = {} 

173 filterSet = set() 

174 efdClient = CpEfdClient(efdInstance=self.config.efdClientInstance) 

175 monochromatorData = efdClient.getEfdMonochromatorData() 

176 

177 # Iterate over input exposures: 

178 for handle in inputExpHandles: 

179 visitInfo = handle.get(component="visitInfo") 

180 metadata = handle.get(component="metadata") 

181 

182 physical_filter = handle.dataId['physical_filter'] 

183 exposureId = handle.dataId['exposure'] 

184 exposureTime = visitInfo.exposureTime 

185 

186 values = [v for k, v in metadata.items() if "LSST ISR FINAL MEDIAN" in k] 

187 

188 scaleFactor = exposureTime 

189 if self.config.useElectrometer: 

190 for eHandle in inputElectrometerHandles: 

191 if eHandle.dataId.exposure == exposureId: 

192 charge = eHandle.get().integrate() 

193 if charge > 0.0 and np.isfinite(charge): 

194 scaleFactor /= charge 

195 break 

196 

197 # Create a scan entry for this exposure. 

198 scan = { 

199 'physical_filter': physical_filter, 

200 'scale': scaleFactor, 

201 'flux': np.median(values), 

202 } 

203 filterSet.add(physical_filter) 

204 

205 _, wavelengthKey = efdClient.parseMonochromatorStatus(monochromatorData, 

206 visitInfo.date.toString(DateTime.TAI)) 

207 wavelengthKey = float(wavelengthKey) 

208 self.log.debug(f"Scan: {exposureId} {wavelengthKey} {scan}") 

209 

210 # Append the scan for this exposure to the list of other 

211 # scans taken at this wavelength setting: 

212 if wavelengthKey in filterScanResults: 

213 filterScanResults[wavelengthKey].append(scan) 

214 else: 

215 filterScanResults[wavelengthKey] = [scan] 

216 

217 filterScan = [] 

218 for wavelength in filterScanResults.keys(): 

219 # We expect there to be at least one pair of exposures 

220 # with a given wavelength setting: One with a filter in 

221 # the beam, and one with the "reference filter" (which 

222 # should be the empty/white/no-filter setting). The ratio 

223 # of the scaled fluxes in this pair should give the filter 

224 # transmission at that wavelength for that filter. 

225 scans = filterScanResults[wavelength] 

226 

227 referenceScan = [x for x in scans if x['physical_filter'] == self.config.referenceFilter] 

228 if len(referenceScan) == 0: 

229 # No reference scan at this wavelength. 

230 self.log.warning(f"No reference scan at this wavelength: {wavelengthKey}") 

231 continue 

232 referenceScan = referenceScan[0] 

233 referenceValue = referenceScan['scale'] / referenceScan['flux'] 

234 

235 # Create a dictionary of measurements at this wavelength, 

236 # using the filter names as the keys. Since we may do 

237 # multiple filters in a single sequence (iterating through 

238 # all filters at a certain monochromator setting), 

239 # initialize all known filters (from our initial exposure 

240 # scan) to NaN. Note that the reference filter is a known 

241 # filter, and should have a throughput of 1.0 relative to 

242 # itself. 

243 wavelengthScan = {'wavelength': float(wavelength)} 

244 for filterName in filterSet: 

245 wavelengthScan[filterName] = np.nan 

246 

247 for scan in scans: 

248 # If the entry for this filter already exists, then we 

249 # have multiple entries at this wavelength. 

250 if np.isfinite(wavelengthScan[scan['physical_filter']]): 

251 self.log.warning( 

252 f"Multiple instances of filter {scan['physical_filter']} at {wavelength}" 

253 ) 

254 

255 wavelengthScan[scan['physical_filter']] = referenceValue * scan['flux'] / scan['scale'] 

256 

257 filterScan.append(copy.copy(wavelengthScan)) 

258 

259 outMetadata = PropertyList() 

260 self.combine.combineHeaders(inputExpHandles, calib=None, metadata=outMetadata, 

261 calibType="FilterScan", scales=None) 

262 filteredMetadata = {k: v for k, v in outMetadata.toDict().items() if v is not None} 

263 

264 catalog = Table(filterScan) 

265 catalog.meta = filteredMetadata 

266 

267 return pipeBase.Struct(outputData=catalog) 

268 

269 

270class CpMonochromatorScanConnections(pipeBase.PipelineTaskConnections, 

271 dimensions=("instrument", )): 

272 inputExpHandles = cT.Input( 

273 name="rawSpectrum", 

274 doc="Input spectrograph measurements.", 

275 storageClass="FiberSpectrum", 

276 dimensions=("instrument", "exposure", "detector"), 

277 multiple=True, 

278 deferLoad=True, 

279 ) 

280 outputData = cT.Output( 

281 name="cpMonochromatorScan", 

282 doc="Output table to write.", 

283 storageClass="ArrowAstropy", 

284 dimensions=("instrument", ), 

285 ) 

286 

287 

288class CpMonochromatorScanConfig(pipeBase.PipelineTaskConfig, 

289 pipelineConnections=CpMonochromatorScanConnections): 

290 efdClientInstance = pexConfig.Field( 

291 dtype=str, 

292 doc="EFD instance to use for monochromator results.", 

293 default="usdf_efd", 

294 ) 

295 efdUnits = pexConfig.Field( 

296 dtype=str, 

297 doc="Units to use for EFD monochromator results.", 

298 default="nm", 

299 ) 

300 headerDateKey = pexConfig.Field( 

301 dtype=str, 

302 doc="Header keyword to use for observation date.", 

303 default="DATE-BEG", 

304 ) 

305 peakBoxSize = pexConfig.Field( 

306 dtype=int, 

307 doc="Half-size of the box used for fitting the spectrum peak.", 

308 default=50, 

309 ) 

310 

311 

312class CpMonochromatorScanTask(pipeBase.PipelineTask): 

313 """Compare EFD monochromator results to fiber spectrograph spectra. 

314 

315 This task provides a complementary measurement to associate with 

316 the CpFilterScan. While taking the filter scan exposures used for 

317 CpFilterScanTask, the attached fiber spectrograph can be used to 

318 measure the spectrum of the light that passes through the 

319 monochromator. This task takes those spectra, fits a Gaussian to 

320 the peak in each one, and records those fit parameters along with 

321 the monochromator setting recorded in the EFD. This information 

322 can then be used to correct the CpFilterScan measurements from the 

323 nominal wavelength values to those actually observed. 

324 """ 

325 ConfigClass = CpMonochromatorScanConfig 

326 _DefaultName = "cpMonochromatorScan" 

327 

328 def run(self, inputExpHandles): 

329 """Match EFD results to spectrograph. 

330 

331 Parameters 

332 ---------- 

333 inputExpHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`] 

334 Input list of exposure handles to combine. 

335 

336 Returns 

337 ------- 

338 results : `lsst.pipe.base.Struct` 

339 The results struct containing: 

340 

341 ``outputData`` 

342 Final combined filter scan, with a single table 

343 containing the commanded monochromator wavelength, as 

344 well as the Gaussian fit to the peak of the fiber 

345 spectrograph spectrum (`astropy.table.Table`). 

346 """ 

347 monochromatorScanResults = [] 

348 efdClient = CpEfdClient(efdInstance=self.config.efdClientInstance) 

349 monochromatorData = efdClient.getEfdMonochromatorData() 

350 

351 for handle in inputExpHandles: 

352 exposureId = handle.dataId['exposure'] 

353 spectrum = handle.get() 

354 

355 # Fit spectrum with Gaussian model. 

356 fitMean, fitSigma, peakWavelength, fwhm, fitRange = self._fitPeak(spectrum, 

357 self.config.peakBoxSize) 

358 

359 # Look up the monochromator state for this spectrum: 

360 date = spectrum.metadata[self.config.headerDateKey] 

361 monoDate, monoValue = efdClient.parseMonochromatorStatus(monochromatorData, date) 

362 entry = { 

363 'exposure': exposureId, 

364 'fit_mean': fitMean, 

365 'fit_sigma': fitSigma, 

366 'peak': peakWavelength, 

367 'fwhm': fwhm, 

368 'fit_range': fitRange, 

369 'exp_date': date, 

370 'mono_date': monoDate, 

371 'mono_state': monoValue, 

372 } 

373 monochromatorScanResults.append(entry) 

374 

375 return pipeBase.Struct( 

376 outputData=Table(monochromatorScanResults) 

377 ) 

378 

379 @staticmethod 

380 def _fitPeak(spectrum, peakBoxSize=20): 

381 """Fit a Gaussian to the peak of the spectrum. 

382 

383 Parameters 

384 ---------- 

385 spectrum : `lsst.obs.fiberspectrograph.FiberSpectrum` 

386 The spectrum to fit. 

387 peakBoxSize : `int` 

388 Size of the box to use for fitting. 

389 

390 Returns 

391 ------- 

392 mean : `float` 

393 The fitted mean of the peak. 

394 sigma : `float` 

395 The fitted standard deviation of the peak. 

396 peak : `float` 

397 The wavelength of the peak. 

398 fwhm : `float` 

399 Wavelength quantized estimate of the FWHM. 

400 range : `float` 

401 Range over which Gaussian fit was performed. 

402 """ 

403 maxIdx = np.argmax(spectrum.flux) 

404 maxF = spectrum.flux[maxIdx] 

405 med = np.median(spectrum.flux) 

406 

407 # Normalize spectrum so the peak is at 1.0 and the median 

408 # value is zero. 

409 normFlux = (spectrum.flux - med) / (maxF - med) 

410 

411 highHM = None 

412 lowHM = None 

413 high = None 

414 low = None 

415 # Search for the 50% points for FWHM estimate. Search for the 

416 # 10% points to define the range for Gaussian fit. 

417 for dx in range(maxIdx, maxIdx + peakBoxSize): 

418 if normFlux[dx] >= 0.5: 

419 highHM = dx 

420 if normFlux[dx] >= 0.1: 

421 high = dx 

422 

423 for dx in range(maxIdx, maxIdx - peakBoxSize, -1): 

424 if normFlux[dx] >= 0.5: 

425 lowHM = dx - 1 

426 if normFlux[dx] >= 0.1: 

427 low = dx - 1 

428 

429 # The above search should be fine, but let's ensure we have 

430 # reasonable limits. 

431 if highHM and not high: 

432 high = highHM 

433 if lowHM and not low: 

434 low = lowHM 

435 

436 # Do Gaussian fit in log-space 

437 ff = np.polyfit(spectrum.wavelength[low:high].to_value(), np.log(normFlux[low:high]), 2) 

438 # Convert fit parameters to Gaussian parameters: 

439 # log(F) = (log(A) - 0.5 * (m/s)**2) + x m/s**2 - 0.5 (x/s)**2 

440 # ff[2] ff[1] ff[0] 

441 # s**2 = 1 / (-2 * ff[0]) = m / ff[1] 

442 

443 sigma = np.sqrt(1 / (-2.0 * ff[0])) 

444 mean = -0.5 * ff[1] / ff[0] 

445 

446 return (mean, sigma, 

447 spectrum.wavelength[maxIdx].to_value(), 

448 spectrum.wavelength[highHM].to_value() - spectrum.wavelength[lowHM].to_value(), 

449 spectrum.wavelength[high].to_value() - spectrum.wavelength[low].to_value())