Coverage for python / lsst / cp / pipe / cpFilterScan.py: 23%
133 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:18 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 09:18 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
21import numpy as np
22import copy
24from astropy.table import Table
26import lsst.pex.config as pexConfig
27import lsst.pipe.base as pipeBase
28import lsst.pipe.base.connectionTypes as cT
30from .cpCombine import CalibCombineTask
31from .utilsEfd import CpEfdClient
32from lsst.daf.base import PropertyList, DateTime
34__all__ = [
35 "CpFilterScanTask", "CpFilterScanTaskConfig",
36 "CpMonochromatorScanTask", "CpMonochromatorScanConfig"
37]
40class CpFilterScanConnections(pipeBase.PipelineTaskConnections,
41 dimensions=("instrument", "detector")):
42 inputExpHandles = cT.Input(
43 name="postISRCCD",
44 doc="Input exposures to measure statistics from.",
45 storageClass="Exposure",
46 dimensions=("instrument", "physical_filter", "exposure", "detector"),
47 multiple=True,
48 deferLoad=True,
49 )
50 inputElectrometerHandles = cT.Input(
51 name="electrometer",
52 doc="Input electrometer data.",
53 storageClass="IsrCalib",
54 dimensions=("instrument", "exposure"),
55 multiple=True,
56 deferLoad=True,
57 )
58 outputData = cT.Output(
59 name="cpFilterScan",
60 doc="Output table to write.",
61 storageClass="ArrowAstropy",
62 dimensions=("instrument", "detector"),
63 )
65 def __init__(self, *, config=None):
66 super().__init__(config=config)
68 if config and not config.useElectrometer:
69 self.inputs.discard("inputElectrometerHandles")
72class CpFilterScanTaskConfig(pipeBase.PipelineTaskConfig,
73 pipelineConnections=CpFilterScanConnections):
74 maskNameList = pexConfig.ListField(
75 dtype=str,
76 doc="Mask list to exclude from statistics calculations.",
77 default=["DETECTED", "BAD", "CR", "NO_DATA", "INTRP"]
78 )
79 useElectrometer = pexConfig.Field(
80 dtype=bool,
81 doc="Use electrometer data?",
82 default=False,
83 )
84 referenceFilter = pexConfig.Field(
85 dtype=str,
86 doc="Filter to use as baseline reference.",
87 default="empty",
88 )
89 combine = pexConfig.ConfigurableField(
90 target=CalibCombineTask,
91 doc="Combine task to use for header merging.",
92 )
93 efdClientInstance = pexConfig.Field(
94 dtype=str,
95 doc="EFD instance to use for monochromator results.",
96 default="usdf_efd",
97 )
100class CpFilterScanTask(pipeBase.PipelineTask):
101 r"""Create filter scan from appropriate data.
103 This task constructs a filter scan from a sequence of flat
104 exposures taken in the following manner:
106 - A monochromator is set to a target wavelength.
107 - An optional spectrum may be taken with the fiber spectrograph to
108 provide an independent measure of the peak wavelength and
109 bandpass.
110 - A flat exposure is taken with a "reference filter," usually a
111 white-band or empty filter, that provides a baseline source
112 brightness at the monochromator's target wavelength.
113 - A flat exposure is taken with the filter to be scanned.
114 - Optional electrometer/photodiode data may also be taken during
115 the two flat exposures to correct for source brightness
116 variations.
118 From these pairs of exposures, we can determine the filter
119 throughput by calculating the flux per second with the filter:
120 :math:`F_filter(\lambda0) = median(f_amplifiers) / t_exposure`
121 And without:
122 :math:`F_reference(\lambda0) = median(f_amplifiers) / t_exposure`
123 where the f_amplifiers are the per-amplifier statistics calculated
124 by IsrTask. If the illumination source was perfectly stable, the
125 filter throughput at that wavelength would simply be:
126 :math:`throughput_raw(\lambda0) = F_filter / F_reference`
128 We can correct for any illumination changes with the optional
129 the electrometer measurements, E, which provide an independent
130 measure of the incident flux for the two exposures, such that:
131 :math:`throughput(\lambda0) = throughput_raw * E_reference / E_filter`
133 Repeating this procedure at multiple monochromator settings builds
134 up a catalog of throughput measurements across the filter
135 bandpass. Additional differences can exist between the
136 monochromator setting (retrieved here from the EFD) and the actual
137 wavelengths of light that are permitted, so a matching
138 CpMonochromatorScan can be generated to determine what the actual
139 values of :math:`\lambda0` observed were.
140 """
141 ConfigClass = CpFilterScanTaskConfig
142 _DefaultName = "cpFilterScan"
144 def __init__(self, **kwargs):
145 super().__init__(**kwargs)
146 self.makeSubtask("combine")
148 def run(self, inputExpHandles, inputElectrometerHandles=None):
149 """Construct filter scan from the header and visit info of processed
150 exposures.
152 Parameters
153 ----------
154 inputExpHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`]
155 Input list of exposure handles to combine.
156 inputElectrometerHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`], optional # noqa W505
157 Input list of electrometer/photodiode measurement handles
158 to combine.
160 Returns
161 -------
162 results : `lsst.pipe.base.Struct`
163 The results struct containing:
165 ``outputData``
166 Final combined filter scan, with a single table
167 containing the measured throughput for all input
168 filters at the various wavelength values indicated in
169 the exposure's observationReason
170 (`astropy.table.Table`).
171 """
172 filterScanResults = {}
173 filterSet = set()
174 efdClient = CpEfdClient(efdInstance=self.config.efdClientInstance)
175 monochromatorData = efdClient.getEfdMonochromatorData()
177 # Iterate over input exposures:
178 for handle in inputExpHandles:
179 visitInfo = handle.get(component="visitInfo")
180 metadata = handle.get(component="metadata")
182 physical_filter = handle.dataId['physical_filter']
183 exposureId = handle.dataId['exposure']
184 exposureTime = visitInfo.exposureTime
186 values = [v for k, v in metadata.items() if "LSST ISR FINAL MEDIAN" in k]
188 scaleFactor = exposureTime
189 if self.config.useElectrometer:
190 for eHandle in inputElectrometerHandles:
191 if eHandle.dataId.exposure == exposureId:
192 charge = eHandle.get().integrate()
193 if charge > 0.0 and np.isfinite(charge):
194 scaleFactor /= charge
195 break
197 # Create a scan entry for this exposure.
198 scan = {
199 'physical_filter': physical_filter,
200 'scale': scaleFactor,
201 'flux': np.median(values),
202 }
203 filterSet.add(physical_filter)
205 _, wavelengthKey = efdClient.parseMonochromatorStatus(monochromatorData,
206 visitInfo.date.toString(DateTime.TAI))
207 wavelengthKey = float(wavelengthKey)
208 self.log.debug(f"Scan: {exposureId} {wavelengthKey} {scan}")
210 # Append the scan for this exposure to the list of other
211 # scans taken at this wavelength setting:
212 if wavelengthKey in filterScanResults:
213 filterScanResults[wavelengthKey].append(scan)
214 else:
215 filterScanResults[wavelengthKey] = [scan]
217 filterScan = []
218 for wavelength in filterScanResults.keys():
219 # We expect there to be at least one pair of exposures
220 # with a given wavelength setting: One with a filter in
221 # the beam, and one with the "reference filter" (which
222 # should be the empty/white/no-filter setting). The ratio
223 # of the scaled fluxes in this pair should give the filter
224 # transmission at that wavelength for that filter.
225 scans = filterScanResults[wavelength]
227 referenceScan = [x for x in scans if x['physical_filter'] == self.config.referenceFilter]
228 if len(referenceScan) == 0:
229 # No reference scan at this wavelength.
230 self.log.warning(f"No reference scan at this wavelength: {wavelengthKey}")
231 continue
232 referenceScan = referenceScan[0]
233 referenceValue = referenceScan['scale'] / referenceScan['flux']
235 # Create a dictionary of measurements at this wavelength,
236 # using the filter names as the keys. Since we may do
237 # multiple filters in a single sequence (iterating through
238 # all filters at a certain monochromator setting),
239 # initialize all known filters (from our initial exposure
240 # scan) to NaN. Note that the reference filter is a known
241 # filter, and should have a throughput of 1.0 relative to
242 # itself.
243 wavelengthScan = {'wavelength': float(wavelength)}
244 for filterName in filterSet:
245 wavelengthScan[filterName] = np.nan
247 for scan in scans:
248 # If the entry for this filter already exists, then we
249 # have multiple entries at this wavelength.
250 if np.isfinite(wavelengthScan[scan['physical_filter']]):
251 self.log.warning(
252 f"Multiple instances of filter {scan['physical_filter']} at {wavelength}"
253 )
255 wavelengthScan[scan['physical_filter']] = referenceValue * scan['flux'] / scan['scale']
257 filterScan.append(copy.copy(wavelengthScan))
259 outMetadata = PropertyList()
260 self.combine.combineHeaders(inputExpHandles, calib=None, metadata=outMetadata,
261 calibType="FilterScan", scales=None)
262 filteredMetadata = {k: v for k, v in outMetadata.toDict().items() if v is not None}
264 catalog = Table(filterScan)
265 catalog.meta = filteredMetadata
267 return pipeBase.Struct(outputData=catalog)
270class CpMonochromatorScanConnections(pipeBase.PipelineTaskConnections,
271 dimensions=("instrument", )):
272 inputExpHandles = cT.Input(
273 name="rawSpectrum",
274 doc="Input spectrograph measurements.",
275 storageClass="FiberSpectrum",
276 dimensions=("instrument", "exposure", "detector"),
277 multiple=True,
278 deferLoad=True,
279 )
280 outputData = cT.Output(
281 name="cpMonochromatorScan",
282 doc="Output table to write.",
283 storageClass="ArrowAstropy",
284 dimensions=("instrument", ),
285 )
288class CpMonochromatorScanConfig(pipeBase.PipelineTaskConfig,
289 pipelineConnections=CpMonochromatorScanConnections):
290 efdClientInstance = pexConfig.Field(
291 dtype=str,
292 doc="EFD instance to use for monochromator results.",
293 default="usdf_efd",
294 )
295 efdUnits = pexConfig.Field(
296 dtype=str,
297 doc="Units to use for EFD monochromator results.",
298 default="nm",
299 )
300 headerDateKey = pexConfig.Field(
301 dtype=str,
302 doc="Header keyword to use for observation date.",
303 default="DATE-BEG",
304 )
305 peakBoxSize = pexConfig.Field(
306 dtype=int,
307 doc="Half-size of the box used for fitting the spectrum peak.",
308 default=50,
309 )
312class CpMonochromatorScanTask(pipeBase.PipelineTask):
313 """Compare EFD monochromator results to fiber spectrograph spectra.
315 This task provides a complementary measurement to associate with
316 the CpFilterScan. While taking the filter scan exposures used for
317 CpFilterScanTask, the attached fiber spectrograph can be used to
318 measure the spectrum of the light that passes through the
319 monochromator. This task takes those spectra, fits a Gaussian to
320 the peak in each one, and records those fit parameters along with
321 the monochromator setting recorded in the EFD. This information
322 can then be used to correct the CpFilterScan measurements from the
323 nominal wavelength values to those actually observed.
324 """
325 ConfigClass = CpMonochromatorScanConfig
326 _DefaultName = "cpMonochromatorScan"
328 def run(self, inputExpHandles):
329 """Match EFD results to spectrograph.
331 Parameters
332 ----------
333 inputExpHandles : `list` [`lsst.daf.butler.DeferredDatasetHandle`]
334 Input list of exposure handles to combine.
336 Returns
337 -------
338 results : `lsst.pipe.base.Struct`
339 The results struct containing:
341 ``outputData``
342 Final combined filter scan, with a single table
343 containing the commanded monochromator wavelength, as
344 well as the Gaussian fit to the peak of the fiber
345 spectrograph spectrum (`astropy.table.Table`).
346 """
347 monochromatorScanResults = []
348 efdClient = CpEfdClient(efdInstance=self.config.efdClientInstance)
349 monochromatorData = efdClient.getEfdMonochromatorData()
351 for handle in inputExpHandles:
352 exposureId = handle.dataId['exposure']
353 spectrum = handle.get()
355 # Fit spectrum with Gaussian model.
356 fitMean, fitSigma, peakWavelength, fwhm, fitRange = self._fitPeak(spectrum,
357 self.config.peakBoxSize)
359 # Look up the monochromator state for this spectrum:
360 date = spectrum.metadata[self.config.headerDateKey]
361 monoDate, monoValue = efdClient.parseMonochromatorStatus(monochromatorData, date)
362 entry = {
363 'exposure': exposureId,
364 'fit_mean': fitMean,
365 'fit_sigma': fitSigma,
366 'peak': peakWavelength,
367 'fwhm': fwhm,
368 'fit_range': fitRange,
369 'exp_date': date,
370 'mono_date': monoDate,
371 'mono_state': monoValue,
372 }
373 monochromatorScanResults.append(entry)
375 return pipeBase.Struct(
376 outputData=Table(monochromatorScanResults)
377 )
379 @staticmethod
380 def _fitPeak(spectrum, peakBoxSize=20):
381 """Fit a Gaussian to the peak of the spectrum.
383 Parameters
384 ----------
385 spectrum : `lsst.obs.fiberspectrograph.FiberSpectrum`
386 The spectrum to fit.
387 peakBoxSize : `int`
388 Size of the box to use for fitting.
390 Returns
391 -------
392 mean : `float`
393 The fitted mean of the peak.
394 sigma : `float`
395 The fitted standard deviation of the peak.
396 peak : `float`
397 The wavelength of the peak.
398 fwhm : `float`
399 Wavelength quantized estimate of the FWHM.
400 range : `float`
401 Range over which Gaussian fit was performed.
402 """
403 maxIdx = np.argmax(spectrum.flux)
404 maxF = spectrum.flux[maxIdx]
405 med = np.median(spectrum.flux)
407 # Normalize spectrum so the peak is at 1.0 and the median
408 # value is zero.
409 normFlux = (spectrum.flux - med) / (maxF - med)
411 highHM = None
412 lowHM = None
413 high = None
414 low = None
415 # Search for the 50% points for FWHM estimate. Search for the
416 # 10% points to define the range for Gaussian fit.
417 for dx in range(maxIdx, maxIdx + peakBoxSize):
418 if normFlux[dx] >= 0.5:
419 highHM = dx
420 if normFlux[dx] >= 0.1:
421 high = dx
423 for dx in range(maxIdx, maxIdx - peakBoxSize, -1):
424 if normFlux[dx] >= 0.5:
425 lowHM = dx - 1
426 if normFlux[dx] >= 0.1:
427 low = dx - 1
429 # The above search should be fine, but let's ensure we have
430 # reasonable limits.
431 if highHM and not high:
432 high = highHM
433 if lowHM and not low:
434 low = lowHM
436 # Do Gaussian fit in log-space
437 ff = np.polyfit(spectrum.wavelength[low:high].to_value(), np.log(normFlux[low:high]), 2)
438 # Convert fit parameters to Gaussian parameters:
439 # log(F) = (log(A) - 0.5 * (m/s)**2) + x m/s**2 - 0.5 (x/s)**2
440 # ff[2] ff[1] ff[0]
441 # s**2 = 1 / (-2 * ff[0]) = m / ff[1]
443 sigma = np.sqrt(1 / (-2.0 * ff[0]))
444 mean = -0.5 * ff[1] / ff[0]
446 return (mean, sigma,
447 spectrum.wavelength[maxIdx].to_value(),
448 spectrum.wavelength[highHM].to_value() - spectrum.wavelength[lowHM].to_value(),
449 spectrum.wavelength[high].to_value() - spectrum.wavelength[low].to_value())