Coverage for python / lsst / cp / pipe / utilsEfd.py: 8%
187 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:22 +0000
1# This file is part of cp_pipe.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21#
23__all__ = ["CpEfdClient"]
25import logging
26import numpy as np
27import re
28import requests
30from astropy.table import Table
31from astropy.time import Time
32from urllib.parse import urljoin
class CpEfdClient():
    """An EFD client to retrieve calibration results.

    Parameters
    ----------
    efdInstance : `str`, optional
        EFD instance name to connect to.
    dieOnSearch : `bool`, optional
        If True, raise an exception when a date lookup falls outside
        the range of the retrieved EFD data instead of only logging a
        warning (see ``searchResults``).
    log : `logging.Logger`, optional
        Log to write messages to.
    """

    def __init__(self, efdInstance="usdf_efd", dieOnSearch=False, log=None):
        self.log = log if log else logging.getLogger(__name__)
        self.dieOnSearch = dieOnSearch

        # Resolve credentials for this instance and build the base URL
        # used by all subsequent database requests.
        authDict = self._getAuth(efdInstance)
        self._auth = (authDict["username"], authDict["password"])
        self._databaseName = "efd"
        self._databaseUrl = urljoin(f"https://{authDict['host']}", authDict["path"])

        # Fail fast if the database is unreachable.
        self.checkConnection()
