Coverage for python / lsst / cp / pipe / utilsEfd.py: 8%

187 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 08:44 +0000

1# This file is part of cp_pipe. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21# 

22 

23__all__ = ["CpEfdClient"] 

24 

25import logging 

26import numpy as np 

27import re 

28import requests 

29 

30from astropy.table import Table 

31from astropy.time import Time 

32from urllib.parse import urljoin 

33 

34 

class CpEfdClient():
    """An EFD client to retrieve calibration results.

    Constructing the client fetches credentials for the requested EFD
    instance and then pings the database.

    Parameters
    ----------
    efdInstance : `str`, optional
        EFD instance name to connect to.
    dieOnSearch : `bool`, optional
        If `True`, ``searchResults`` raises when the requested date lies
        outside the range of the data.  Otherwise a warning is logged
        and the first row is returned.
    log : `logging.Logger`, optional
        Log to write messages to.

    Raises
    ------
    RuntimeError
        Raised if the credentials cannot be retrieved or if the
        connection check fails.
    """

    # Column dtypes to use for each field type reported by
    # ``SHOW FIELD KEYS``.  Types that are not listed are left for the
    # table constructor to infer.
    _fieldDtypes = {
        "float": np.float64,
        "integer": np.int64,
        "string": "str",
        "boolean": np.bool_,
    }

    # Matches the electrometer "write_fits_file" summary message:
    # the last "filename.extension" before the first newline;
    # the last [] grouped values before the second newline;
    # the last [] grouped values before the end of the string.
    _electrometerSummaryRegex = re.compile(
        r"\b\w+.+?(\w+?\.\w+)\n\b\w+.+\[(.*?)\]\n\b\w+.+\[(.*?)\]$"
    )

    def __init__(self, efdInstance="usdf_efd", dieOnSearch=False, log=None):
        self.log = log if log else logging.getLogger(__name__)
        self.dieOnSearch = dieOnSearch

        authDict = self._getAuth(efdInstance)
        self._auth = (authDict["username"], authDict["password"])
        self._databaseName = "efd"
        self._databaseUrl = urljoin(f"https://{authDict['host']}", authDict["path"])

        self.checkConnection()

    def _getAuth(self, instanceAlias):
        """Get authorization credentials.

        Parameters
        ----------
        instanceAlias : `str`
            EFD instance to get credentials for.

        Returns
        -------
        credentials : `dict` [`str`, `str`]
            A dictionary of authorization credentials, including at
            least these key/value pairs:

            ``"username"``
                Login username.
            ``"password"``
                Login password.
            ``"host"``
                Host to connect to.
            ``"path"``
                Directory path for EFD instance.

        Raises
        ------
        RuntimeError
            Raised if the credential service does not answer with
            HTTP status 200.
        """
        serviceEndpoint = "https://roundtable.lsst.codes/segwarides/"
        url = urljoin(serviceEndpoint, f"creds/{instanceAlias}")
        response = requests.get(url)

        if response.status_code != 200:
            raise RuntimeError(f"Could not connect to {url}")
        return response.json()

    def checkConnection(self):
        """Check the connection to the EFD.

        Raises
        ------
        requests.exceptions.HTTPError
            Raised if the ping returns an HTTP error status.
        RuntimeError
            Raised if the ping returns anything other than status 204.
        """
        # The ping command will return 204 (No Content) on success.
        params = {"wait_for_leader": "5s"}

        response = requests.get(f"{self._databaseUrl}/ping", params=params, auth=self._auth)
        response.raise_for_status()
        if response.status_code != 204:
            raise RuntimeError(f"Connection check failed for {self._databaseUrl}")

    def getSchemaDtype(self, topicName):
        """Get datatypes for a topic.

        Parameters
        ----------
        topicName : `str`
            Topic to get datatypes for.

        Returns
        -------
        datatypes : `list` [`tuple`]
            List of ``(fieldName, dtype)`` tuples, starting with the
            ``"time"`` column.  ``dtype`` is `None` for a field whose
            database type is not recognized, so that the consumer can
            infer it from the data.
        """
        query = f"SHOW FIELD KEYS FROM \"{topicName}\""
        data = self.query(query)

        values = data["results"][0]["series"][0]["values"]

        dtype = [("time", "str")]
        # Look every type up explicitly: an unrecognized type must not
        # inherit the dtype of the previously processed field.
        dtype.extend((fieldName, self._fieldDtypes.get(fieldType))
                     for (fieldName, fieldType) in values)
        return dtype

    def query(self, query):
        """Execute an EFD query.

        Parameters
        ----------
        query : `str`
            Query to run.

        Returns
        -------
        results : `dict`
            Dictionary of results returned.

        Raises
        ------
        RuntimeError
            Raised if the database could not be read from.
        """
        params = {
            "db": self._databaseName,
            "q": query,
        }

        try:
            response = requests.get(f"{self._databaseUrl}/query", params=params, auth=self._auth)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Could not read data from database with query: {query}") from e

    @staticmethod
    def _buildSelectQuery(topicName, fields=None, startDate=None, endDate=None):
        """Build the SELECT statement for a time series query.

        Parameters
        ----------
        topicName : `str`
            Database "topic" to query.
        fields : `list` [`str`], optional
            Fields to select.  If empty or `None`, all fields are
            selected.
        startDate : `astropy.time.Time`, optional
            Earliest time to include.
        endDate : `astropy.time.Time`, optional
            Latest time to include.

        Returns
        -------
        query : `str`
            The query string.
        """
        selection = ",".join(fields) if fields else "*"
        query = f"SELECT {selection} FROM \"{topicName}\""

        conditions = []
        if startDate is not None:
            conditions.append(f"time >= '{startDate.utc.isot}Z'")
        if endDate is not None:
            conditions.append(f"time <= '{endDate.utc.isot}Z'")
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        return query

    @staticmethod
    def _alignDtypes(columns, schemaDtype):
        """Order the schema dtypes to match the returned columns.

        Parameters
        ----------
        columns : `list` [`str`]
            Column names, in the order returned by the query.
        schemaDtype : `list` [`tuple`]
            ``(fieldName, dtype)`` tuples as returned by
            ``getSchemaDtype``.

        Returns
        -------
        dtypes : `list`
            One dtype per entry of ``columns``.  Columns that are not
            part of the schema get `None`.
        """
        dtypeByName = dict(schemaDtype)
        return [dtypeByName.get(name) for name in columns]

    @staticmethod
    def _parseTaiDate(dateStr):
        """Convert an ISOT date string in TAI to a time object.

        Parameters
        ----------
        dateStr : `str` or `None`
            Date string to convert.

        Returns
        -------
        date : `astropy.time.Time` or `None`
            The converted date, or `None` if no date was supplied.
        """
        if not dateStr:
            return None
        return Time(dateStr, format="isot", scale="tai")

    def selectTimeSeries(self, topicName, fields=None, startDate=None, endDate=None):
        """Query a topic for a time series.

        Parameters
        ----------
        topicName : `str`
            Database "topic" to query.
        fields : `list`, optional
            List of fields to return.  If empty or `None`, all fields
            are returned.
        startDate : `astropy.time.Time`, optional
            Start date to limit the results returned.
        endDate : `astropy.time.Time`, optional
            End date to limit the results returned.

        Returns
        -------
        table : `astropy.table.Table`
            A table containing the fields requested, with each row
            corresponding to one date (available in the ``"time"``
            column).  The table is sorted on ``"private_sndStamp"``
            when that column is present, and on ``"time"`` otherwise.

        Raises
        ------
        RuntimeError
            Raised if the query fails or returns no results.
        """
        query = self._buildSelectQuery(topicName, fields, startDate, endDate)
        data = self.query(query)

        # data is a dictionary with one key, "results"
        results = data["results"][0]
        if "series" not in results:
            raise RuntimeError(f"No results found for query: {query}")
        series = results["series"][0]

        # The dtypes must follow the order of the returned columns,
        # which need not be the order in which the schema lists them.
        tableDtype = self._alignDtypes(series["columns"], self.getSchemaDtype(topicName))

        # The value stored in "time" may not be consistent and
        # monotonic (and is in UTC).  "private_sndStamp" comes from
        # the device itself, and is therefore preferred.
        table = Table(rows=series["values"], names=series["columns"], dtype=tableDtype)
        table["time"] = Time(table["time"], scale="utc").tai
        table.sort("time")
        if "private_sndStamp" in table.columns:
            table["private_sndStamp"] = Time(table["private_sndStamp"], format="unix_tai")
            table.sort("private_sndStamp")

        return table

    def searchResults(self, data, dateStr):
        """Find the row entry in ``data`` immediately preceding the specified
        date.

        Parameters
        ----------
        data : `astropy.table.Table`
            The table of results from the EFD.  It must contain the
            ``"time"`` and ``"private_sndStamp"`` columns.
        dateStr : `str`
            The date (in TAI) to look up in the status for.

        Returns
        -------
        result : `astropy.table.Row`
            The row of the data table corresponding to ``dateStr``.
        idx : `int` or `float`
            Index of that row in ``data``.  This is NaN when the date
            is outside of the data range and the first row is returned
            as a fallback.

        Raises
        ------
        RuntimeError
            Raised if the table is empty, if the date is outside of the
            data range and ``dieOnSearch`` is set, or if the search
            does not converge.
        """
        nRows = len(data)
        if nRows == 0:
            raise RuntimeError(f"Cannot search for date {dateStr} in an empty table.")

        dateValue = Time(dateStr, scale='tai', format="isot")

        # Check that the date we want to consider is contained in the
        # EFD data.  This uses the "time" column, while the search
        # below uses "private_sndStamp".
        if data["time"][0] > dateValue or data["time"][-1] < dateValue:
            msg = f"Requested date {dateStr} outside of data range {data['time'][0]}-{data['time'][-1]}"
            if self.dieOnSearch:
                raise RuntimeError(msg)
            else:
                # Return the start, as we're more likely to have
                # errors in that direction.
                self.log.warning(msg)
                return data[0], np.nan

        # Binary search through the EFD entries in date, until the
        # most recent entry prior to the requested date is found.
        # The iteration limit is only a guard against a search that
        # fails to converge, so let it grow with the table size.
        maxIterations = max(20, nRows.bit_length() + 1)
        low = 0
        high = nRows
        idx = (high + low) // 2
        found = False
        iteration = 0
        while not found:
            if idx < 0 or idx >= nRows or iteration > maxIterations:
                raise RuntimeError(f"Search for date failed: {dateValue} {idx} {iteration}.")

            myTime = data["private_sndStamp"][idx]
            if myTime <= dateValue:
                low = idx
            elif myTime > dateValue:
                high = idx

            idx = (high + low) // 2
            iteration += 1
            if high - low == 1:
                found = True
            self.log.debug("parse search %d %d %d %d %s %s",
                           low, high, idx, found, myTime, dateValue)

        # End binary search.
        return data[idx], idx

    def getEfdMonochromatorData(self, dataSeries=None, dateMin=None, dateMax=None):
        """Retrieve Monochromator data from the EFD.

        Parameters
        ----------
        dataSeries : `str`, optional
            Data series to request from the EFD.
        dateMin : `str`, optional
            Minimum date (in TAI) to retrieve from EFD.
        dateMax : `str`, optional
            Maximum date (in TAI) to retrieve from EFD.

        Returns
        -------
        results : `astropy.table.Table`
            The table of results returned from the EFD.
        """
        # This is currently the only monochromator available.
        dataSeries = dataSeries if dataSeries else "lsst.sal.ATMonochromator.logevent_wavelength"

        return self.selectTimeSeries(dataSeries, ["wavelength", "private_sndStamp"],
                                     self._parseTaiDate(dateMin), self._parseTaiDate(dateMax))

    def parseMonochromatorStatus(self, data, dateStr):
        """Determine monochromator status for a specific date.

        Parameters
        ----------
        data : `astropy.table.Table`
            The table of monochromator results from the EFD.
        dateStr : `str`
            The date (in TAI) to look up in the status for.

        Returns
        -------
        indexDate : `str`
            Date string (in TAI) indicating the monochromator state change.
        wavelength : `float`
            Monochromator commanded peak.
        """
        result, _ = self.searchResults(data, dateStr)
        myTime = result["private_sndStamp"]
        return myTime.strftime("%Y-%m-%dT%H:%M:%S.%f"), result["wavelength"]

    def getEfdElectrometerData(self, dataSeries=None, dateMin=None, dateMax=None):
        """Retrieve Electrometer data from the EFD.

        Parameters
        ----------
        dataSeries : `str`, optional
            Data series to request from the EFD.
        dateMin : `str`, optional
            Minimum date (in TAI) to retrieve from EFD.
        dateMax : `str`, optional
            Maximum date (in TAI) to retrieve from EFD.

        Returns
        -------
        results : `astropy.table.Table`
            The table of results returned from the EFD.  Results from
            the ``logevent_logMessage`` series are passed through
            ``rewriteElectrometerStatus`` first.

        Raises
        ------
        RuntimeError
            Raised if the ``logevent_logMessage`` series is requested
            without both a minimum and a maximum date.
        """
        # All electrometer data gets written to the same series.
        defaultSeries = "lsst.sal.Electrometer.logevent_intensity"
        alternateSeries = "lsst.sal.Electrometer.logevent_logMessage"
        dataSeries = dataSeries if dataSeries else defaultSeries

        startDate = self._parseTaiDate(dateMin)
        stopDate = self._parseTaiDate(dateMax)

        if (dataSeries == alternateSeries and (startDate is None or stopDate is None)):
            raise RuntimeError("Cannot query logevent_logMessage without dates to limit memory issues.")

        results = self.selectTimeSeries(dataSeries, [], startDate, stopDate)
        if dataSeries == alternateSeries:
            results = self.rewriteElectrometerStatus(results)
        return results

    def rewriteElectrometerStatus(self, inResults):
        """Rewrite intermediate electrometer data extracted from the EFD
        logEvents.

        Parameters
        ----------
        inResults : `astropy.table.Table`
            The table of results returned from the EFD.

        Returns
        -------
        outResults : `astropy.table.Table`
            The rewritten table containing only electrometer summary
            status events, with the columns ``"intensity"``,
            ``"intensityStd"``, ``"intensityTimeMean"`` and
            ``"expectedLfaFile"`` added.  Rows whose message cannot be
            parsed get NaN values and the file name ``"REGEXP_FAIL"``.
        """
        # This is fragile against upstream changes.
        # Ignore all entries that are not the ones we care about.
        outResults = inResults[inResults["functionName"] == "write_fits_file"]
        outResults = outResults[outResults["level"] == 20]

        # These will be new columns
        intensityMean = []
        intensityStdev = []
        intensityTime = []
        intensityFile = []

        for row in outResults:
            # Fallback values in case the message cannot be parsed.
            mean = np.nan
            stdev = np.nan
            time_mean = np.nan
            filename = "REGEXP_FAIL"

            magic = self._electrometerSummaryRegex.findall(row["message"])
            if magic:
                matchedFile, intensity_str, time_str = magic[0]
                try:
                    # The groups hold "mean, median, stdev" and
                    # "mean, median"; the medians are not used.
                    parsedMean, _, parsedStdev = (float(v) for v in intensity_str.split(","))
                    parsedTimeMean, _ = (float(v) for v in time_str.split(","))
                except ValueError:
                    # A matched but malformed message keeps the
                    # fallback values instead of aborting the rewrite.
                    self.log.warning("Could not parse electrometer summary: %s", row["message"])
                else:
                    filename = matchedFile
                    mean = parsedMean
                    stdev = parsedStdev
                    time_mean = parsedTimeMean
                    # Censor the saturated points so plots look nice.
                    if np.abs(mean) > 1e37:
                        mean = np.nan

            intensityMean.append(mean)
            intensityStdev.append(stdev)
            intensityFile.append(filename)
            intensityTime.append(time_mean)

        # Add our new columns at the start of the column list
        outResults.add_column(intensityMean, name="intensity", index=1)
        outResults.add_column(intensityStdev, name="intensityStd", index=2)
        outResults.add_column(intensityTime, name="intensityTimeMean", index=3)
        outResults.add_column(intensityFile, name="expectedLfaFile", index=4)
        return outResults

    def parseElectrometerStatus(self, data, dateStr, dateEnd=None,
                                doIntegrateSamples=False, index=201):
        """Determine electrometer status for a specific date.

        Parameters
        ----------
        data : `astropy.table.Table`
            The table of electrometer results from the EFD.
        dateStr : `str`
            The date (in TAI) to look up in the status for.
        dateEnd : `str`, optional
            The end date (in TAI) to look in the status for.  Required
            if ``doIntegrateSamples`` is set.
        doIntegrateSamples : `bool`, optional
            If true, take the average of all samples between
            ``dateStr`` and ``dateEnd``.
        index : `int`, optional
            The salIndex of the device we want to read.  For LATISS,
            this should be 201.  For the main telescope, 101.  If
            `None`, no selection on salIndex is made.

        Returns
        -------
        indexDate : `str`
            Date string (in TAI) indicating the electrometer state
            change.
        intensity : `float`
            Average electrometer intensity.
        endDate : `str` or `None`
            Date string (in TAI) of the last sample that was averaged,
            or `None` if no averaging was done.

        Raises
        ------
        ValueError
            Raised if ``doIntegrateSamples`` is set without ``dateEnd``.
        """
        if index is not None:
            mask = (data["salIndex"] == index)
            data = data[mask]

        # searchResults returns the first entry prior to this date
        result, idx = self.searchResults(data, dateStr)

        myTime = result["private_sndStamp"]
        myIntensity = result["intensity"]
        myEndTime = None

        if doIntegrateSamples:
            if dateEnd is None:
                raise ValueError("dateEnd must be supplied when doIntegrateSamples is set.")
            myEndResult, myEndIdx = self.searchResults(data, dateEnd)
            # searchResults reports a NaN index when a date is outside
            # of the data range.  Such an index cannot be used to
            # slice, so keep the single sample in that case.
            haveIndices = bool(np.isfinite(idx)) and bool(np.isfinite(myEndIdx))
            if haveIndices and myEndIdx != idx:
                myEndTime = myEndResult["private_sndStamp"]
                # Row idx is at or before dateStr and row myEndIdx is
                # at or before dateEnd, so the samples between the two
                # dates are rows idx + 1 through myEndIdx inclusive.
                myIntensity = np.mean(data[idx + 1:myEndIdx + 1]["intensity"])

        return (myTime.strftime("%Y-%m-%dT%H:%M:%S.%f"),
                myIntensity,
                myEndTime.strftime("%Y-%m-%dT%H:%M:%S.%f") if myEndTime is not None else None)

500 myIntensity, 

501 myEndTime.strftime("%Y-%m-%dT%H:%M:%S.%f") if myEndTime else None)