Coverage for python/lsst/summit/utils/m1m3/inertia_compensation_system.py: 20%

149 statements  


# This file is part of summit_utils.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

import logging
from datetime import timedelta

import numpy as np
import pandas as pd
from astropy import units as u
from astropy.time import Time
from pandas import DataFrame, DatetimeIndex, Series

from lsst.summit.utils.efdUtils import getEfdData
from lsst.summit.utils.tmaUtils import TMAEvent, TMAEventMaker
from lsst.ts.xml.tables.m1m3 import FATABLE_XFA, FATABLE_YFA, FATABLE_ZFA, HP_COUNT  # type: ignore

HAS_EFD_CLIENT = True
try:
    from lsst_efd_client import EfdClient
except ImportError:
    EfdClient = None  # this is currently just for mypy
    HAS_EFD_CLIENT = False

__all__ = [
    "M1M3ICSAnalysis",
    "find_adjacent_true_regions",
    "evaluate_m1m3_ics_single_slew",
    "evaluate_m1m3_ics_day_obs",
]
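

# A minimal usage sketch (illustration only, not part of the original
# module): honour the ``HAS_EFD_CLIENT`` flag from the guarded import above
# before constructing a client. The "usdf_efd" instance name is a
# hypothetical example.
def _example_make_efd_client(instance: str = "usdf_efd") -> EfdClient:
    """Illustrative helper; raises if lsst_efd_client is not installed."""
    if not HAS_EFD_CLIENT:
        raise RuntimeError("lsst_efd_client is not installed")
    return EfdClient(instance)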


class M1M3ICSAnalysis:
    """
    Evaluate the M1M3 Inertia Compensation System's performance by calculating
    the minima, maxima and peak-to-peak values during a slew. In addition,
    calculates the mean, median and standard deviation when the slew has
    constant velocity or zero acceleration.

    Parameters
    ----------
    event : `lsst.summit.utils.tmaUtils.TMAEvent`
        Abstract representation of a slew event.
    efd_client : `EfdClient`
        Client to access the EFD.
    inner_pad : `float`, optional
        Time padding inside the stable time window of the slew.
    outer_pad : `float`, optional
        Time padding outside the slew time window.
    n_sigma : `float`, optional
        Number of standard deviations to use for the stable region.
    log : `logging.Logger`, optional
        Logger object to use for logging messages.
    """

    def __init__(
        self,
        event: TMAEvent,
        efd_client: EfdClient,
        inner_pad: float = 1.0,
        outer_pad: float = 1.0,
        n_sigma: float = 1.0,
        log: logging.Logger | None = None,
    ) -> None:
        self.log = (
            log.getChild(type(self).__name__) if log is not None else logging.getLogger(type(self).__name__)
        )

        self.event = event
        self.inner_pad = inner_pad * u.second
        self.outer_pad = outer_pad * u.second
        self.n_sigma = n_sigma
        self.client = efd_client

        self.number_of_hardpoints = HP_COUNT
        self.measured_forces_topics = [f"measuredForce{i}" for i in range(self.number_of_hardpoints)]

        self.applied_forces_topics = (
            [f"xForces{actuator}" for actuator in range(FATABLE_XFA)]
            + [f"yForces{actuator}" for actuator in range(FATABLE_YFA)]
            + [f"zForces{actuator}" for actuator in range(FATABLE_ZFA)]
        )

        self.log.info(f"Querying datasets for {event.dayObs=} {event.seqNum=}")
        self.df = self.query_dataset()

        self.log.info("Calculating statistics")
        self.stats = self.get_stats()

        self.log.info("Packing results into a Series")
        self.stats = self.pack_stats_series()

    def query_dataset(self) -> DataFrame:
        """
        Queries all the relevant data, resampling them to have the same
        frequency, and merges them into a single dataframe.

        Returns
        -------
        data : `pd.DataFrame`
            The data.
        """
        evt = self.event
        query_config = {
            "hp_measured_forces": {
                "topic": "lsst.sal.MTM1M3.hardpointActuatorData",
                "columns": self.measured_forces_topics,
                "err_msg": f"No hard-point data found for event {evt.seqNum} on {evt.dayObs}",
            },
            "tma_az": {
                "topic": "lsst.sal.MTMount.azimuth",
                "columns": [
                    "timestamp",
                    "actualPosition",
                    "actualVelocity",
                    "actualTorque",
                ],
                "err_msg": f"No TMA azimuth data found for event {evt.seqNum} on {evt.dayObs}",
                "reset_index": True,
                "rename_columns": {
                    "actualTorque": "az_actual_torque",
                    "actualVelocity": "az_actual_velocity",
                    "actualPosition": "az_actual_position",
                },
            },
            "tma_el": {
                "topic": "lsst.sal.MTMount.elevation",
                "columns": [
                    "timestamp",
                    "actualPosition",
                    "actualVelocity",
                    "actualTorque",
                ],
                "err_msg": f"No TMA elevation data found for event {evt.seqNum} on {evt.dayObs}",
                "reset_index": True,
                "rename_columns": {
                    "actualPosition": "el_actual_position",
                    "actualTorque": "el_actual_torque",
                    "actualVelocity": "el_actual_velocity",
                },
            },
        }

        # Query datasets
        queries = {key: self.query_efd_data(**cfg) for key, cfg in query_config.items()}  # type: ignore

        # Merge datasets
        df = self.merge_datasets(queries)

        # Convert torque from Nm to kNm
        cols = ["az_actual_torque", "el_actual_torque"]
        df.loc[:, cols] *= 1e-3

        return df

    def merge_datasets(self, queries: dict[str, DataFrame]) -> DataFrame:
        """
        Merge multiple datasets based on their timestamps.

        Parameters
        ----------
        queries : `dict` [`str`, `pd.DataFrame`]
            A dictionary of dataframes to be merged.

        Returns
        -------
        df : `pd.DataFrame`
            A merged dataframe.
        """
        merge_cfg = {
            "left_index": True,
            "right_index": True,
            "tolerance": timedelta(seconds=1),
            "direction": "nearest",
        }

        self.log.info("Merging datasets")
        df_list = [df for _, df in queries.items()]
        merged_df = df_list[0]

        for df in df_list[1:]:
            merged_df = pd.merge_asof(merged_df, df, **merge_cfg)

        return merged_df
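
    # For illustration: ``pd.merge_asof`` with ``direction="nearest"`` and a
    # one-second tolerance pairs each left-frame row with the closest-in-time
    # right-frame row, leaving NaNs where no row falls within the tolerance.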

    def query_efd_data(
        self,
        topic: str,
        columns: list[str],
        err_msg: str | None = None,
        reset_index: bool = False,
        rename_columns: dict | None = None,
        resample: float | None = None,
    ) -> DataFrame:
        """
        Query the EFD data for a given topic and return a dataframe.

        Parameters
        ----------
        topic : `str`
            The topic to query.
        columns : `list` [`str`]
            The columns to query.
        err_msg : `str`, optional
            The error message to raise if no data is found. If None, it
            creates a dataframe padded with zeros.
        reset_index : `bool`, optional
            Whether to reset the index of the dataframe.
        rename_columns : `dict`, optional
            A dictionary of column names to rename.
        resample : `float`, optional
            The resampling frequency in seconds. Currently accepted but not
            applied by this method.

        Returns
        -------
        df : `pd.DataFrame`
            A dataframe containing the queried data. If no data is found and
            `err_msg` is None, returns a dataframe padded with zeros.
        """
        self.log.info(f"Querying dataset: {topic}")
        df = getEfdData(
            self.client,
            topic,
            columns=columns,
            event=self.event,
            prePadding=self.outer_pad,
            postPadding=self.outer_pad,
            warn=False,
            raiseIfTopicNotInSchema=False,
        )

        self.log.debug(f"Queried {df.index.size} rows from {topic}")
        if df.index.size == 0:
            if err_msg is not None:
                self.log.error(err_msg)
                raise ValueError(err_msg)
            else:
                self.log.warning(f"Empty dataset for {topic}. Returning a zero-padded dataframe.")
                begin_timestamp = pd.Timestamp(self.event.begin.unix, unit="s")
                end_timestamp = pd.Timestamp(self.event.end.unix, unit="s")
                index = pd.DatetimeIndex(pd.date_range(begin_timestamp, end_timestamp, freq="1S"))
                df = pd.DataFrame(
                    columns=columns,
                    index=index,
                    data=np.zeros((index.size, len(columns))),
                )

        if rename_columns is not None:
            df = df.rename(columns=rename_columns)

        if reset_index:
            df["timestamp"] = Time(df["timestamp"], format="unix_tai", scale="utc").datetime
            df.set_index("timestamp", inplace=True)
            df.index = df.index.tz_localize("UTC")

        return df

    def get_midppoint(self) -> Time:
        """Return the halfway point between begin and end."""
        return self.df.index[len(self.df.index) // 2]

    def get_stats(self) -> DataFrame:
        """
        Calculate the statistics for each column in the retrieved dataset.

        Returns
        -------
        data : `pd.DataFrame`
            A DataFrame containing calculated statistics for each column in
            the dataset. For each column, the statistics include minimum,
            maximum, mean, and peak-to-peak values.

        Notes
        -----
        This function computes statistics for each column in the provided
        dataset. It utilizes the `get_slew_minmax` function to calculate
        minimum, maximum, mean, and peak-to-peak values for each column's
        data.
        """
        cols = self.measured_forces_topics
        stats = DataFrame(data=[self.get_slew_minmax(self.df[col]) for col in cols], index=cols)

        return stats

    @staticmethod
    def get_slew_minmax(s: Series) -> Series:
        """
        Calculates the min, max, mean, and peak-to-peak values for a data
        series.

        Parameters
        ----------
        s : `pd.Series`
            The input pandas Series containing data.

        Returns
        -------
        stats : `pd.Series`
            A Series containing the following values calculated from the
            input Series:
            - min: Minimum value of the Series.
            - max: Maximum value of the Series.
            - mean: Mean value of the Series.
            - ptp: Peak-to-peak (ptp) value of the Series (abs(max - min)).
        """
        result = Series(
            data=[s.min(), s.max(), s.mean(), np.ptp(s)],
            index=["min", "max", "mean", "ptp"],
            name=s.name,
        )
        return result
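
    # For illustration (hypothetical numbers): applied to a series named
    # "measuredForce0" holding [-1.0, 2.0], get_slew_minmax returns
    #     min    -1.0
    #     max     2.0
    #     mean    0.5
    #     ptp     3.0
    #     Name: measuredForce0, dtype: float64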

    def pack_stats_series(self) -> Series:
        """
        Packs the stats DataFrame into a Series with custom index labels.

        This method takes the DataFrame of statistics stored in the 'stats'
        attribute of the current object and reshapes it into a Series whose
        index labels are generated from the column names and index positions.
        The resulting Series combines values from all columns of the
        DataFrame.

        Returns
        -------
        stats : `pd.Series`
            A Series with custom index labels based on the column names and
            index positions. The Series contains values from all columns of
            the DataFrame.
        """
        if isinstance(self.stats, Series):
            self.log.info("Stats are already packed into a Series.")
            return self.stats

        self.log.info("Packing stats into a Series.")
        df = self.stats.transpose()

        # Define the prefix patterns
        column_prefixes = df.columns
        index_positions = df.index

        # Generate all combinations of prefixes and positions
        index_prefixes = [
            f"measuredForce{stat.capitalize()}{position}"
            for stat in index_positions
            for position, _ in enumerate(column_prefixes)
        ]

        # Flatten the DataFrame and set the new index
        result_series = df.stack().reset_index(drop=True)
        result_series.index = index_prefixes

        # Append the event information to the Series
        event_keys = [
            "dayObs",
            "seqNum",
            "version",
            "begin",
            "end",
            "duration",
            "type",
            "endReason",
        ]
        event_dict = vars(self.event)
        event_dict = {key: val for key, val in event_dict.items() if key in event_keys}

        # Create a pandas Series from the dictionary
        event_series = pd.Series(event_dict)

        # Create a new Pandas Series correlating event and system information
        system_series = pd.Series(
            {
                "az_start": self.get_nearest_value("az_actual_torque", self.event.begin),
                "az_end": self.get_nearest_value("az_actual_torque", self.event.end),
                "az_extreme_vel": self.get_extreme_value("az_actual_velocity"),
                "az_extreme_torque": self.get_extreme_value("az_actual_torque"),
                "el_start": self.get_nearest_value("el_actual_torque", self.event.begin),
                "el_end": self.get_nearest_value("el_actual_torque", self.event.end),
                "el_extreme_vel": self.get_extreme_value("el_actual_velocity"),
                "el_extreme_torque": self.get_extreme_value("el_actual_torque"),
                "ics_enabled": self.get_ics_status(),
            }
        )

        system_series["az_diff"] = system_series["az_end"] - system_series["az_start"]
        system_series["el_diff"] = system_series["el_end"] - system_series["el_start"]

        # Concatenate the two Series
        result_series = pd.concat([event_series, system_series, result_series])

        # Rename the series labels to snake_case equivalents
        result_series = result_series.rename(
            {
                "dayObs": "day_obs",
                "seqNum": "seq_num",
                "version": "version",
                "begin": "time_begin",
                "end": "time_end",
                "duration": "time_duration",
                "type": "slew_type",
                "endReason": "end_reason",
            }
        )

        return result_series

    def get_extreme_value(self, column: str) -> float:
        """
        Returns the most extreme (either max or min) value from a given
        column.

        Parameters
        ----------
        column : `str`
            The column to query.

        Returns
        -------
        extreme_val : `float`
            The most extreme value from the given column.
        """
        index_of_extreme = self.df[column].abs().idxmax()
        extreme_value = self.df.loc[index_of_extreme, column]
        return extreme_value

    def get_nearest_value(self, column: str, timestamp: Time) -> float:
        """
        Returns the nearest value to a given timestamp from a given column.

        Parameters
        ----------
        column : `str`
            The column to query.
        timestamp : `astropy.time.Time`
            The timestamp to query.

        Returns
        -------
        nearest_val : `float`
            The nearest value to the given timestamp from the given column.
        """
        timestamp = pd.Timestamp(timestamp.iso, tz="UTC")
        time_diff = abs(self.df.index - timestamp)
        idx = time_diff.argmin()
        return self.df[column].iloc[idx]

    def get_ics_status(self, threshold: float = 1e-6) -> bool:
        """Get the status of the ICS for the given event.

        Evaluates the values of the applied velocity and acceleration forces
        inside the padded stable time window. If the values are all zero,
        then this function will return False as the ICS was not enabled.
        Otherwise, it will return True.

        Parameters
        ----------
        threshold : `float`, optional
            Threshold value used to determine if the ICS is enabled or not.
            If all the values of the applied velocity and acceleration forces
            are below this threshold, then the ICS is considered to be
            disabled.

        Returns
        -------
        status : `bool`
            True if the ICS is enabled, False otherwise.
        """
        # Columns whose names contain "avf"/"aaf" hold the applied velocity
        # and acceleration forces; the ICS counts as disabled only when every
        # sample of every such column is below the threshold.
        avf0 = (self.df[[c for c in self.df.columns if "avf" in c]].abs() < threshold).all().all()
        aaf0 = (self.df[[c for c in self.df.columns if "aaf" in c]].abs() < threshold).all().all()
        return not (avf0 and aaf0)
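

# A minimal, self-contained sketch of the all-below-threshold test used by
# ``get_ics_status`` above, on a toy dataframe. The column names are
# hypothetical stand-ins for the applied velocity ("avf") and acceleration
# ("aaf") force topics.
def _example_ics_status_check(threshold: float = 1e-6) -> bool:
    """Illustrative only; mirrors the logic of get_ics_status."""
    df = pd.DataFrame({"avf0": [0.0, 0.0], "aaf0": [0.0, 2e-6]})
    avf_quiet = (df[[c for c in df.columns if "avf" in c]].abs() < threshold).all().all()
    aaf_quiet = (df[[c for c in df.columns if "aaf" in c]].abs() < threshold).all().all()
    return not (avf_quiet and aaf_quiet)  # True here: one "aaf" sample exceeds it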


def find_adjacent_true_regions(
    series: Series, min_adjacent: None | int = None
) -> list[tuple[DatetimeIndex, DatetimeIndex]]:
    """Find regions in a boolean Series containing adjacent True values.

    Parameters
    ----------
    series : `pd.Series`
        The boolean Series to search for regions.
    min_adjacent : `int`, optional
        Minimum number of adjacent True values in a region. Defaults to half
        the size of the series.

    Returns
    -------
    true_regions : `list` [`tuple` [`pd.DatetimeIndex`, `pd.DatetimeIndex`]]
        A list of tuples representing the start and end indices of regions
        containing at least ``min_adjacent`` adjacent True values.
    """
    min_adjacent = min_adjacent if min_adjacent else series.size // 2
    regions = []
    # Label each run of identical values, then keep the sufficiently long
    # runs of True values. The run's value is tested via ``group.iloc[0]``;
    # the ``cumsum()`` run label itself is always non-zero, so it cannot be
    # used to distinguish True runs from False runs.
    for _, group in series.groupby((series != series.shift()).cumsum()):
        if group.iloc[0] and len(group) >= min_adjacent:
            region_indices = group.index
            regions.append((region_indices.min(), region_indices.max()))
    return regions
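

# A short usage sketch for ``find_adjacent_true_regions`` on a toy boolean
# series; the five-sample, one-minute-cadence index is made up for
# illustration.
def _example_find_adjacent_true_regions() -> None:
    """Illustrative only; finds the single True run of length >= 2."""
    index = pd.date_range("2024-01-01", periods=5, freq="min")
    series = pd.Series([False, True, True, False, True], index=index)
    # Expect one region spanning the second and third timestamps.
    print(find_adjacent_true_regions(series, min_adjacent=2))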


def evaluate_m1m3_ics_single_slew(
    event: TMAEvent,
    efd_client: EfdClient,
    inner_pad: float = 1.0,
    outer_pad: float = 1.0,
    n_sigma: float = 1.0,
    log: logging.Logger | None = None,
) -> M1M3ICSAnalysis:
    """
    Evaluate the M1M3 Inertia Compensation System for a single TMAEvent.

    Parameters
    ----------
    event : `TMAEvent`
        The TMA event to analyze.
    efd_client : `EfdClient`
        The EFD client to use to retrieve data.
    inner_pad : `float`, optional
        Time padding inside the stable time window of the slew.
    outer_pad : `float`, optional
        Time padding outside the slew time window.
    n_sigma : `float`, optional
        Number of standard deviations to use for the stable region.
    log : `logging.Logger`, optional
        Logger object to use for logging messages.

    Returns
    -------
    result : `M1M3ICSAnalysis`
        The results of the analysis.

    Raises
    ------
    ValueError
        Raised if there is no hardpoint data for the specified event.
    """
    log = log.getChild(__name__) if log is not None else logging.getLogger(__name__)

    log.info("Starting inertia compensation system analysis.")
    performance_analysis = M1M3ICSAnalysis(
        event,
        efd_client,
        inner_pad=inner_pad,
        outer_pad=outer_pad,
        n_sigma=n_sigma,
        log=log,
    )

    return performance_analysis
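

# A hedged end-to-end sketch for analyzing one slew. The day_obs value below
# is hypothetical; only APIs already used in this module (TMAEventMaker,
# getEvents, and the event maker's ``client`` attribute) are assumed.
def _example_single_slew_analysis() -> None:
    """Illustrative only; requires EFD access to actually run."""
    event_maker = TMAEventMaker()
    events = event_maker.getEvents(20230621)  # hypothetical day_obs
    result = evaluate_m1m3_ics_single_slew(events[0], event_maker.client)
    print(result.stats)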


def evaluate_m1m3_ics_day_obs(
    day_obs: int,
    event_maker: TMAEventMaker,
    inner_pad: float = 1.0,
    outer_pad: float = 1.0,
    n_sigma: float = 1.0,
    log: logging.Logger | None = None,
) -> DataFrame:
    """
    Evaluate the M1M3 Inertia Compensation System for every slew event during
    a `dayObs`.

    Parameters
    ----------
    day_obs : `int`
        Observation day in the YYYYMMDD format.
    event_maker : `TMAEventMaker`
        Object to retrieve TMA events.
    inner_pad : `float`, optional
        Time padding inside the stable time window of the slew.
    outer_pad : `float`, optional
        Time padding outside the slew time window.
    n_sigma : `float`, optional
        Number of standard deviations to use for the stable region.
    log : `logging.Logger`, optional
        Logger object to use for logging messages.

    Returns
    -------
    results : `pd.DataFrame`
        A dataframe containing the statistical summary of the analysis.
    """
    log = log.getChild(__name__) if log is not None else logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)

    log.info("Retrieving TMA slew events.")
    events = event_maker.getEvents(day_obs)
    log.info(f"Found {len(events)} events for day {day_obs}")

    stats = None
    for event in events:
        log.info(f"Starting inertia compensation system analysis on {event.seqNum}.")

        try:
            performance_analysis = M1M3ICSAnalysis(
                event,
                event_maker.client,
                inner_pad=inner_pad,
                outer_pad=outer_pad,
                n_sigma=n_sigma,
                log=log,
            )
            log.info(f"Completed inertia compensation system analysis on {event.seqNum}.")
        except ValueError:
            log.warning(f"Missing data for {event.seqNum} on {event.dayObs}")
            continue

        if stats is None:
            stats = performance_analysis.stats
        else:
            stats = pd.concat((stats.T, performance_analysis.stats), axis=1).T

    assert isinstance(stats, DataFrame)
    stats = stats.set_index("seq_num", drop=False)
    return stats
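

# A minimal sketch of driving the whole-day analysis as a script. The
# day_obs value and the output filename are hypothetical; the call mirrors
# the evaluate_m1m3_ics_day_obs signature above.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    event_maker = TMAEventMaker()
    day_stats = evaluate_m1m3_ics_day_obs(20230621, event_maker)  # hypothetical day
    day_stats.to_csv("m1m3_ics_20230621.csv")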