57 def _getAuth(self, instanceAlias):
58 """Get authorization credentials.
60 Parameters
61 ----------
62 instanceAlias : `str`
63 EFD instance to get credentials for.
65 Returns
66 -------
67 credentials : `dict` [`str`, `str`]
68 A dictionary of authorization credentials, including at
69 least these key/value pairs:
71 ``"username"``
72 Login username.
73 ``"password"``
74 Login passwords.
75 ``"host"``
76 Host to connect to.
77 ``"path"``
78 Directory path for EFD instance.
80 Raises
81 ------
82 RuntimeError :
83 Raised if the HTTPS request fails.
84 """
85 serviceEndpoint = "https://roundtable.lsst.codes/segwarides/"
86 url = urljoin(serviceEndpoint, f"creds/{instanceAlias}")
87 response = requests.get(url)
89 if response.status_code == 200:
90 return response.json()
91 else:
92 raise RuntimeError(f"Could not connect to {url}")
94 def checkConnection(self):
95 """Check the connection to the EFD.
97 Raises
98 ------
99 RuntimeError :
100 Raised if the connection check fails.
101 """
102 # The ping command will return 204 (No Content) on success.
104 params = {"wait_for_leader": "5s"}
106 response = requests.get(f"{self._databaseUrl}/ping", params=params, auth=self._auth)
107 response.raise_for_status()
108 if response.status_code != 204:
109 raise RuntimeError(f"Connection check failed for {self._databaseUrl}")
111 def getSchemaDtype(self, topicName):
112 """Get datatypes for a topic.
114 Parameters
115 ----------
116 topicName : `str`
117 Topic to get datatypes for
119 Returns
120 -------
121 datatypes : `list` [`tuple` [`str`, `str`]]
122 List of tuples of field names and data types.
123 """
124 query = f"SHOW FIELD KEYS FROM \"{topicName}\""
125 data = self.query(query)
127 values = data["results"][0]["series"][0]["values"]
129 dtype = [("time", "str")]
130 for (fieldName, fieldType) in values:
131 if fieldType == "float":
132 fieldDtype = np.float64
133 elif fieldType == "integer":
134 fieldDtype = np.int64
135 elif fieldType == "string":
136 fieldDtype = "str"
137 dtype.append((fieldName, fieldDtype))
138 return dtype
140 def query(self, query):
141 """Execute an EFD query.
143 Parameters
144 ----------
145 query : `str`
146 Query to run.
148 Returns
149 -------
150 results : `dict`
151 Dictionary of results returned.
153 Raises
154 ------
155 RuntimeError :
156 Raised if the the database could not be read from.
157 """
158 params = {
159 "db": self._databaseName,
160 "q": query,
161 }
163 try:
164 response = requests.get(f"{self._databaseUrl}/query", params=params, auth=self._auth)
165 response.raise_for_status()
166 return response.json()
167 except requests.exceptions.RequestException as e:
168 raise RuntimeError(f"Could not read data from database with query: {query}") from e
170 def selectTimeSeries(self, topicName, fields=[], startDate=None, endDate=None):
171 """Query a topic for a time series.
173 Parameters
174 ----------
175 topicName : `str`
176 Database "topic" to query.
177 fields : `list`, optional
178 List of fields to return. If empty, all fields are
179 returned.
180 startDate : `astropy.time.Time`, optional
181 Start date to limit the results returned.
182 endDate : `astropy.time.Time`, optional
183 End date to limit the results returned.
185 Returns
186 -------
187 table : `astropy.table.Table`
188 A table containing the fields requested, with each row
189 corresponding to one date (available in the ``"time"``
190 column).
191 """
192 query = "SELECT "
194 if not fields:
195 query += "*"
196 else:
197 query += ",".join(fields)
199 query += f" FROM \"{topicName}\""
201 if startDate is not None or endDate is not None:
202 query += " WHERE"
204 if startDate is not None:
205 query += f" time >= '{startDate.utc.isot}Z'"
206 if endDate is not None:
207 query += " AND"
208 if endDate is not None:
209 query += f" time <= '{endDate.utc.isot}Z'"
211 data = self.query(query)
213 # data is a dictionary with one key, "results"
214 results = data["results"][0]
215 if "series" in results:
216 series = results["series"][0]
217 else:
218 raise RuntimeError(f"No results found for query: {query}")
220 schemaDtype = self.getSchemaDtype(topicName)
221 tableDtype = []
222 for dtype in schemaDtype:
223 if dtype[0] in series["columns"]:
224 tableDtype.append(dtype[1])
226 # The value stored in "time" may not be consistent and
227 # monotonic (and is in UTC). "private_sndStamp" comes from
228 # the device itself, and is therefore preferred.
229 table = Table(rows=series["values"], names=series["columns"], dtype=tableDtype)
230 table["time"] = Time(table["time"], scale="utc").tai
231 table.sort("time")
232 if "private_sndStamp" in table.columns:
233 table["private_sndStamp"] = Time(table["private_sndStamp"], format="unix_tai")
234 table.sort("private_sndStamp")
236 return table
    def searchResults(self, data, dateStr):
        """Find the row entry in ``data`` immediately preceding the specified
        date.

        Parameters
        ----------
        data : `astropy.table.Table`
            The table of results from the EFD, sorted by date (see
            ``selectTimeSeries``).
        dateStr : `str`
            The date (in TAI) to look up in the status for.

        Returns
        -------
        result : `astropy.table.Row`
            The row of the data table corresponding to ``dateStr``.
        index : `int` or `float`
            Index of that row in ``data``, or `numpy.nan` if the date
            was outside the range of ``data`` (and ``dieOnSearch`` is
            False).

        Raises
        ------
        RuntimeError :
            Raised if ``dateStr`` is outside the range of ``data`` and
            ``dieOnSearch`` is True, or if the binary search fails to
            converge.
        """
        dateValue = Time(dateStr, scale='tai', format="isot")
        # Table is now sorted on "time", which is in TAI.

        # Check that the date we want to consider is contained in the
        # EFD data.
        if data["time"][0] > dateValue or data["time"][-1] < dateValue:
            msg = f"Requested date {dateStr} outside of data range {data['time'][0]}-{data['time'][-1]}"
            if self.dieOnSearch:
                raise RuntimeError(msg)
            else:
                # Return the start, as we're more likely to have
                # errors in that direction.
                self.log.warning(msg)
                return data[0], np.nan

        # Binary search through the EFD entries in date, until the
        # most recent monochromator state update prior to the spectrum
        # in question is found.
        low = 0
        high = len(data)
        idx = (high + low) // 2
        found = False
        iteration = 0
        while not found:
            # Guard against a runaway search: 20 bisection steps cover
            # tables of up to ~2**20 rows.
            if idx < 0 or idx > len(data) or iteration > 20:
                raise RuntimeError(f"Search for date failed: {dateValue} {idx} {iteration}.")

            myTime = data["private_sndStamp"][idx]
            if myTime <= dateValue:
                # Entry is at or before the target; move the lower bound up.
                low = idx
            elif myTime > dateValue:
                high = idx

            idx = (high + low) // 2
            iteration += 1
            # Converged when the bracket is exactly one row wide.
            if high - low == 1:
                found = True
            self.log.debug("parse search %d %d %d %d %s %s",
                           low, high, idx, found, myTime, dateValue)
        # End binary search.
        return data[idx], idx
