Coverage for python / lsst / summit / utils / m1m3 / inertia_compensation_system.py: 20%
149 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 00:32 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21from __future__ import annotations
23import logging
24from datetime import timedelta
26import numpy as np
27import pandas as pd
28from astropy import units as u
29from astropy.time import Time
30from pandas import DataFrame, DatetimeIndex, Series
32from lsst.summit.utils.efdUtils import getEfdData
33from lsst.summit.utils.tmaUtils import TMAEvent, TMAEventMaker
34from lsst.ts.xml.tables.m1m3 import FATABLE_XFA, FATABLE_YFA, FATABLE_ZFA, HP_COUNT # type: ignore
36HAS_EFD_CLIENT = True
37try:
38 from lsst_efd_client import EfdClient
39except ImportError:
40 EfdClient = None # this is currently just for mypy
41 HAS_EFD_CLIENT = False
43__all__ = [
44 "M1M3ICSAnalysis",
45 "find_adjacent_true_regions",
46 "evaluate_m1m3_ics_single_slew",
47 "evaluate_m1m3_ics_day_obs",
48]
51class M1M3ICSAnalysis:
52 """
53 Evaluate the M1M3 Inertia Compensation System's performance by calculating
54 the minima, maxima and peak-to-peak values during a slew. In addition,
55 calculates the mean, median and standard deviation when the slew has
56 contant velocity or zero acceleration.
58 Parameters
59 ----------
60 event : `lsst.summit.utils.tmaUtils.TMAEvent`
61 Abtract representation of a slew event.
62 efd_client : `EfdClient`
63 Client to access the EFD.
64 inner_pad : `float`, optional
65 Time padding inside the stable time window of the slew.
66 outer_pad : `float`, optional
67 Time padding outside the slew time window.
68 n_sigma : `float`, optional
69 Number of standard deviations to use for the stable region.
70 log : `logging.Logger`, optional
71 Logger object to use for logging messages.
72 """
74 def __init__(
75 self,
76 event: TMAEvent,
77 efd_client: EfdClient,
78 inner_pad: float = 1.0,
79 outer_pad: float = 1.0,
80 n_sigma: float = 1.0,
81 log: logging.Logger | None = None,
82 ) -> None:
83 self.log = (
84 log.getChild(type(self).__name__) if log is not None else logging.getLogger(type(self).__name__)
85 )
87 self.event = event
88 self.inner_pad = inner_pad * u.second
89 self.outer_pad = outer_pad * u.second
90 self.n_sigma = n_sigma
91 self.client = efd_client
93 self.number_of_hardpoints = HP_COUNT
94 self.measured_forces_topics = [f"measuredForce{i}" for i in range(self.number_of_hardpoints)]
96 self.applied_forces_topics = (
97 [f"xForces{actuator}" for actuator in range(FATABLE_XFA)]
98 + [f"yForces{actuator}" for actuator in range(FATABLE_YFA)]
99 + [f"zForces{actuator}" for actuator in range(FATABLE_ZFA)]
100 )
102 self.log.info(f"Querying datasets for {event.dayObs=} {event.seqNum=}")
103 self.df = self.query_dataset()
105 self.log.info("Calculating statistics")
106 self.stats = self.get_stats()
108 self.log.info("Packing results into a Series")
109 self.stats = self.pack_stats_series()
111 def query_dataset(self) -> DataFrame:
112 """
113 Queries all the relevant data, resampling them to have the same
114 frequency, and merges them into a single dataframe.
116 Returns
117 -------
118 data : `pd.DataFrame`
119 The data.
120 """
121 evt = self.event
122 query_config = {
123 "hp_measured_forces": {
124 "topic": "lsst.sal.MTM1M3.hardpointActuatorData",
125 "columns": self.measured_forces_topics,
126 "err_msg": f"No hard-point data found for event {evt.seqNum} on {evt.dayObs}",
127 },
128 "tma_az": {
129 "topic": "lsst.sal.MTMount.azimuth",
130 "columns": [
131 "timestamp",
132 "actualPosition",
133 "actualVelocity",
134 "actualTorque",
135 ],
136 "err_msg": f"No TMA azimuth data found for event {evt.seqNum} on {evt.dayObs}",
137 "reset_index": True,
138 "rename_columns": {
139 "actualTorque": "az_actual_torque",
140 "actualVelocity": "az_actual_velocity",
141 "actualPosition": "az_actual_position",
142 },
143 },
144 "tma_el": {
145 "topic": "lsst.sal.MTMount.elevation",
146 "columns": [
147 "timestamp",
148 "actualPosition",
149 "actualVelocity",
150 "actualTorque",
151 ],
152 "err_msg": f"No TMA elevation data found for event {evt.seqNum} on {evt.dayObs}",
153 "reset_index": True,
154 "rename_columns": {
155 "actualPosition": "el_actual_position",
156 "actualTorque": "el_actual_torque",
157 "actualVelocity": "el_actual_velocity",
158 },
159 },
160 }
162 # Query datasets
163 queries = {key: self.query_efd_data(**cfg) for key, cfg in query_config.items()} # type: ignore
165 # Merge datasets
166 df = self.merge_datasets(queries)
168 # Convert torque from Nm to kNm
169 cols = ["az_actual_torque", "el_actual_torque"]
170 df.loc[:, cols] *= 1e-3
172 return df
174 def merge_datasets(self, queries: dict[str, DataFrame]) -> DataFrame:
175 """
176 Merge multiple datasets based on their timestamps.
178 Parameters
179 ----------
180 queries (dict[str, pd.DataFrame]):
181 A dictionary of dataframes to be merged.
183 Returns
184 -------
185 df : `pd.DataFrame`
186 A merged dataframe.
187 """
188 merge_cfg = {
189 "left_index": True,
190 "right_index": True,
191 "tolerance": timedelta(seconds=1),
192 "direction": "nearest",
193 }
195 self.log.info("Merging datasets")
196 df_list = [df for _, df in queries.items()]
197 merged_df = df_list[0]
199 for df in df_list[1:]:
200 merged_df = pd.merge_asof(merged_df, df, **merge_cfg)
202 return merged_df
204 def query_efd_data(
205 self,
206 topic: str,
207 columns: list[str],
208 err_msg: str | None = None,
209 reset_index: bool = False,
210 rename_columns: dict | None = None,
211 resample: float | None = None,
212 ) -> DataFrame:
213 """
214 Query the EFD data for a given topic and return a dataframe.
216 Parameters
217 ----------
218 topic : `str`
219 The topic to query.
220 columns : `List[str]`
221 The columns to query.
222 err_msg : `str`, optional
223 The error message to raise if no data is found. If None, it creates
224 a dataframe padded with zeros.
225 reset_index : `bool`, optional
226 Whether to reset the index of the dataframe.
227 rename_columns : `dict`, optional
228 A dictionary of column names to rename.
229 resample : `float`, optional
230 The resampling frequency in seconds.
232 Returns
233 -------
234 df : `pd.DataFrame`
235 A dataframe containing the queried data. If no data is found and
236 `err_msg` is None, returns a dataframe padded with zeros.
237 """
238 self.log.info(f"Querying dataset: {topic}")
239 df = getEfdData(
240 self.client,
241 topic,
242 columns=columns,
243 event=self.event,
244 prePadding=self.outer_pad,
245 postPadding=self.outer_pad,
246 warn=False,
247 raiseIfTopicNotInSchema=False,
248 )
250 self.log.debug(f"Queried {df.index.size} rows from {topic}")
251 if df.index.size == 0:
252 if err_msg is not None:
253 self.log.error(err_msg)
254 raise ValueError(err_msg)
255 else:
256 self.log.warning(f"Empty dataset for {topic}. Returning a zero-padded dataframe.")
257 begin_timestamp = pd.Timestamp(self.event.begin.unix, unit="s")
258 end_timestamp = pd.Timestamp(self.event.end.unix, unit="s")
259 index = pd.DatetimeIndex(pd.date_range(begin_timestamp, end_timestamp, freq="1S"))
260 df = pd.DataFrame(
261 columns=columns,
262 index=index,
263 data=np.zeros((index.size, len(columns))),
264 )
266 if rename_columns is not None:
267 df = df.rename(columns=rename_columns)
269 if reset_index:
270 df["timestamp"] = Time(df["timestamp"], format="unix_tai", scale="utc").datetime
271 df.set_index("timestamp", inplace=True)
272 df.index = df.index.tz_localize("UTC")
274 return df
276 def get_midppoint(self) -> Time:
277 """Return the halfway point between begin and end."""
278 return self.df.index[len(self.df.index) // 2]
280 def get_stats(self) -> DataFrame:
281 """
282 Calculate the statistics for each column in the retrieved dataset.
284 Returns
285 -------
286 data : `pd.DataFrame`
287 A DataFrame containing calculated statistics for each column in the
288 dataset. For each column, the statistics include minimum, maximum,
289 mean, and peak-to-peak values.
291 Notes
292 -----
293 This function computes statistics for each column in the provided
294 dataset. It utilizes the `get_minmax` function to calculate minimum,
295 maximum, mean, and peak-to-peak values for each column's data.
296 """
297 cols = self.measured_forces_topics
298 stats = DataFrame(data=[self.get_slew_minmax(self.df[col]) for col in cols], index=cols)
300 return stats
302 @staticmethod
303 def get_slew_minmax(s: Series) -> Series:
304 """
305 Calculates the min, max, mean, and peak-to-peak values for a data
306 series.
308 Parameters
309 ----------
310 s : `pd.Series`
311 The input pandas Series containing data.
313 Returns
314 -------
315 stats : `pd.Series`
316 A Series containing the following calculated values for the two
317 halves of the input Series:
318 - min: Minimum value of the Series.
319 - max: Maximum value of the Series.
320 - mean: Mean value of the Series.
321 - ptp: Peak-to-peak (ptp) value of the Series (abs(max - min)).
322 """
323 result = Series(
324 data=[s.min(), s.max(), s.mean(), np.ptp(s)],
325 index=["min", "max", "mean", "ptp"],
326 name=s.name,
327 )
328 return result
330 def pack_stats_series(self) -> Series:
331 """
332 Packs the stats DataFrame into a Series with custom index labels.
334 This method takes the DataFrame of statistics stored in the 'stats'
335 attribute of the current object and reshapes it into a Series where the
336 indexes are generated using custom labels based on the column names and
337 index positions. The resulting Series combines values from all columns
338 of the DataFrame.
340 Returns
341 -------
342 stats : `pd.Series`
343 A Series with custom index labels based on the column names and
344 index positions. The Series contains values from all columns of the
345 DataFrame.
346 """
347 if isinstance(self.stats, Series):
348 self.log.info("Stats are already packed into a Series.")
349 return self.stats
351 self.log.info("Packing stats into a Series.")
352 df = self.stats.transpose()
354 # Define the prefix patterns
355 column_prefixes = df.columns
356 index_positions = df.index
358 # Generate all combinations of prefixes and positions
359 index_prefixes = [
360 f"measuredForce{stat.capitalize()}{position}"
361 for stat in index_positions
362 for position, _ in enumerate(column_prefixes)
363 ]
365 # Flatten the DataFrame and set the new index
366 result_series = df.stack().reset_index(drop=True)
367 result_series.index = index_prefixes
369 # Append the event information to the Series
370 event_keys = [
371 "dayObs",
372 "seqNum",
373 "version",
374 "begin",
375 "end",
376 "duration",
377 "type",
378 "endReason",
379 ]
380 event_dict = vars(self.event)
381 event_dict = {key: val for key, val in event_dict.items() if key in event_keys}
383 # Create a pandas Series from the dictionary
384 event_series = pd.Series(event_dict)
386 # Create a new Pandas Series correlating event and system information
387 system_series = pd.Series(
388 {
389 "az_start": self.get_nearest_value("az_actual_torque", self.event.begin),
390 "az_end": self.get_nearest_value("az_actual_torque", self.event.end),
391 "az_extreme_vel": self.get_extreme_value("az_actual_velocity"),
392 "az_extreme_torque": self.get_extreme_value("az_actual_torque"),
393 "el_start": self.get_nearest_value("el_actual_torque", self.event.begin),
394 "el_end": self.get_nearest_value("el_actual_torque", self.event.end),
395 "el_extreme_vel": self.get_extreme_value("el_actual_velocity"),
396 "el_extreme_torque": self.get_extreme_value("el_actual_torque"),
397 "ics_enabled": self.get_ics_status(),
398 }
399 )
401 system_series["az_diff"] = system_series["az_end"] - system_series["az_start"]
402 system_series["el_diff"] = system_series["el_end"] - system_series["el_start"]
404 # Concatenate the two Series
405 result_series = pd.concat([event_series, system_series, result_series])
407 # Rename the series columns
408 result_series = result_series.rename(
409 {
410 "dayObs": "day_obs",
411 "seqNum": "seq_num",
412 "version": "version",
413 "begin": "time_begin",
414 "end": "time_end",
415 "duration": "time_duration",
416 "type": "slew_type",
417 "endReason": "end_reason",
418 }
419 )
421 # Display the resulting Series
422 return result_series
424 def get_extreme_value(self, column: str) -> float:
425 """
426 Returns the most extreme (either max or min) value from a given column.
428 Parameters
429 ----------
430 column : `str`
431 The column to query.
433 Returns
434 -------
435 extreme_val : `float`
436 The most extreme value from the given column.
437 """
438 index_of_extreme = self.df[column].abs().idxmax()
439 extreme_value = self.df.loc[index_of_extreme, column]
440 return extreme_value
442 def get_nearest_value(self, column: str, timestamp: Time) -> float:
443 """
444 Returns the nearest value to a given timestamp from a given column.
446 Parameters
447 ----------
448 column : `str`
449 The column to query.
450 timestamp : `astropy.time.Time`
451 The timestamp to query.
453 Returns
454 -------
455 nearest_val : float
456 The nearest value to the given timestamp from the given column.
457 """
458 timestamp = pd.Timestamp(timestamp.iso, tz="UTC")
459 time_diff = abs(self.df.index - timestamp)
460 idx = time_diff.argmin()
461 return self.df[column].iloc[idx]
463 def get_ics_status(self, threshold: float = 1e-6) -> bool:
464 """Get the status of the ICS for the given event.
466 Evaluates the values of the applied velocity and acceleration forces
467 inside the padded stable time window. If the values are all zero, then
468 this function will return False as the ICS was not enabled. Otherwise,
469 it will return True.
471 Parameters
472 ----------
473 threshold : `float`, optional
474 Threshold value used to determine if the ICS is enabled or not. If
475 all the values of the applied velocity and acceleration forces are
476 below this threshold, then the ICS is considered to be disabled.
478 Returns
479 -------
480 status : `bool`
481 True if the ICS is enabled, False otherwise.
482 """
483 avf0 = (self.df[[c for c in self.df.columns if "avf" in c]].abs() < threshold).all().eq(True).all()
484 aaf0 = (self.df[[c for c in self.df.columns if "aaf" in c]].abs() < threshold).all().eq(True).all()
485 return not (avf0 and aaf0)
488def find_adjacent_true_regions(
489 series: Series, min_adjacent: None | int = None
490) -> list[tuple[DatetimeIndex, DatetimeIndex]]:
491 """Find regions in a boolean Series containing adjacent True values.
493 Parameters
494 ----------
495 series : `pd.Series`
496 The boolean Series to search for regions.
497 min_adjacent : `int`, optional
498 Minimum number of adjacent True values in a region. Defaults to half
499 size of the series.
501 Returns
502 -------
503 true_regions : list[tuple[pd.DatetimeIndex, pd.DatetimeIndex]]
504 A list of tuples representing the start and end indices of regions
505 containing more than or equal to min_adjacent adjacent True values.
506 """
507 min_adjacent = min_adjacent if min_adjacent else 0.5 * series.size
508 regions = []
509 for key, group in series.groupby((series != series.shift()).cumsum()):
510 if key and len(group) >= min_adjacent:
511 region_indices = group.index
512 regions.append((region_indices.min(), region_indices.max()))
513 return regions
516def evaluate_m1m3_ics_single_slew(
517 event: TMAEvent,
518 efd_client: EfdClient,
519 inner_pad: float = 1.0,
520 outer_pad: float = 1.0,
521 n_sigma: float = 1.0,
522 log: logging.Logger | None = None,
523) -> M1M3ICSAnalysis:
524 """
525 Evaluate the M1M3 Inertia Compensation System for a single TMAEvent.
527 Parameters
528 ----------
529 event : `TMAEvent`
530 The TMA event to analyze.
531 efd_client : `EfdClient`
532 The EFD client to use to retrieve data.
533 inner_pad : `float`, optional
534 Time padding inside the stable time window of the slew.
535 outer_pad : `float`, optional
536 Time padding outside the slew time window.
537 n_sigma : `float`, optional
538 Number of standard deviations to use for the stable region.
539 log : `logging.Logger`, optional
540 Logger object to use for logging messages.
542 Returns
543 -------
544 result : `M1M3ICSAnalysis`
545 The results of the analysis.
547 Raises
548 ------
549 ValueError
550 Raised if there is no hardpoint data for the specified event.
551 """
552 log = log.getChild(__name__) if log is not None else logging.getLogger(__name__)
554 log.info("Starting inertia compensation system analysis.")
555 performance_analysis = M1M3ICSAnalysis(
556 event,
557 efd_client,
558 inner_pad=inner_pad,
559 outer_pad=outer_pad,
560 n_sigma=n_sigma,
561 log=log,
562 )
564 return performance_analysis
567def evaluate_m1m3_ics_day_obs(
568 day_obs: int,
569 event_maker: TMAEventMaker,
570 inner_pad: float = 1.0,
571 outer_pad: float = 1.0,
572 n_sigma: float = 1.0,
573 log: logging.Logger | None = None,
574) -> DataFrame:
575 """
576 Evaluate the M1M3 Inertia Compensation System in every slew event during a
577 `dayObs`.
579 Parameters
580 ----------
581 day_obs : `int`
582 Observation day in the YYYYMMDD format.
583 event_maker : `TMAEventMaker`
584 Object to retrieve TMA events.
585 inner_pad : `float`, optional
586 Time padding inside the stable time window of the slew.
587 outer_pad : `float`, optional
588 Time padding outside the slew time window.
589 n_sigma : `float`, optional
590 Number of standard deviations to use for the stable region.
591 log : `logging.Logger`, optional
592 Logger object to use for logging messages.
594 Returns
595 -------
596 results : `pd.DataFrame`
597 A data-frame containing the statistical summary of the analysis.
598 """
599 log = log.getChild(__name__) if log is not None else logging.getLogger(__name__)
600 log.setLevel(logging.DEBUG)
602 log.info("Retrieving TMA slew events.")
603 events = event_maker.getEvents(day_obs)
604 log.info(f"Found {len(events)} events for day {day_obs}")
606 stats = None
607 for event in events:
608 log.info(f"Start inertia compensation system analysis on {event.seqNum}.")
610 try:
611 performance_analysis = M1M3ICSAnalysis(
612 event,
613 event_maker.client,
614 inner_pad=inner_pad,
615 outer_pad=outer_pad,
616 n_sigma=n_sigma,
617 log=log,
618 )
619 log.info(f"Complete inertia compensation system analysis on {event.seqNum}.")
620 except ValueError:
621 log.warning(f"Missing data for {event.seqNum} on {event.dayObs}")
622 continue
624 if stats is None:
625 stats = performance_analysis.stats
626 else:
627 stats = pd.concat((stats.T, performance_analysis.stats), axis=1).T
629 assert isinstance(stats, DataFrame)
630 stats = stats.set_index("seq_num", drop=False)
631 return stats