Coverage for python / lsst / ip / isr / transmissionCurve.py: 24%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-18 08:54 +0000

1# This file is part of ip_isr. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22__all__ = ["IntermediateTransmissionCurve", 

23 "IntermediateOpticsTransmissionCurve", 

24 "IntermediateFilterTransmissionCurve", 

25 "IntermediateFilterDetectorTransmissionCurve", 

26 "IntermediateSensorTransmissionCurve", 

27 "IntermediateAtmosphereTransmissionCurve", 

28 "IntermediateSystemTransmissionCurve", ] 

29 

30import numpy as np 

31from astropy import units as u 

32from astropy.units import cds 

33 

34from lsst.afw.image import TransmissionCurve 

35from .calibType import IsrCalib 

36 

37 

38class IntermediateTransmissionCurve(IsrCalib): 

39 """Definition for the TransmissionCurve format used as inputs. 

40 

41 Parameters 

42 ---------- 

43 filename : `str` 

44 Filename of a transmission curve dataset. 

45 """ 

46 

47 _OBSTYPE = "transmission_curve" 

48 _SCHEMA = "" 

49 _VERSION = 1.0 

50 

51 def __init__(self, filename=None): 

52 self.data = None 

53 self.transmissionCurve = None 

54 self.isSpatiallyConstant = True 

55 super().__init__() 

56 

57 # Because we are not persisting this calib as itself, we 

58 # should skip adding any other attributes. 

59 self.requiredAttributes.update(['isSpatiallyConstant']) 

60 

61 def setMetadata(self, metadata): 

62 # Inherits from lsst.ip.isr.IsrCalib.setMetadata. 

63 super().setMetadata(metadata) 

64 

65 @classmethod 

66 def fromTable(cls, tableList): 

67 """Construct intermediate transmission curve from a list of input 

68 tables. Only the first table is used. 

69 

70 Parameters 

71 ---------- 

72 tableList : `list` [`astropy.table.Table`] 

73 List containing input tables. 

74 

75 Returns 

76 ------- 

77 calib : `lsst.ip.isr.IntermediateTransmissionCurve` 

78 The final calibration. 

79 """ 

80 calib = cls() 

81 

82 metadata = tableList[0].meta 

83 calib.setMetadata(metadata) 

84 calib.updateMetadata() 

85 

86 calib.data = tableList[0] 

87 calib.setTransmissionCurveRepresentation() 

88 return calib 

89 

90 def setTransmissionCurveRepresentation(self): 

91 """Construct transmission curve representation from the data that was 

92 read. 

93 

94 Raises 

95 ------ 

96 RuntimeError 

97 This is raised if no table data exists in the calibration, 

98 if there are array length mismatches, or if the wavelength 

99 sampling for multi-amp tables differ. 

100 """ 

101 if self.data is None: 

102 raise RuntimeError("No table data was found to convert to a transmission curve!") 

103 

104 throughputKey = None 

105 if 'wavelength' not in self.data.columns: 

106 raise RuntimeError("Expected column [wavelength] not found.") 

107 if 'efficiency' in self.data.columns: 

108 throughputKey = 'efficiency' 

109 elif 'throughput' in self.data.columns: 

110 throughputKey = 'throughput' 

111 else: 

112 raise RuntimeError("Expected columns [throughput|efficiency] not found.") 

113 

114 doAverageCurves = False 

115 if 'amp_name' in self.data.columns: 

116 doAverageCurves = True 

117 

118 # These will be used to construct the 

119 # ``lsst.afw.image.TransmissionCurve`` object. 

120 wavelengths = None 

121 throughput = None 

122 

123 if doAverageCurves: 

124 curveStack = None 

125 amplifierNames = set(self.data['amp_name']) 

126 comparisonIndices = np.where(self.data['amp_name'] == next(iter(amplifierNames))) 

127 wavelengths = self.data[comparisonIndices]['wavelength'] 

128 

129 for amplifier in amplifierNames: 

130 indices = np.where(self.data['amp_name'] == amplifier) 

131 if len(self.data[indices]) != len(self.data[comparisonIndices]): 

132 raise RuntimeError("Incompatible lengths in average.") 

133 if not np.array_equal(self.data[indices]['wavelength'], wavelengths): 

134 raise RuntimeError("Mismatch in wavelength samples.") 

135 

136 if curveStack is not None: 

137 curveStack = np.column_stack((curveStack, self.data[indices][throughputKey])) 

138 else: 

139 curveStack = self.data[indices][throughputKey] 

140 throughput = np.mean(curveStack, 1) 

141 

142 # This averaging operation has stripped units. 

143 throughput = throughput * self.data[throughputKey].unit 

144 else: 

145 wavelengths = self.data['wavelength'] 

146 throughput = self.data[throughputKey] 

147 

148 # Convert units: 

149 # import pdb; pdb.set_trace() 

150 with cds.enable(): 

151 # These need to be in Angstroms, for consistency. 

152 wavelengths = wavelengths.to(u.Angstrom).to_value() 

153 

154 if throughput.unit != u.dimensionless_unscaled and throughput.unit != u.UnrecognizedUnit('-'): 

155 # These need to be fractions, not percent. 

156 throughput = throughput.to(u.dimensionless_unscaled).to_value() 

157 

158 self.transmissionCurve = TransmissionCurve.makeSpatiallyConstant( 

159 throughput.astype(np.float64), 

160 wavelengths.astype(np.float64), 

161 throughputAtMin=0.0, 

162 throughputAtMax=0.0 

163 ) 

164 

165 def getTransmissionCurve(self): 

166 if self.transmissionCurve is None and self.data is None: 

167 raise RuntimeError("No transmission curve data found.") 

168 if self.transmissionCurve is None: 

169 self.setTransmissionCurveRepresentation() 

170 return self.transmissionCurve 

171 

172 def writeFits(self, outputFilename): 

173 """Write the transmission curve data to a file. 

174 

175 Parameters 

176 ---------- 

177 outputFilename : `str` 

178 Destination filename. 

179 

180 Returns 

181 ------- 

182 outputFilename : `str` 

183 The output filename actually used. 

184 

185 Raises 

186 ------ 

187 RuntimeError 

188 Raised if no transmission curve can be created. 

189 """ 

190 if self.transmissionCurve is None and self.data is None: 

191 raise RuntimeError("No transmission curve data found.") 

192 if self.transmissionCurve is None: 

193 self.setTransmissionCurveRepresentation() 

194 

195 return self.transmissionCurve.writeFits(outputFilename) 

196 

197 

198class IntermediateSensorTransmissionCurve(IntermediateTransmissionCurve): 

199 _OBSTYPE = 'transmission_sensor' 

200 

201 

202class IntermediateFilterTransmissionCurve(IntermediateTransmissionCurve): 

203 _OBSTYPE = 'transmission_filter' 

204 

205 

206class IntermediateFilterDetectorTransmissionCurve(IntermediateTransmissionCurve): 

207 _OBSTYPE = 'transmission_filter_detector' 

208 

209 

210class IntermediateOpticsTransmissionCurve(IntermediateTransmissionCurve): 

211 _OBSTYPE = 'transmission_optics' 

212 

213 

214class IntermediateAtmosphereTransmissionCurve(IntermediateTransmissionCurve): 

215 _OBSTYPE = 'transmission_atmosphere' 

216 

217 

218class IntermediateSystemTransmissionCurve(IntermediateTransmissionCurve): 

219 _OBSTYPE = 'transmission_system'