297 def getEfdMonochromatorData(self, dataSeries=None, dateMin=None, dateMax=None):
298 """Retrieve Monochromator data from the EFD.
300 Parameters
301 ----------
302 dataSeries : `str`, optional
303 Data series to request from the EFD.
304 dateMin : `str`, optional
305 Minimum date (in TAI) to retrieve from EFD.
306 dateMax : `str`, optional
307 Maximum date (in TAI) to retrieve from EFD.
309 Returns
310 -------
311 results : `astropy.table.Table`
312 The table of results returned from the EFD.
313 """
314 # This is currently the only monochromator available.
315 dataSeries = dataSeries if dataSeries else "lsst.sal.ATMonochromator.logevent_wavelength"
317 if dateMin:
318 startDate = Time(dateMin, format="isot", scale="tai")
319 else:
320 startDate = None
321 if dateMax:
322 stopDate = Time(dateMax, format="isot", scale="tai")
323 else:
324 stopDate = None
326 results = self.selectTimeSeries(dataSeries, ["wavelength", "private_sndStamp"],
327 startDate, stopDate)
328 return results
330 def parseMonochromatorStatus(self, data, dateStr):
331 """Determine monochromator status for a specific date.
333 Parameters
334 ----------
335 data : `astropy.table.Table`
336 The dataframe of monochromator results from the EFD.
337 dateStr : `str`
338 The date (in TAI) to look up in the status for.
340 Returns
341 -------
342 indexDate : `str`
343 Date string (in TAI) indicating the monochromator state change.
344 wavelength : `float`
345 Monochromator commanded peak.
346 """
347 result, _ = self.searchResults(data, dateStr)
348 myTime = result["private_sndStamp"]
349 return myTime.strftime("%Y-%m-%dT%H:%M:%S.%f"), result["wavelength"]
351 def getEfdElectrometerData(self, dataSeries=None, dateMin=None, dateMax=None):
352 """Retrieve Electrometer data from the EFD.
354 Parameters
355 ----------
356 dataSeries : `str`, optional
357 Data series to request from the EFD.
358 dateMin : `str`, optional
359 Minimum date (in TAI) to retrieve from EFD.
360 dateMax : `str`, optional
361 Maximum date (in TAI) to retrieve from EFD.
363 Returns
364 -------
365 results : `astropy.table.Table`
366 The table of results returned from the EFD.
367 """
368 # All electrometer data gets written to the same series.
369 defaultSeries = "lsst.sal.Electrometer.logevent_intensity"
370 alternateSeries = "lsst.sal.Electrometer.logevent_logMessage"
371 dataSeries = dataSeries if dataSeries else defaultSeries
373 if dateMin:
374 startDate = Time(dateMin, format="isot", scale="tai")
375 else:
376 startDate = None
377 if dateMax:
378 stopDate = Time(dateMax, format="isot", scale="tai")
379 else:
380 stopDate = None
382 if (dataSeries == alternateSeries and (startDate is None or stopDate is None)):
383 raise RuntimeError("Cannot query logevent_logMessage without dates to limit memory issues.")
385 results = self.selectTimeSeries(dataSeries, [],
386 startDate, stopDate)
387 if dataSeries == "lsst.sal.Electrometer.logevent_logMessage":
388 results = self.rewriteElectrometerStatus(results)
389 return results
    def rewriteElectrometerStatus(self, inResults):
        """Rewrite intermediate electrometer data extracted from the EFD
        logEvents.

        Parameters
        ----------
        inResults : `astropy.table.Table`
            The table of results returned from the EFD.

        Returns
        -------
        outResults : `astropy.table.Table`
            The rewritten table containing only electrometer summary
            status events, with ``intensity``, ``intensityStd``,
            ``intensityTimeMean``, and ``expectedLfaFile`` columns
            prepended.
        """
        # This is fragile against upstream changes.
        # Ignore all entries that are not the ones we care about:
        # keep only level-20 messages emitted by write_fits_file.
        outResults = inResults[inResults["functionName"] == "write_fits_file"]
        outResults = outResults[outResults["level"] == 20]

        # These will be new columns
        intensityMean = []
        intensityStdev = []
        intensityTime = []
        intensityFile = []

        for row in outResults:
            # Fallback values in case the regexp fails
            mean = np.nan
            stdev = np.nan
            time_mean = np.nan
            filename = "REGEXP_FAIL"

            # Find the last "filename\.extension" before the first newline;
            # the last [] grouped values before the second newline;
            # the last [] grouped values before the end of the string.
            magic = re.findall(r"\b\w+.+?(\w+?\.\w+)\n\b\w+.+\[(.*?)\]\n\b\w+.+\[(.*?)\]$",
                               row["message"])
            if len(magic) != 0:
                # If we matched, split the grouped values, cast them to floats.
                # (median and time_median are parsed but not stored.)
                filename, intensity_str, time_str = magic[0]
                mean, median, stdev = intensity_str.split(",")
                time_mean, time_median = time_str.split(",")
                mean = float(mean)
                median = float(median)
                stdev = float(stdev)
                time_mean = float(time_mean)
                time_median = float(time_median)
                # Censor the saturated points so plots look nice.
                if np.abs(mean) > 1e37:
                    mean = np.nan

            intensityMean.append(mean)
            intensityStdev.append(stdev)
            intensityFile.append(filename)
            intensityTime.append(time_mean)

        # Add our new columns at the start of the column list
        outResults.add_column(intensityMean, name="intensity", index=1)
        outResults.add_column(intensityStdev, name="intensityStd", index=2)
        outResults.add_column(intensityTime, name="intensityTimeMean", index=3)
        outResults.add_column(intensityFile, name="expectedLfaFile", index=4)
        return outResults
