Coverage for python / lsst / ip / isr / photodiode.py: 14%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 08:55 +0000

1# This file is part of ip_isr. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21""" 

22Photodiode storage class. 

23""" 

24 

25__all__ = ["PhotodiodeCalib"] 

26 

27import numpy as np 

28from astropy.table import Table 

29from astropy.stats import sigma_clipped_stats 

30 

31from lsst.ip.isr import IsrCalib 

32 

33 

34class PhotodiodeCalib(IsrCalib): 

35 """Independent current measurements from photodiode for linearity 

36 calculations. 

37 

38 Parameters 

39 ---------- 

40 timeSamples : `list` or `numpy.ndarray` 

41 List of samples the photodiode was measured at. 

42 currentSamples : `list` or `numpy.ndarray` 

43 List of current measurements at each time sample. 

44 log : `logging.Logger`, optional 

45 Log to write messages to. If `None` a default logger will be used. 

46 **kwargs : 

47 Additional parameters. These will be passed to the parent 

48 constructor with the exception of: 

49 

50 ``"integrationMethod"`` 

51 Name of the algorithm to use to integrate the current 

52 samples. Allowed values are ``DIRECT_SUM``, 

53 ``TRIMMED_SUM``, ``CHARGE_SUM``, ``MEAN`` (`str`). 

54 ``"currentScale"`` 

55 Scale factor to apply to the current samples for the 

56 ``CHARGE_SUM`` integration method. A typical value 

57 would be `-1`, to flip the sign of the integrated charge. 

58 """ 

59 

60 _OBSTYPE = 'PHOTODIODE' 

61 _SCHEMA = 'Photodiode' 

62 _VERSION = 1.0 

63 

64 def __init__(self, timeSamples=None, currentSamples=None, **kwargs): 

65 if timeSamples is not None and currentSamples is not None: 

66 if len(timeSamples) != len(currentSamples): 

67 raise RuntimeError(f"Inconsitent vector lengths: {len(timeSamples)} vs {len(currentSamples)}") 

68 else: 

69 self.timeSamples = np.array(timeSamples).ravel() 

70 self.currentSamples = np.array(currentSamples).ravel() 

71 else: 

72 self.timeSamples = np.array([]).ravel() 

73 self.currentSamples = np.array([]).ravel() 

74 

75 super().__init__(**kwargs) 

76 

77 if 'integrationMethod' in kwargs: 

78 self.integrationMethod = kwargs.pop('integrationMethod') 

79 else: 

80 self.integrationMethod = 'DIRECT_SUM' 

81 

82 if 'currentScale' in kwargs: 

83 self.currentScale = kwargs.pop('currentScale') 

84 else: 

85 self.currentScale = 1.0 

86 

87 if 'day_obs' in kwargs: 

88 self.updateMetadata(day_obs=kwargs['day_obs']) 

89 if 'seq_num' in kwargs: 

90 self.updateMetadata(seq_num=kwargs['seq_num']) 

91 

92 self.requiredAttributes.update(['timeSamples', 'currentSamples', 'integrationMethod']) 

93 

94 @classmethod 

95 def fromDict(cls, dictionary): 

96 """Construct a PhotodiodeCalib from a dictionary of properties. 

97 

98 Parameters 

99 ---------- 

100 dictionary : `dict` 

101 Dictionary of properties. 

102 

103 Returns 

104 ------- 

105 calib : `lsst.ip.isr.PhotodiodeCalib` 

106 Constructed photodiode data. 

107 

108 Raises 

109 ------ 

110 RuntimeError 

111 Raised if the supplied dictionary is for a different 

112 calibration type. 

113 """ 

114 calib = cls() 

115 

116 if calib._OBSTYPE != dictionary['metadata']['OBSTYPE']: 

117 raise RuntimeError(f"Incorrect photodiode supplied. Expected {calib._OBSTYPE}, " 

118 f"found {dictionary['metadata']['OBSTYPE']}") 

119 

120 calib.setMetadata(dictionary['metadata']) 

121 

122 calib.timeSamples = np.array(dictionary['timeSamples']).ravel() 

123 calib.currentSamples = np.array(dictionary['currentSamples']).ravel() 

124 calib.integrationMethod = dictionary.get('integrationMethod', "DIRECT_SUM") 

125 

126 calib.updateMetadata() 

127 return calib 

128 

129 def toDict(self): 

130 """Return a dictionary containing the photodiode properties. 

131 

132 The dictionary should be able to be round-tripped through. 

133 `fromDict`. 

134 

