Coverage for python / lsst / ip / isr / shutterMotion.py: 9%

269 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-01 08:30 +0000

1# This file is part of ip_isr. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21""" 

22Shutter motion profile storage class 

23""" 

24 

25__all__ = ["ShutterMotionProfile", "ShutterMotionProfileFull"] 

26 

27from astropy.table import Table 

28from scipy.optimize import newton 

29import numpy as np 

30 

31from lsst.ip.isr import IsrCalib 

32 

33 

class ShutterMotionProfile(IsrCalib):
    """Shutter motion profile measurements.

    Stores the encoder samples, the Hall sensor transitions, and the
    model fit parameters of a single shutter motion as parallel lists.

    Parameters
    ----------
    log : `logging.Logger`, optional
        Log to write messages to. If `None` a default logger will be used.
    **kwargs :
        Additional parameters.
    """

    _OBSTYPE = "shutterMotionProfile"
    _SCHEMA = "ShutterMotionProfile"
    _VERSION = 1.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Quantities that come from `encodeSamples`
        self.time_tai = []
        self.time_mjd = []
        self.position = []

        # Quantities that come from `hallTransitions`
        self.hall_time_tai = []
        self.hall_time_mjd = []
        self.hall_position = []
        self.hall_sensorId = []
        self.hall_isOn = []

        # Quantities that come from `fitResults`; one entry per fit.
        self.fit_name = []
        self.fit_start_time = []
        self.fit_pivot1 = []
        self.fit_pivot2 = []
        self.fit_jerk0 = []
        self.fit_jerk1 = []
        self.fit_jerk2 = []

        # Model identifier shared by all fits.  It is filled in when
        # fit results are read, but must exist beforehand so that the
        # writers do not fail with AttributeError on a profile that
        # never read any.
        self.fit_model = None

        self.requiredAttributes.update([
            "time_tai", "time_mjd", "position",
            "hall_time_tai", "hall_time_mjd", "hall_position",
            "hall_sensorId", "hall_isOn",
            "fit_name", "fit_start_time", "fit_pivot1",
            "fit_pivot2", "fit_jerk0", "fit_jerk1", "fit_jerk2",
        ])

75 

def calculateMidpoint(self, modelName="hallSensorFit", skipPosition=False):
    """Calculate time of midpoint of travel for this profile.

    Derived from Shuang Liang's CTN-002 (https://ctn-002.lsst.io).
    Equation numbers listed are from this document. As the fits
    have already been done, we can ignore the raw position/Hall
    sensor data.

    Parameters
    ----------
    modelName : `str`, optional
        Fit model to use to calculate the midpoint.
    skipPosition : `bool`, optional
        If true, only the acceleration based calculation will be
        done. If false, both the acceleration and position based
        calculations are done; this reads ``startPosition`` and
        ``endPosition`` from the calibration metadata.

    Returns
    -------
    tm_accel : `float`
        The time of the midpoint, as derived from the point where
        the acceleration on the shutter is zero. The fit start
        time has been added back so the value can be compared to
        raw timestamps. NaN if the root finder failed.
    tm_position : `float`
        The time of the midpoint, as derived from the point where
        the shutter position is midway between its starting and
        ending locations, with the fit start time added back.
        This will be NaN if ``skipPosition`` is true or if the
        root finder failed.

    Raises
    ------
    RuntimeError
        Raised if the requested ``modelName`` is not found in the
        calibration.
    """
    # No early exit: if a name is repeated, the last entry is used.
    modelIndex = -1
    for idx, name in enumerate(self.fit_name):
        if name == modelName:
            modelIndex = idx
    if modelIndex == -1:
        raise RuntimeError(f"Unknown model {modelName} requested.")

    # Alias to follow technote
    t0 = self.fit_start_time[modelIndex]
    t1 = self.fit_pivot1[modelIndex]
    t2 = self.fit_pivot2[modelIndex]

    # Equation (3.1)
    j0 = self.fit_jerk0[modelIndex]
    j1 = self.fit_jerk1[modelIndex]

    # Equation (3.2)
    a1 = j0*t1

    # Equation (3.4)
    A1 = a1 - j1*t1

    # First estimate of midpoint, where acceleration is zero.
    # a = 0 = A1 + j1*t (Equation 5.1)
    def acc(t):
        return A1 + j1 * t

    try:
        # Start the search halfway between the two pivot points.
        tm_accel = newton(acc, 0.5*(t2 + t1))
    except Exception as e:
        # Best effort: report the failure and carry on with NaN.
        self.log.warning("Midpoint calculation (from acceleration) failed to converge: %s", e)
        tm_accel = np.nan

    if skipPosition:
        tm_position = np.nan
    else:
        # Second estimate of midpoint, when s is halfway between
        # start and final position. Equation (5.2).
        V1 = t1**2 * (j0 - j1)/2. - t1*A1
        S1 = t1**3 * (j0 - j1)/6. - t1**2 * A1/2. - t1*V1
        Smid = 0.5*(self.metadata["startPosition"] + self.metadata["endPosition"])

        def pos(t):
            return j1*(t**3)/6. + A1*(t**2)/2. + V1*t + S1 - Smid

        try:
            # Seed with the acceleration-based estimate.
            tm_position = newton(pos, tm_accel)
        except Exception as e:
            self.log.warning("Midpoint calculation (from position) failed to converge: %s", e)
            tm_position = np.nan

    # Restore t0 so these can be compared to raw timestamps.
    return tm_accel + t0, tm_position + t0

164 

@classmethod
def fromExposure(cls, exposure, direction="open"):
    """Construct a ShutterMotionProfile from an exposure.

    Only the motion start time, the shutter side, and the Hall
    sensor fit parameters are read from the exposure header.

    Parameters
    ----------
    exposure : `lsst.afw.image.Exposure`
        Exposure to read header information from.
    direction : `str`, optional
        Direction of shutter to construct. Should be one of
        "open" or "close".

    Returns
    -------
    calib : `lsst.ip.isr.ShutterMotionProfile`
        Constructed profile.

    Raises
    ------
    ValueError
        Raised if ``direction`` is not "open" or "close".
    RuntimeError
        Raised if an expected keyword is missing from the exposure
        metadata.
    """
    if direction not in ('open', 'close'):
        raise ValueError(f"Unknown shutter direction {direction} supplied.")

    header = exposure.metadata
    prefix = f'SHUTTER {direction.upper()}'
    fitPrefix = f'{prefix} HALLSENSORFIT'

    startIsotKey = f'{prefix} STARTTIME TAI ISOT'
    startMjdKey = f'{prefix} STARTTIME TAI MJD'
    sideKey = f'{prefix} SIDE'
    # The MODEL keyword must be present, but its value is not stored.
    modelKey = f'{prefix} MODEL'

    # Header keyword suffix for each fit attribute on the calibration.
    fitKeys = {
        "fit_start_time": f'{fitPrefix} MODELSTARTTIME',
        "fit_pivot1": f'{fitPrefix} PIVOTPOINT1',
        "fit_pivot2": f'{fitPrefix} PIVOTPOINT2',
        "fit_jerk0": f'{fitPrefix} JERK0',
        "fit_jerk1": f'{fitPrefix} JERK1',
        "fit_jerk2": f'{fitPrefix} JERK2',
    }

    # Validate everything up front so that a missing keyword is
    # reported by name before any calibration is constructed.
    for kw in (startIsotKey, startMjdKey, sideKey, modelKey, *fitKeys.values()):
        if kw not in header:
            raise RuntimeError(f"Expected header keyword not found: {kw}.")

    calib = cls()
    calib.time_tai.append(header[startIsotKey])
    calib.time_mjd.append(header[startMjdKey])
    calib.metadata['SIDE'] = header[sideKey]

    calib.fit_name.append("hallSensorFit")
    for attribute, kw in fitKeys.items():
        getattr(calib, attribute).append(header[kw])

    return calib

218 

@classmethod
def fromDict(cls, dictionary):
    """Construct a ShutterMotionProfile from a dictionary of properties.

    The supplied dictionary is not modified.

    Parameters
    ----------
    dictionary : `dict`
        Dictionary of properties.

    Returns
    -------
    calib : `lsst.ip.isr.ShutterMotionProfile`
        Constructed calibration.

    Raises
    ------
    RuntimeError
        Raised if the supplied dictionary is for a different
        calibration type.
    """
    calib = cls()

    # Report the value that was actually compared.  An "OBSTYPE" key
    # is not guaranteed to exist at the top level of the dictionary.
    fileType = dictionary["fileType"]
    if calib._OBSTYPE != fileType:
        raise RuntimeError(f"Incorrect calibration supplied. Expected {calib._OBSTYPE}, "
                           f"found {fileType}")

    # Entries are popped below, so work on shallow copies to leave
    # the caller's dictionaries untouched.
    dictionary = dict(dictionary)
    motionProfile = dict(dictionary.pop("motionProfile"))

    encodeSamples = motionProfile.pop("encodeSamples")
    hallTransitions = motionProfile.pop("hallTransitions")
    # readFitResults may consume entries of the mapping it is given.
    fitResults = dict(motionProfile.pop("fitResults"))

    # Flatten the nested metadata block into the top level.
    if "metadata" in dictionary:
        metadata = dictionary.pop("metadata")
        dictionary.update(metadata)
    calib.setMetadata(dictionary)

    formatVersion = calib.metadata["version"]

    startTime = motionProfile.pop("startTime")
    if formatVersion == 1.0:
        # Original format.
        motionProfile["startTime_tai"] = startTime["tai"]
        motionProfile["startTime_mjd"] = startTime["mjd"]
    else:
        # Update to clarify all times are in the TAI system.
        motionProfile["startTime_tai"] = startTime["tai"]["isot"]
        motionProfile["startTime_mjd"] = startTime["tai"]["mjd"]

    calib.readEncodeSamples(encodeSamples, formatVersion)
    calib.readHallTransitions(hallTransitions, formatVersion)
    calib.readFitResults(fitResults)

    # Whatever remains of the motion profile is scalar metadata.
    calib.updateMetadata(**motionProfile)
    return calib

274 

def toDict(self):
    """Return a dictionary containing the calibration properties.

    The dictionary should be able to be round-tripped through
    `fromDict`.

    Returns
    -------
    dictionary : `dict`
        Dictionary of properties.

    Raises
    ------
    RuntimeError
        Raised if the ``version`` stored in the metadata is neither
        1.0 nor 2.0.
    """
    self.updateMetadata()
    formatVersion = self.metadata["version"]

    if formatVersion != 1.0 and formatVersion != 2.0:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    # The two formats differ only in how the start time is nested.
    isOriginalFormat = formatVersion == 1.0

    return {
        "fileName": self.metadata["fileName"],
        "fileType": self.metadata["fileType"],
        "metadata": {
            "CALIBCLS": "lsst.ip.isr.ShutterMotionProfile",
            "OBSTYPE": self._OBSTYPE,
        },
        "obsId": self.metadata["obsId"],
        "version": self.metadata.get("version", -1),
        "motionProfile": {
            "startTime": (
                {
                    "tai": self.metadata["startTime_tai"],
                    "mjd": self.metadata["startTime_mjd"],
                }
                if isOriginalFormat else
                {
                    "tai": {
                        "isot": self.metadata["startTime_tai"],
                        "mjd": self.metadata["startTime_mjd"],
                    },
                }
            ),
            "startPosition": self.metadata["startPosition"],
            "targetPosition": self.metadata["targetPosition"],
            "endPosition": self.metadata["endPosition"],
            "targetDuration": self.metadata["targetDuration"],
            "actionDuration": self.metadata["actionDuration"],
            "side": self.metadata["side"],
            "isOpen": self.metadata["isOpen"],
            "encodeSamples": self.writeEncodeSamples(),
            "hallTransitions": self.writeHallTransitions(),
            "fitResults": self.writeFitResults(),
        },
    }

348 

@classmethod
def fromTable(cls, tableList):
    """Construct calibration from a list of tables.

    The calibration attributes are filled directly from the table
    columns, and the metadata of the first table becomes the
    calibration metadata. The input tables are not modified.

    Parameters
    ----------
    tableList : `list` [`astropy.table.Table`]
        List of tables to use to construct the shutter motion
        profile. The first table contains the samples, the second
        the Hall transition data, and the third the model fits.

    Returns
    -------
    calib : `lsst.ip.isr.ShutterMotionProfile`
        The calibration defined in the tables.
    """
    samples = tableList[0]
    transitions = tableList[1]
    modelFits = tableList[2]

    def _columnAsList(table, name, isText=False):
        # Flatten a column to a Python list.  ``atleast_1d`` keeps a
        # one-row column a list; squeeze alone would reduce it to a
        # scalar.  Text columns may come back as bytes, and an empty
        # column has no first element to inspect.
        values = np.atleast_1d(np.squeeze(table[name].data)).tolist()
        if isText and values and hasattr(values[0], "decode"):
            values = [value.decode("utf-8") for value in values]
        return values

    calib = cls()
    calib.time_tai = _columnAsList(samples, "TIME_TAI", isText=True)
    calib.time_mjd = _columnAsList(samples, "TIME_MJD")
    calib.position = _columnAsList(samples, "POSITION")

    calib.hall_time_tai = _columnAsList(transitions, "HALL_TIME_TAI", isText=True)
    calib.hall_time_mjd = _columnAsList(transitions, "HALL_TIME_MJD")
    calib.hall_position = _columnAsList(transitions, "HALL_POSITION")
    calib.hall_sensorId = _columnAsList(transitions, "HALL_SENSORID")
    calib.hall_isOn = _columnAsList(transitions, "HALL_ISON")

    calib.fit_model = modelFits.meta["FIT_MODEL"]

    calib.fit_name = _columnAsList(modelFits, "FIT_NAME", isText=True)
    calib.fit_start_time = _columnAsList(modelFits, "FIT_START_TIME")
    calib.fit_pivot1 = _columnAsList(modelFits, "FIT_PIVOT1")
    calib.fit_pivot2 = _columnAsList(modelFits, "FIT_PIVOT2")
    calib.fit_jerk0 = _columnAsList(modelFits, "FIT_JERK0")
    calib.fit_jerk1 = _columnAsList(modelFits, "FIT_JERK1")
    calib.fit_jerk2 = _columnAsList(modelFits, "FIT_JERK2")

    # Keys are renamed below; copy so the table keeps its own metadata.
    metadata = dict(samples.meta)
    if "OBSTYPE" not in metadata:
        metadata["OBSTYPE"] = cls._OBSTYPE

    # This translation is needed to support correct
    # round-tripping. It's not an elegant solution.
    for key in ("fileName", "fileType", "obsId", "version", "side", "isOpen"):
        if key.upper() in metadata:
            metadata[key] = metadata.pop(key.upper())
    # Empty strings in these entries are turned back into None.
    for key in ("CALIB_ID", "DETECTOR", "DET_NAME", "DET_SER", "FILTER", "INSTRUME",
                "RAFTNAME", "SEQCKSUM", "SEQFILE", "SEQNAME", "SLOTNAME"):
        if key in metadata and metadata[key] == "":
            metadata[key] = None

    calib.updateMetadata(**metadata)
    return calib

420 

def toTable(self):
    """Construct a list of tables containing the information in this
    calibration.

    The list of tables should create an identical calibration
    after being passed to this class's fromTable method.

    Returns
    -------
    tableList : `list` [`astropy.table.Table`]
        Three tables, in order: the samples, the Hall transitions,
        and the model fits. The calibration metadata is attached to
        the samples table, and the fit model to the model fit table.
    """
    self.updateMetadata()

    def _plainList(values):
        # Pass the values through numpy to obtain plain Python lists.
        return np.array(values).tolist()

    sampleNames = ("TIME_TAI", "TIME_MJD", "POSITION")
    sampleColumns = (
        _plainList(self.time_tai),
        _plainList(self.time_mjd),
        _plainList(self.position),
    )
    samples = Table(
        dict(zip(sampleNames, sampleColumns)),
        names=sampleNames,
        dtype=("U32", "f8", "f8"),
    )

    transitionNames = ("HALL_TIME_TAI", "HALL_TIME_MJD", "HALL_POSITION",
                       "HALL_SENSORID", "HALL_ISON")
    transitionColumns = (
        _plainList(self.hall_time_tai),
        _plainList(self.hall_time_mjd),
        _plainList(self.hall_position),
        _plainList(self.hall_sensorId),
        _plainList(self.hall_isOn),
    )
    transitions = Table(
        dict(zip(transitionNames, transitionColumns)),
        names=transitionNames,
        dtype=("U32", "f8", "f8", "i4", "?"),
    )

    fitNames = ("FIT_NAME", "FIT_START_TIME", "FIT_PIVOT1", "FIT_PIVOT2",
                "FIT_JERK0", "FIT_JERK1", "FIT_JERK2")
    fitColumns = (
        _plainList(self.fit_name),
        _plainList(self.fit_start_time),
        _plainList(self.fit_pivot1),
        _plainList(self.fit_pivot2),
        _plainList(self.fit_jerk0),
        _plainList(self.fit_jerk1),
        _plainList(self.fit_jerk2),
    )
    modelFits = Table(
        dict(zip(fitNames, fitColumns)),
        names=fitNames,
        dtype=("U32", "f8", "f8", "f8", "f8", "f8", "f8"),
    )
    modelFits.meta["FIT_MODEL"] = self.fit_model

    # None values are stored as empty strings, after all other entries.
    outMeta = {}
    emptyEntries = {}
    for key, value in self.getMetadata().toDict().items():
        if value is None:
            emptyEntries[key] = ""
        else:
            outMeta[key] = value
    outMeta.update(emptyEntries)
    samples.meta = outMeta

    return [samples, transitions, modelFits]

473 

def readEncodeSamples(self, inputSamples, formatVersion):
    """Read a list of input samples into the calibration.

    Parameters
    ----------
    inputSamples : `list` [`dict`]
        List of dictionaries of samples. Each holds a ``position``
        and a nested time entry whose layout depends on the format
        version.
    formatVersion : `float`
        Version of the file format to read.

    Raises
    ------
    RuntimeError
        Raised if the calibration has already read samples, or if
        the format is not known.
    """
    if len(self.time_tai) > 0:
        raise RuntimeError("Cannot re-read already-read calibration.")

    # Name of the nested time entry, and of the TAI string within it.
    if formatVersion == 1.0:
        timeKey, taiKey = "time", "tai"
    elif formatVersion == 2.0:
        timeKey, taiKey = "tai", "isot"
    else:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    for entry in inputSamples:
        self.time_tai.append(entry[timeKey][taiKey])
        self.time_mjd.append(entry[timeKey]["mjd"])
        self.position.append(entry["position"])

505 

def writeEncodeSamples(self):
    """Return list of samples as dictionaries.

    The layout of each dictionary follows the ``version`` stored in
    the calibration metadata.

    Returns
    -------
    inputSamples : `list` [`dict`]
        List of dictionaries of samples.

    Raises
    ------
    RuntimeError
        Raised if the calibration has not read samples, or if the
        format version is not known.
    """
    if not len(self.time_tai):
        raise RuntimeError("Cannot export empty calibration.")

    version = self.metadata["version"]
    rows = zip(self.time_tai, self.time_mjd, self.position)

    if version == 1.0:
        return [{"time": {"tai": isot, "mjd": day}, "position": where}
                for isot, day, where in rows]
    if version == 2.0:
        return [{"tai": {"isot": isot, "mjd": day}, "position": where}
                for isot, day, where in rows]
    raise RuntimeError(f"Unknown file version: {version}")

539 

def readHallTransitions(self, inputTransitions, formatVersion):
    """Read a list of Hall sensor transitions into the calibration.

    Parameters
    ----------
    inputTransitions : `list` [`dict`]
        List of dictionaries of transitions. Each holds
        ``position``, ``sensorId``, ``isOn``, and a nested time
        entry whose layout depends on the format version.
    formatVersion : `float`
        Version of the file format to read.

    Raises
    ------
    RuntimeError
        Raised if the calibration has already read transitions, or
        if the format is not known.
    """
    if len(self.hall_time_tai) != 0:
        raise RuntimeError("Cannot re-read already-read calibration.")

    # The two formats differ only in how the timestamp is nested:
    # the name of the time entry, and of the TAI string within it.
    if formatVersion == 1.0:
        timeKey, taiKey = "time", "tai"
    elif formatVersion == 2.0:
        timeKey, taiKey = "tai", "isot"
    else:
        raise RuntimeError(f"Unknown file version: {formatVersion}")

    for transition in inputTransitions:
        self.hall_time_tai.append(transition[timeKey][taiKey])
        self.hall_time_mjd.append(transition[timeKey]["mjd"])
        self.hall_position.append(transition["position"])
        self.hall_sensorId.append(transition["sensorId"])
        # Stored as a real bool regardless of the input type.
        self.hall_isOn.append(bool(transition["isOn"]))

575 

def writeHallTransitions(self):
    """Return list of Hall transitions as dictionaries.

    The layout of each dictionary follows the ``version`` stored in
    the calibration metadata.

    Returns
    -------
    inputTransitions : `list` [`dict`]
        List of dictionaries of Hall transitions.

    Raises
    ------
    RuntimeError
        Raised if the calibration has not read Hall transitions, or
        if the format version is not known.
    """
    if not len(self.hall_time_tai):
        raise RuntimeError("Cannot export empty calibration.")

    version = self.metadata["version"]
    if version != 1.0 and version != 2.0:
        raise RuntimeError(f"Unknown file version: {version}")

    def _timeEntry(isot, day):
        # Only the nesting of the timestamp differs between formats.
        if version == 1.0:
            return {"time": {"tai": isot, "mjd": day}}
        return {"tai": {"isot": isot, "mjd": day}}

    rows = zip(self.hall_time_tai, self.hall_time_mjd, self.hall_position,
               self.hall_sensorId, self.hall_isOn)
    return [
        {**_timeEntry(isot, day), "position": where, "sensorId": sensor, "isOn": state}
        for isot, day, where, sensor, state in rows
    ]

616 

def readFitResults(self, fitResults):
    """Read a dictionary of fit results into the calibration.

    The supplied dictionary is not modified.

    Parameters
    ----------
    fitResults : `dict`
        Fit results. The ``Model`` entry identifies the fit model;
        every other entry maps a fit name to a dictionary with the
        keys ``ModelStartTime``, ``PivotPoint1``, ``PivotPoint2``,
        ``Jerk0``, ``Jerk1``, and ``Jerk2``.

    Raises
    ------
    RuntimeError
        Raised if the calibration has already read fit results.
    """
    if len(self.fit_name) != 0:
        raise RuntimeError("Cannot re-read already-read fit results.")

    # Index rather than pop, so the caller's dictionary is left intact.
    self.fit_model = fitResults["Model"]

    for fitName, fitModel in fitResults.items():
        if fitName == "Model":
            # Already stored above; not a fit entry.
            continue
        if hasattr(fitName, "decode"):
            fitName = fitName.decode("utf-8")
        self.fit_name.append(fitName)
        self.fit_start_time.append(fitModel["ModelStartTime"])
        self.fit_pivot1.append(fitModel["PivotPoint1"])
        self.fit_pivot2.append(fitModel["PivotPoint2"])
        self.fit_jerk0.append(fitModel["Jerk0"])
        self.fit_jerk1.append(fitModel["Jerk1"])
        self.fit_jerk2.append(fitModel["Jerk2"])

644 

def writeFitResults(self):
    """Return the fit results as a dictionary.

    Returns
    -------
    fitResults : `dict`
        Dictionary holding the fit model under ``Model``, plus one
        entry per fit name containing that fit's parameters.

    Raises
    ------
    RuntimeError
        Raised if the calibration holds no fit results.
    """
    if not len(self.fit_name):
        raise RuntimeError("Cannot export empty calibration.")

    parameterNames = ("ModelStartTime", "PivotPoint1", "PivotPoint2",
                      "Jerk0", "Jerk1", "Jerk2")

    output = {"Model": self.fit_model}
    rows = zip(self.fit_name, self.fit_start_time,
               self.fit_pivot1, self.fit_pivot2,
               self.fit_jerk0, self.fit_jerk1, self.fit_jerk2)
    for label, *parameters in rows:
        output[label] = dict(zip(parameterNames, parameters))
    return output

674 

675 

class ShutterMotionProfileFull(IsrCalib):
    """Class to hold both open and close profiles, as stored in the
    exposure headers.

    Parameters
    ----------
    log : `logging.Logger`, optional
        Log to write messages to. If `None` a default logger will be used.
    **kwargs :
        Additional parameters.
    """

    _OBSTYPE = "shutterMotionProfileFull"
    _SCHEMA = "ShutterMotionProfileFull"
    _VERSION = 1.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Both profiles stay unset until filled in by `fromExposure`.
        self.profile_open = None
        self.profile_close = None

        self.requiredAttributes.update(["profile_open", "profile_close"])

    @classmethod
    def fromExposure(cls, exposure):
        """Construct a ShutterMotionProfileFull from an exposure.

        Parameters
        ----------
        exposure : `lsst.afw.image.Exposure`
            Exposure to read header information from.

        Returns
        -------
        calib : `lsst.ip.isr.ShutterMotionProfileFull`
            Constructed calibration, holding one profile for the
            "open" direction and one for the "close" direction.
        """
        fullProfile = cls()
        fullProfile.profile_open = ShutterMotionProfile.fromExposure(exposure, direction="open")
        fullProfile.profile_close = ShutterMotionProfile.fromExposure(exposure, direction="close")

        return fullProfile

    def calculateMidpoints(self):
        """Calculate the midpoint times of the open and close profiles.

        Only the acceleration based estimate of each profile is
        computed; the position based estimate is skipped.

        Returns
        -------
        midpoint_open : `float`
            Acceleration based midpoint of the open profile.
        midpoint_close : `float`
            Acceleration based midpoint of the close profile.
        """
        # This is at least a start for downstream calculations.
        openMidpoint, _ = self.profile_open.calculateMidpoint(skipPosition=True)
        closeMidpoint, _ = self.profile_close.calculateMidpoint(skipPosition=True)

        return openMidpoint, closeMidpoint