455 def parseElectrometerStatus(self, data, dateStr, dateEnd=None,
456 doIntegrateSamples=False, index=201):
457 """Determine electrometer status for a specific date.
459 Parameters
460 ----------
461 data : `astropy.table.Table`
462 The dataframe of electrometer results from the EFD.
463 dateStr : `str`
464 The date (in TAI) to look up in the status for.
465 dateEnd : `str`
466 The end date (in TAI) to look in the status for.
467 doIntegrateSamples: `bool`
468 If true, take the average of all samples between
469 ``dateStr`` and ``dateEnd``.
470 index : `int`
471 The salIndex of the device we want to read. For LATISS,
472 this should be 201. For the main telescope, 101.
474 Returns
475 -------
476 indexDate : `str`
477 Date string (in TAI) indicating the electrometer state
478 change.
479 intensity: `float`
480 Average electrometer intensity.
481 """
482 if index is not None:
483 mask = (data["salIndex"] == index)
484 data = data[mask]
486 # searchResults returns the first entry prior to this date
487 result, idx = self.searchResults(data, dateStr)
489 myTime = result["private_sndStamp"]
490 myIntensity = result["intensity"]
491 myEndTime = None
493 if doIntegrateSamples:
494 myEndResult, myEndIdx = self.searchResults(data, dateEnd)
495 if myEndIdx != idx:
496 myEndTime = myEndResult["private_sndStamp"]
497 myIntensity = np.mean(data[idx+1:myEndIdx]["intensity"])
499 return (myTime.strftime("%Y-%m-%dT%H:%M:%S.%f"),
500 myIntensity,
501 myEndTime.strftime("%Y-%m-%dT%H:%M:%S.%f") if myEndTime else None)