135 Returns 

136 ------- 

137 dictionary : `dict` 

138 Dictionary of properties. 

139 """ 

140 self.updateMetadata() 

141 

142 outDict = {} 

143 outDict['metadata'] = self.getMetadata() 

144 

145 outDict['timeSamples'] = self.timeSamples.tolist() 

146 outDict['currentSamples'] = self.currentSamples.tolist() 

147 

148 outDict['integrationMethod'] = self.integrationMethod 

149 

150 return outDict 

151 

152 @classmethod 

153 def fromTable(cls, tableList, **kwargs): 

154 """Construct calibration from a list of tables. 

155 

156 This method uses the `fromDict` method to create the 

157 calibration after constructing an appropriate dictionary from 

158 the input tables. 

159 

160 Parameters 

161 ---------- 

162 tableList : `list` [`astropy.table.Table`] 

163 List of tables to use to construct the crosstalk 

164 calibration. 

165 

166 Returns 

167 ------- 

168 calib : `lsst.ip.isr.PhotodiodeCalib` 

169 The calibration defined in the tables. 

170 """ 

171 dataTable = tableList[0] 

172 metadata = dataTable.meta 

173 

174 # Dump useless entries that are carried over from merging 

175 # HDU[0]'s header with the header from HDU[1] (which has the 

176 # data table). 

177 for key in ("SIMPLE", "BITPIX", "NAXIS", "EXTEND"): 

178 metadata.pop(key) 

179 

180 # Do translations: 

181 instrument = metadata.pop("INSTRUME", None) 

182 location = metadata.pop("LOCATN", "NO_LOCATION") 

183 

184 if instrument == "Electrometer_index_201" and location == "AuxTel": 

185 metadata["INSTRUME"] = "LATISS" 

186 elif location == "MainTel" and instrument in ("Electrometer_index_101", 

187 "Electrometer_index_102", 

188 "Electrometer_index_103"): 

189 metadata["INSTRUME"] = "LSSTCam" 

190 else: 

191 # This will cause problems in ingest, but we don't know 

192 # what to associate it with. 

193 metadata["INSTRUME"] = instrument 

194 

195 inDict = {} 

196 inDict['metadata'] = metadata 

197 

198 if 'OBSTYPE' not in metadata: 

199 inDict['metadata']['OBSTYPE'] = cls._OBSTYPE 

200 inDict['integrationMethod'] = metadata.pop('INTEGRATION_METHOD', 'DIRECT_SUM') 

201 

202 # These will use the last column found, so "RNUM" (which is in 

203 # seconds) will replace "Elapsed Time" (which is in integer 

204 # sample counts) when both are found in the table. 

205 for key in ('TIME', 'Elapsed Time', 'RNUM'): 

206 if key in dataTable.columns: 

207 inDict['timeSamples'] = dataTable[key] 

208 for key in ('CURRENT', 'Signal', ): 

209 if key in dataTable.columns: 

210 inDict['currentSamples'] = dataTable[key] 

211 

212 return cls().fromDict(inDict) 

213 

214 def toTable(self): 

215 """Construct a list of tables containing the information in this 

216 calibration. 

217 

218 The list of tables should create an identical calibration 

219 after being passed to this class's fromTable method. 

220 

221 Returns 

222 ------- 

223 tableList : `list` [`astropy.table.Table`] 

224 List of tables containing the photodiode calibration 

225 information. 

226 """ 

227 self.updateMetadata() 

228 catalog = Table([{'TIME': self.timeSamples, 

229 'CURRENT': self.currentSamples}]) 

230 inMeta = self.getMetadata().toDict() 

231 outMeta = {k: v for k, v in inMeta.items() if v is not None} 

232 outMeta.update({k: "" for k, v in inMeta.items() if v is None}) 

233 outMeta['INTEGRATION_METHOD'] = self.integrationMethod 

234 catalog.meta = outMeta 

235 

236 return [catalog] 

237 

238 @classmethod 

239 def readTwoColumnPhotodiodeData(cls, filename): 

240 """Construct a PhotodiodeCalib by reading the simple column format. 

241 

242 Parameters 

243 ---------- 

244 filename : `str` 

245 File to read samples from. 

246 

247 Returns 

248 ------- 

249 calib : `lsst.ip.isr.PhotodiodeCalib` 

250 The calibration defined in the file. 

251 """ 

252 import os.path 

253 

254 rawData = np.loadtxt(filename, dtype=[('time', 'float'), ('current', 'float')]) 

255 

256 basename = os.path.basename(filename) 

257 cleaned = os.path.splitext(basename)[0] 

258 _, _, day_obs, seq_num = cleaned.split("_") 

259 

260 return cls(timeSamples=rawData['time'], currentSamples=rawData['current'], 

261 day_obs=int(day_obs), seq_num=int(seq_num)) 

262 

263 def integrate(self, exposureTime=None): 

264 """Integrate the current. 

265 

266 Parameters 

267 ---------- 

268 exposureTime : `float`, optional 

269 Image exposure time. Required if integrationMethod is ``MEAN``. 

270 

271 Raises 

272 ------ 

273 RuntimeError 

274 Raised if the integration method is not known. 

275 ValueError 

276 Raised if the exposure time is not set and method is MEAN. 

277 """ 

278 if self.integrationMethod == 'DIRECT_SUM': 

279 return self.integrateDirectSum() 

280 elif self.integrationMethod == 'TRIMMED_SUM': 

281 return self.integrateTrimmedSum() 

282 elif self.integrationMethod == 'CHARGE_SUM': 

283 return self.integrateChargeSum() 

284 elif self.integrationMethod == 'MEAN': 

285 if exposureTime is None: 

286 raise ValueError("Exposure time must be provided if integration method is MEAN.") 

287 return self.integrateMean(exposureTime) 

288 else: 

289 raise RuntimeError(f"Unknown integration method {self.integrationMethod}") 

290 

291 def integrateDirectSum(self): 

292 """Integrate points. 

293 

294 This uses numpy's trapezoidal integrator. 

295 

296 Returns 

297 ------- 

298 sum : `float` 

299 Total charge measured. 

300 """ 

301 return np.trapezoid(self.currentSamples, x=self.timeSamples) 

302 

303 def integrateTrimmedSum(self): 

304 """Integrate points with a baseline level subtracted. 

305 

306 This uses numpy's trapezoidal integrator. 

307 

308 Returns 

309 ------- 

310 sum : `float` 

311 Total charge measured. 

312 

313 See Also 

314 -------- 

315 lsst.eotask.gen3.eoPtc 

316 """ 

317 # Apply the current scale to pick up any sign flip in the 

318 # current sample values. 

319 cs = self.currentScale * self.currentSamples 

320 currentThreshold = (max(cs) - min(cs))/5.0 + min(cs) 

321 lowValueIndices = np.where(cs < currentThreshold) 

322 baseline = sigma_clipped_stats(cs[lowValueIndices])[0] 

323 return np.trapezoid(cs - baseline, self.timeSamples) 

324 

325 def integrateChargeSum(self): 

326 """For this method, the values in .currentSamples are actually the 

327 integrated charge values as measured by the ammeter for each 

328 sampling interval. We need to do a baseline subtraction, 

329 based on the charge values when the LED is off, then sum up 

330 the corrected signals. 

331 

332 Returns 

333 ------- 

334 sum : `float` 

335 Total charge measured. 

336 """ 

337 dt = self.timeSamples[1:] - self.timeSamples[:-1] 

338 # The .currentSamples values are the current integrals over 

339 # the interval preceding the current time stamp, so omit the 

340 # first value. 

341 charge = self.currentScale*self.currentSamples[1:] 

342 # The current per interval to use for baseline subtraction 

343 # without assuming all of the dt values are the same: 

344 current = charge/dt 

345 # To determine the baseline current level, exclude points with 

346 # signal levels > 5% of the maximum (measured relative to the 

347 # overall minimum), and extend that selection 2 entries on 

348 # either side to avoid otherwise low-valued points that sample 

349 # the signal ramp and which should not be included in the 

350 # baseline estimate. 

351 dy = np.max(current) - np.min(current) 

352 signal, = np.where(current > dy/20. + np.min(current)) 

353 imin = signal[0] - 2 

354 imax = signal[-1] + 2 

355 bg = np.concatenate([np.arange(0, imin), np.arange(imax, len(current))]) 

356 bg_current = np.sum(charge[bg])/np.sum(dt[bg]) 

357 # Return the background-subtracted total charge. 

358 return np.sum(charge - bg_current*dt) 

359 

360 def integrateMean(self, exposureTime): 

361 """ 

362 Take the mean of the photodiode trace, and multiply by exposure time. 

363 

364 The current scale is also used. 

365 

366 Parameters 

367 ---------- 

368 exposureTime : `float` 

369 Exposure time in sections. 

370 """ 

371 mean = self.currentScale * np.mean(self.currentSamples) 

372 

373 return mean * exposureTime