Coverage for python / lsst / summit / utils / nightReport.py: 11%

357 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 11:03 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import datetime 

23import logging 

24import pickle 

25from collections.abc import Iterable 

26from dataclasses import dataclass 

27from typing import Any 

28 

29import matplotlib 

30import matplotlib.pyplot as plt 

31import numpy as np 

32from astro_metadata_translator import ObservationInfo 

33from matplotlib.dates import date2num 

34from matplotlib.projections.polar import PolarAxes 

35from matplotlib.pyplot import cm 

36 

37import lsst.daf.butler as dafButler 

38from lsst.utils.iteration import ensure_iterable 

39 

40from .utils import getFieldNameAndTileNumber, obsInfoToDict 

41 

42try: # TODO: Remove post RFC-896: add humanize to rubin-env 

43 from humanize.time import precisedelta 

44 

45 HAVE_HUMANIZE = True 

46except ImportError: 

47 # log a python warning about the lack of humanize 

48 logging.warning("humanize not available, install it to get better time printing") 

49 HAVE_HUMANIZE = False 

50 precisedelta = repr # type: ignore 

51 

52 

53__all__ = ["NightReport"] 

54 

55CALIB_VALUES = [ 

56 "FlatField position", 

57 "Park position", 

58 "azel_target", 

59 "slew_icrs", 

60 "DaytimeCheckout001", 

61 "DaytimeCheckout002", 

62] 

63N_STARS_PER_SYMBOL = 6 

64MARKER_SEQUENCE = [ 

65 "*", 

66 "o", 

67 "D", 

68 "P", 

69 "v", 

70 "^", 

71 "s", 

72 "o", 

73 "v", 

74 "^", 

75 "<", 

76 ">", 

77 "1", 

78 "2", 

79 "3", 

80 "4", 

81 "8", 

82 "s", 

83 "p", 

84 "P", 

85 "*", 

86 "h", 

87 "H", 

88 "+", 

89 "x", 

90 "X", 

91 "D", 

92 "d", 

93 "|", 

94 "_", 

95] 

96SOUTHPOLESTAR = "HD 185975" 

97 

98 

99@dataclass 

100class ColorAndMarker: 

101 """Class for holding colors and marker symbols""" 

102 

103 color: str 

104 marker: str = "*" 

105 

106 

107class NightReport: 

108 _version = 1 

109 

110 def __init__(self, butler: dafButler.Butler, dayObs: int, loadFromFile: str | None = None): 

111 self._supressAstroMetadataTranslatorWarnings() # call early 

112 self.log = logging.getLogger("lsst.summit.utils.NightReport") 

113 self.butler = butler 

114 self.dayObs = dayObs 

115 self.data: dict = dict() 

116 self._expRecordsLoaded: set = set() # set of the expRecords loaded 

117 self._obsInfosLoaded: set = set() # set of the seqNums loaded 

118 self.stars: list[str] = [] 

119 self.cMap: dict[str, ColorAndMarker] = {} 

120 if loadFromFile is not None: 

121 self._load(loadFromFile) 

122 self.rebuild() # sets stars and cMap 

123 

124 def _supressAstroMetadataTranslatorWarnings(self) -> None: 

125 """NB: must be called early""" 

126 logging.basicConfig() 

127 logger = logging.getLogger("lsst.obs.lsst.translators.latiss") 

128 logger.setLevel(logging.ERROR) 

129 logger = logging.getLogger("astro_metadata_translator.observationInfo") 

130 logger.setLevel(logging.ERROR) 

131 

132 def save(self, filename: str) -> None: 

133 """Save the internal data to a file. 

134 

135 Parameters 

136 ---------- 

137 filename : `str` 

138 The full name and path of the file to save to. 

139 """ 

140 toSave = dict( 

141 data=self.data, 

142 _expRecordsLoaded=self._expRecordsLoaded, 

143 _obsInfosLoaded=self._obsInfosLoaded, 

144 dayObs=self.dayObs, 

145 version=self._version, 

146 ) 

147 with open(filename, "wb") as f: 

148 pickle.dump(toSave, f, pickle.HIGHEST_PROTOCOL) 

149 

150 def _load(self, filename: str) -> None: 

151 """Load the report data from a file. 

152 

153 Called on init if loadFromFile is not None. Should not be used directly 

154 as other things are populated on load in the __init__. 

155 

156 Parameters 

157 ---------- 

158 filename : `str` 

159 The full name and path of the file to load from. 

160 """ 

161 with open(filename, "rb") as f: 

162 loaded = pickle.load(f) 

163 self.data = loaded["data"] 

164 self._expRecordsLoaded = loaded["_expRecordsLoaded"] 

165 self._obsInfosLoaded = loaded["_obsInfosLoaded"] 

166 dayObs = loaded["dayObs"] 

167 loadedVersion = loaded.get("version", 0) 

168 

169 if dayObs != self.dayObs: 

170 raise RuntimeError(f"Loaded data is for {dayObs} but current dayObs is {self.dayObs}") 

171 if loadedVersion < self._version: 

172 self.log.critical( 

173 f"Loaded version is {loadedVersion} but current version is {self._version}." 

174 " Check carefully for compatibility issues/regenerate your saved report!" 

175 ) 

176 # update to the version on the instance in case the report is 

177 # re-saved. 

178 self._version = loadedVersion 

179 assert len(self.data) == len(self._expRecordsLoaded) 

180 assert len(self.data) == len(self._obsInfosLoaded) 

181 self.log.info(f"Loaded {len(self.data)} records from {filename}") 

182 

183 @staticmethod 

184 def _getSortedData(data: dict) -> dict: 

185 """Get a sorted copy of the internal data.""" 

186 if list(data.keys()) == sorted(data.keys()): 

187 return data 

188 else: 

189 return {k: data[k] for k in sorted(data.keys())} 

190 

191 def getExpRecordDictForDayObs(self, dayObs: int) -> dict: 

192 """Get all the exposureRecords as dicts for the current dayObs. 

193 

194 Notes 

195 ----- 

196 Runs in ~0.05s for 1000 records. 

197 """ 

198 expRecords = self.butler.registry.queryDimensionRecords( 

199 "exposure", where="exposure.day_obs=dayObs", bind={"dayObs": dayObs}, datasets="raw" 

200 ) 

201 listExpRecords = list(expRecords) 

202 records = {e.seq_num: e.toDict() for e in listExpRecords} # not guaranteed to be in order 

203 for record in records.values(): 

204 target = record["target_name"] if record["target_name"] is not None else "" 

205 if target: 

206 shortTarget, _ = getFieldNameAndTileNumber(target, warn=False) 

207 else: 

208 shortTarget = "" 

209 record["target_name_short"] = shortTarget 

210 return self._getSortedData(records) 

211 

212 def getObsInfoAndMetadataForSeqNum(self, seqNum: int) -> tuple[ObservationInfo, dict]: 

213 """Get the obsInfo and metadata for a given seqNum. 

214 

215 TODO: Once we have a summit repo containing all this info, remove this 

216 method and all scraping of headers! Probably also remove the save/load 

217 functionalty there too, as the whole init will go from many minutes to 

218 under a second. 

219 

220 Parameters 

221 ---------- 

222 seqNum : `int` 

223 The seqNum. 

224 

225 Returns 

226 ------- 

227 obsInfo : `astro_metadata_translator.ObservationInfo` 

228 The obsInfo. 

229 md : `dict` 

230 The raw metadata. 

231 

232 Notes 

233 ----- 

234 Very slow, as it has to load the whole file on object store repos 

235 and access the file on regular filesystem repos. 

236 """ 

237 dataId = {"day_obs": self.dayObs, "seq_num": seqNum, "detector": 0} 

238 md = self.butler.get("raw.metadata", dataId) 

239 return ObservationInfo(md), md.toDict() 

240 

241 def rebuild(self, full: bool = False) -> None: 

242 """Scrape new data if there is any, otherwise is a no-op. 

243 

244 If full is True, then all data is reloaded. 

245 

246 Parameters 

247 ---------- 

248 full : `bool`, optional 

249 Do a full reload of all the data, removing any which is pre-loaded? 

250 """ 

251 if full: 

252 self.data = dict() 

253 self._expRecordsLoaded = set() 

254 self._obsInfosLoaded = set() 

255 

256 records = self.getExpRecordDictForDayObs(self.dayObs) 

257 if len(records) == len(self.data): # nothing to do 

258 self.log.info("No new records found") 

259 # NB don't return here, because we need to rebuild the 

260 # star maps etc if we came from a file. 

261 else: 

262 # still need to merge the new expRecordDicts into self.data 

263 # but only these, as the other items have obsInfos merged into them 

264 for seqNum in list(records.keys() - self._expRecordsLoaded): 

265 self.data[seqNum] = records[seqNum] 

266 self._expRecordsLoaded.add(seqNum) 

267 

268 # now load all the obsInfos 

269 seqNums = list(records.keys()) 

270 obsInfosToLoad = set(seqNums) - self._obsInfosLoaded 

271 if obsInfosToLoad: 

272 self.log.info(f"Loading {len(obsInfosToLoad)} obsInfo(s)") 

273 for i, seqNum in enumerate(obsInfosToLoad): 

274 if (i + 1) % 200 == 0: 

275 self.log.info(f"Loaded {i + 1} obsInfos") 

276 obsInfo, metadata = self.getObsInfoAndMetadataForSeqNum(seqNum) 

277 obsInfoDict = obsInfoToDict(obsInfo) 

278 records[seqNum].update(obsInfoDict) 

279 # _raw_metadata item will hopefully not be needed in the future 

280 # but add it while we have it for free, as it has DIMM seeing 

281 records[seqNum]["_raw_metadata"] = metadata 

282 self._obsInfosLoaded.add(seqNum) 

283 

284 self.data = self._getSortedData(self.data) # make sure we stay sorted 

285 self.stars = self.getObservedObjects() 

286 self.cMap = self.makeStarColorAndMarkerMap(self.stars) 

287 

288 def getDatesForSeqNums(self) -> dict[int, datetime.datetime]: 

289 """Get a dict of {seqNum: date} for the report. 

290 

291 Returns 

292 ------- 

293 dates : `dict[int, datetime.datetime]` 

294 Dict of {seqNum: date} for the current report. 

295 """ 

296 return { 

297 seqNum: self.data[seqNum]["timespan"].begin.to_datetime() for seqNum in sorted(self.data.keys()) 

298 } 

299 

300 def getObservedObjects(self, ignoreTileNum: bool = True) -> list[str]: 

301 """Get a list of the observed objects for the night. 

302 

303 Repeated observations of individual imaging fields have _NNN appended 

304 to the field name. Use ``ignoreTileNum`` to remove these, collapsing 

305 the observations of the field to a single target name. 

306 

307 Parameters 

308 ---------- 

309 ignoreTileNum : `bool`, optional 

310 Remove the trailing _NNN tile number for imaging fields? 

311 """ 

312 key = "target_name_short" if ignoreTileNum else "target_name" 

313 allTargets = sorted({record[key] if record[key] is not None else "" for record in self.data.values()}) 

314 return allTargets 

315 

316 def getSeqNumsMatching( 

317 self, invert: bool = False, subset: list[int] | None = None, **kwargs: str 

318 ) -> list[int]: 

319 """Get seqNums which match/don't match all kwargs provided, e.g. 

320 

321 report.getSeqNumsMatching(exposure_time=30, 

322 target_name='ETA1 DOR') 

323 

324 Set invert=True to get all seqNums which don't match the provided 

325 args, e.g. to find all seqNums which are not calibs 

326 

327 Subset allows for repeated filtering by passing in a set of seqNums 

328 """ 

329 # copy data so we can pop, and restrict to subset if provided 

330 local = {seqNum: rec for seqNum, rec in self.data.items() if (subset is None or seqNum in subset)} 

331 

332 # for each kwarg, filter out items which match/don't 

333 for filtAttr, filtVal in kwargs.items(): 

334 toPop = [] # can't pop inside inner loop so collect 

335 for seqNum, record in local.items(): 

336 v = record.get(filtAttr) 

337 if invert: 

338 if v == filtVal: 

339 toPop.append(seqNum) 

340 else: 

341 if v != filtVal: 

342 toPop.append(seqNum) 

343 [local.pop(seqNum) for seqNum in toPop] 

344 

345 return sorted(local.keys()) 

346 

347 def printAvailableKeys(self, sample: bool = False, includeRaw: bool = False) -> None: 

348 """Print all the keys available to query on, optionally including the 

349 full set of header keys. 

350 

351 Note that there is a big mix of quantities, some are int/float/string 

352 but some are astropy quantities. 

353 

354 If sample is True, then a sample value for each key is printed too, 

355 which is useful for dealing with types and seeing what each item 

356 actually means. 

357 """ 

358 for seqNum, recordDict in self.data.items(): # loop + break because we don't know the first seqNum 

359 for k, v in recordDict.items(): 

360 if sample: 

361 print(f"{k}: {v}") 

362 else: 

363 print(k) 

364 if includeRaw: 

365 print("\nRaw header keys in _raw_metadata:") 

366 for k in recordDict["_raw_metadata"]: 

367 print(k) 

368 break 

369 

370 @staticmethod 

371 def makeStarColorAndMarkerMap(stars: list[str]) -> dict[str, ColorAndMarker]: 

372 """Create a color/marker map for a list of observed objects.""" 

373 markerMap = {} 

374 # mypy doesn't recognize dynamically created colormap attributes 

375 colors = cm.rainbow(np.linspace(0, 1, N_STARS_PER_SYMBOL)) # type: ignore 

376 for i, star in enumerate(stars): 

377 markerIndex = i // (N_STARS_PER_SYMBOL) 

378 colorIndex = i % (N_STARS_PER_SYMBOL) 

379 markerMap[star] = ColorAndMarker(colors[colorIndex], MARKER_SEQUENCE[markerIndex]) 

380 return markerMap 

381 

382 def calcShutterTimes(self) -> dict | None: 

383 """Calculate the total time spent on science, engineering and readout. 

384 

385 Science and engineering time both include the time spent on readout, 

386 such that if images were taken all night with no downtime and no slews 

387 the efficiency would be 100%. 

388 

389 Returns 

390 ------- 

391 timings : `dict` 

392 Dictionary of the various calculated times, in seconds, and the 

393 seqNums of the first and last observations used in the calculation. 

394 """ 

395 firstObs = self.getObservingStartSeqNum(method="heuristic") 

396 if not firstObs: 

397 self.log.warning("No on-sky observations found.") 

398 return None 

399 lastObs = max(self.data.keys()) 

400 

401 begin = self.data[firstObs]["datetime_begin"] 

402 end = self.data[lastObs]["datetime_end"] 

403 

404 READOUT_TIME = 2.0 

405 shutterOpenTime = sum([self.data[s]["exposure_time"] for s in range(firstObs, lastObs + 1)]) 

406 readoutTime = sum([READOUT_TIME for _ in range(firstObs, lastObs + 1)]) 

407 

408 sciSeqNums = self.getSeqNumsMatching(observation_type="science") 

409 scienceIntegration = sum([self.data[s]["exposure_time"] for s in sciSeqNums]) 

410 scienceTimeTotal = scienceIntegration.value + (len(sciSeqNums) * READOUT_TIME) 

411 

412 result: dict[str, float | int] = {} 

413 result["firstObs"] = firstObs 

414 result["lastObs"] = lastObs 

415 result["startTime"] = begin 

416 result["endTime"] = end 

417 result["nightLength"] = (end - begin).sec # was a datetime.timedelta 

418 result["shutterOpenTime"] = shutterOpenTime.value # was an Quantity 

419 result["readoutTime"] = readoutTime 

420 result["scienceIntegration"] = scienceIntegration.value # was an Quantity 

421 result["scienceTimeTotal"] = scienceTimeTotal 

422 

423 return result 

424 

425 def printShutterTimes(self) -> None: 

426 """Print out the shutter efficiency stats in a human-readable 

427 format. 

428 """ 

429 if not HAVE_HUMANIZE: 

430 self.log.warning("Please install humanize to make this print as intended.") 

431 timings = self.calcShutterTimes() 

432 if not timings: 

433 print("No on-sky observations found, so no shutter efficiency stats are available yet.") 

434 return 

435 

436 print( 

437 f"Observations started at: seqNum {timings['firstObs']:>3} at" 

438 f" {timings['startTime'].to_datetime().strftime('%H:%M:%S')} TAI" 

439 ) 

440 print( 

441 f"Observations ended at: seqNum {timings['lastObs']:>3} at" 

442 f" {timings['endTime'].to_datetime().strftime('%H:%M:%S')} TAI" 

443 ) 

444 print(f"Total time on sky: {precisedelta(timings['nightLength'])}") 

445 print() 

446 print(f"Shutter open time: {precisedelta(timings['shutterOpenTime'])}") 

447 print(f"Readout time: {precisedelta(timings['readoutTime'])}") 

448 engEff = 100 * (timings["shutterOpenTime"] + timings["readoutTime"]) / timings["nightLength"] 

449 print(f"Engineering shutter efficiency = {engEff:.1f}%") 

450 print() 

451 print(f"Science integration: {precisedelta(timings['scienceIntegration'])}") 

452 sciEff = 100 * (timings["scienceTimeTotal"] / timings["nightLength"]) 

453 print(f"Science shutter efficiency = {sciEff:.1f}%") 

454 

455 def getTimeDeltas(self) -> dict[int, int]: 

456 """Returns a dict, keyed by seqNum, of the time since the end of the 

457 last integration. The time since does include the readout, so is always 

458 greater than or equal to the readout time. 

459 

460 Returns 

461 ------- 

462 timeGaps : `dict` 

463 Dictionary of the time gaps, in seconds, keyed by seqNum. 

464 """ 

465 seqNums = list(self.data.keys()) # need a list not a generator, and NB it might not be contiguous! 

466 dts = [0] # first item is zero by definition 

467 for i, seqNum in enumerate(seqNums[1:]): 

468 dt = self.data[seqNum]["datetime_begin"] - self.data[(seqNums[i])]["datetime_end"] 

469 dts.append(dt.sec) 

470 

471 return {s: dt for s, dt in zip(seqNums, dts)} 

472 

473 def printObsGaps(self, threshold: float | int = 100, includeCalibs: bool = False) -> None: 

474 """Print out the gaps between observations in a human-readable format. 

475 

476 Prints the most recent gaps first. 

477 

478 Parameters 

479 ---------- 

480 threshold : `float`, optional 

481 The minimum time gap to print out, in seconds. 

482 includeCalibs : `bool`, optional 

483 If True, start at the lowest seqNum, otherwise start when the 

484 night's observing started. 

485 """ 

486 if not HAVE_HUMANIZE: 

487 self.log.warning("Please install humanize to make this print as intended.") 

488 dts = self.getTimeDeltas() 

489 

490 allSeqNums = list(self.data.keys()) 

491 if includeCalibs: 

492 seqNums = allSeqNums 

493 else: 

494 firstObs = self.getObservingStartSeqNum(method="heuristic") 

495 if not firstObs: 

496 print("No on-sky observations found, so there can be no gaps in observing yet.") 

497 return 

498 # there is always a big gap before firstObs by definition so add 1 

499 startPoint = allSeqNums.index(firstObs) + 1 

500 seqNums = allSeqNums[startPoint:] 

501 

502 messages = [] 

503 for seqNum in reversed(seqNums): 

504 dt = dts[seqNum] 

505 if dt > threshold: 

506 messages.append(f"seqNum {seqNum:3}: {precisedelta(dt)} gap") 

507 

508 if messages: 

509 print(f"Gaps between observations greater than {threshold}s:") 

510 for line in messages: 

511 print(line) 

512 

513 def getObservingStartSeqNum(self, method: str = "safe") -> int | None: 

514 """Get the seqNum at which on-sky observations started. 

515 

516 If no on-sky observations were taken ``None`` is returned. 

517 

518 Parameters 

519 ---------- 

520 method : `str` 

521 The calculation method to use. Options are: 

522 - 'safe': Use the first seqNum with an observation_type that is 

523 explicitly not a calibration or test. This is a safe way of 

524 excluding the calibs, but will include observations where we 

525 take some closed dome test images, or start observing too early, 

526 and go back to taking calibs for a while before the night starts. 

527 - 'heuristic': Use a heuristic to find the first seqNum. The 

528 current heuristic is to find the first seqNum with an observation 

529 type of CWFS, as we always do a CWFS focus before going on sky. 

530 This does not work well for old days, because this wasn't always 

531 the way data was taken. Note: may be updated in the future, at 

532 which point this will be renamed ``cwfs``. 

533 

534 Returns 

535 ------- 

536 startSeqNum : `int` 

537 The seqNum of the start of the night's observing. 

538 """ 

539 allowedMethods = ["heuristic", "safe"] 

540 if method not in allowedMethods: 

541 raise ValueError(f"Method must be one of {allowedMethods}") 

542 

543 if method == "safe": 

544 # as of 20221211, the full set of observation_types ever seen is: 

545 # acq, bias, cwfs, dark, engtest, flat, focus, science, stuttered, 

546 # test, unknown 

547 offSkyObsTypes = ["bias", "dark", "flat", "test", "unknown"] 

548 for seqNum in sorted(self.data.keys()): 

549 if self.data[seqNum]["observation_type"] not in offSkyObsTypes: 

550 return seqNum 

551 return None 

552 

553 if method == "heuristic": 

554 # take the first cwfs image and return that 

555 seqNums = self.getSeqNumsMatching(observation_type="cwfs") 

556 if not seqNums: 

557 self.log.warning("No cwfs images found, observing is assumed not to have started.") 

558 return None 

559 return min(seqNums) 

560 return None 

561 

562 def printObsTable(self, **kwargs: Any) -> None: 

563 """Print a table of the days observations. 

564 

565 Parameters 

566 ---------- 

567 **kwargs : `dict` 

568 Filter the observation table according to seqNums which match these 

569 {k: v} pairs. For example, to only print out science observations 

570 pass ``observation_type='science'``. 

571 """ 

572 seqNums = self.data.keys() if not kwargs else self.getSeqNumsMatching(**kwargs) 

573 seqNums = sorted(seqNums) # should always be sorted, but is a total disaster here if not 

574 

575 dts = self.getTimeDeltas() 

576 lines = [] 

577 for seqNum in seqNums: 

578 try: 

579 expTime = self.data[seqNum]["exposure_time"].value 

580 imageType = self.data[seqNum]["observation_type"] 

581 target = self.data[seqNum]["target_name"] 

582 deadtime = dts[seqNum] 

583 filt = self.data[seqNum]["physical_filter"] 

584 

585 msg = f"{seqNum} {target} {expTime:.1f} {deadtime:.02f} {imageType} {filt}" 

586 except Exception: 

587 msg = f"Error parsing {seqNum}!" 

588 lines.append(msg) 

589 

590 print(r"seqNum target expTime deadtime imageType filt") 

591 print(r"------ ------ ------- -------- --------- ----") 

592 for line in lines: 

593 print(line) 

594 

595 def getExposureMidpoint(self, seqNum: int) -> datetime.datetime: 

596 """Return the midpoint of the exposure as a float in MJD. 

597 

598 Parameters 

599 ---------- 

600 seqNum : `int` 

601 The seqNum to get the midpoint for. 

602 

603 Returns 

604 ------- 

605 midpoint : `datetime.datetime` 

606 The midpoint, as a python datetime object. 

607 """ 

608 timespan = self.data[seqNum]["timespan"] 

609 expTime = self.data[seqNum]["exposure_time"] 

610 return ((timespan.begin) + expTime / 2).to_datetime() 

611 

612 def plotPerObjectAirMass( 

613 self, objects: Iterable[str] | None = None, airmassOneAtTop: bool = True, saveFig: str = "" 

614 ) -> matplotlib.figure.Figure | None: 

615 """Plot the airmass for objects observed over the course of the night. 

616 

617 Parameters 

618 ---------- 

619 objects : `list` [`str`], optional 

620 The objects to plot. If not provided, all objects are plotted. 

621 airmassOneAtTop : `bool`, optional 

622 Put the airmass of 1 at the top of the plot, like astronomers 

623 expect. 

624 saveFig : `str`, optional 

625 Save the figure to this file path? 

626 

627 Return 

628 ------ 

629 fig : `matplotlib.figure.Figure` or `None` 

630 The figure object or `None` if nothing was plotted. 

631 """ 

632 if not objects: 

633 objects = self.stars 

634 

635 if not objects: # there's genuinely nothing now 

636 self.log.info("No objects to plot") 

637 return None 

638 

639 objects = ensure_iterable(objects) 

640 

641 fig = plt.figure(figsize=(16, 12)) 

642 for star in objects: 

643 if star in CALIB_VALUES: 

644 continue 

645 seqNums = self.getSeqNumsMatching(target_name_short=star) 

646 airMasses = [self.data[seqNum]["boresight_airmass"] for seqNum in seqNums] 

647 obsTimes = [self.getExposureMidpoint(seqNum) for seqNum in seqNums] 

648 color = self.cMap[star].color 

649 marker = self.cMap[star].marker 

650 obsDates = date2num(obsTimes) 

651 plt.plot(obsDates, airMasses, color=color, marker=marker, label=star, ms=10, ls="") 

652 

653 plt.ylabel("Airmass", fontsize=20) 

654 plt.xlabel("Time (UTC)", fontsize=20) 

655 plt.xticks(rotation=25, horizontalalignment="right") 

656 

657 ax = plt.gca() 

658 xfmt = matplotlib.dates.DateFormatter("%m-%d %H:%M:%S") 

659 ax.xaxis.set_major_formatter(xfmt) 

660 

661 if airmassOneAtTop: 

662 ax.set_ylim(ax.get_ylim()[::-1]) 

663 

664 plt.legend(bbox_to_anchor=(1, 1.025), prop={"size": 15}, loc="upper left") 

665 

666 plt.tight_layout() 

667 if saveFig: 

668 plt.savefig(saveFig) 

669 return fig 

670 

671 def _makePolarPlot( 

672 self, 

673 azimuthsInDegrees: list[float], 

674 zenithAngles: list[float], 

675 marker: str = "*-", 

676 title: str | None = None, 

677 makeFig: bool = True, 

678 color: str | None = None, 

679 objName: str | None = None, 

680 ) -> matplotlib.axes.Axes: 

681 """Private method to actually do the polar plotting. 

682 

683 azimuthsInDegrees : `list` [`float`] 

684 The azimuth values, in degrees. 

685 zenithAngles : `list` [`float`] 

686 The zenith angle values, but more generally, the values on the 

687 radial axis, so can be in whatever units you want. 

688 marker : `str`, optional 

689 The marker to use. 

690 title : `str`, optional 

691 The plot title. 

692 makeFig : `bool`, optional 

693 Make a new figure? 

694 color : `str`, optional 

695 The marker color. 

696 objName : `str`, optional 

697 The object name, for the legend. 

698 

699 Returns 

700 ------- 

701 ax : `matplotlib.axes.Axe` 

702 The axes on which the plot was made. 

703 """ 

704 if makeFig: 

705 _ = plt.figure(figsize=(10, 10)) 

706 ax = plt.subplot(111, polar=True) 

707 assert isinstance(ax, PolarAxes), "Expected a polar plot" 

708 ax.plot([a * np.pi / 180 for a in azimuthsInDegrees], zenithAngles, marker, c=color, label=objName) 

709 if title: 

710 ax.set_title(title, va="bottom") 

711 ax.set_theta_zero_location("N") 

712 ax.set_theta_direction(-1) 

713 ax.set_rlim(0, 90) 

714 return ax 

715 

716 def makeAltAzCoveragePlot( 

717 self, objects: Iterable[str] | None = None, withLines: bool = False, saveFig: str = "" 

718 ) -> matplotlib.figure.Figure | None: 

719 """Make a polar plot of the azimuth and zenith angle for each object. 

720 

721 Plots the azimuth on the theta axis, and zenith angle (not altitude!) 

722 on the radius axis, such that 0 is at the centre, like you're looking 

723 top-down on the telescope. 

724 

725 Parameters 

726 ---------- 

727 objects : `list` [`str`], optional 

728 The objects to plot. If not provided, all objects are plotted. 

729 withLines : `bool`, optional 

730 Connect the points with lines? 

731 saveFig : `str`, optional 

732 Save the figure to this file path? 

733 

734 Return 

735 ------ 

736 fig : `matplotlib.figure.Figure` or `None` 

737 The figure object, or `None` if nothing was plotted. 

738 """ 

739 if not objects: 

740 objects = self.stars 

741 objects = ensure_iterable(objects) 

742 

743 if not objects: # there's genuinely nothing now 

744 self.log.info("No objects to plot") 

745 return None 

746 

747 fig = plt.figure(figsize=(16, 12)) 

748 

749 ax = None 

750 for obj in objects: 

751 if obj in CALIB_VALUES: 

752 continue 

753 seqNums = self.getSeqNumsMatching(target_name_short=obj) 

754 altAzes = [self.data[seqNum]["altaz_begin"] for seqNum in seqNums] 

755 alts = [altAz.alt.deg for altAz in altAzes if altAz is not None] 

756 azes = [altAz.az.deg for altAz in altAzes if altAz is not None] 

757 assert len(alts) == len(azes) 

758 if len(azes) == 0: 

759 self.log.warning(f"Found no alt/az data for {obj}") 

760 zens = [90 - alt for alt in alts] 

761 color = self.cMap[obj].color 

762 marker = self.cMap[obj].marker 

763 if withLines: 

764 marker += "-" 

765 

766 ax = self._makePolarPlot( 

767 azes, zens, marker=marker, title=None, makeFig=False, color=color, objName=obj 

768 ) 

769 

770 if ax is None: 

771 self.log.info("Only calibs taken so far, nothing to plot") 

772 return None 

773 

774 lgnd = ax.legend(bbox_to_anchor=(1.05, 1), prop={"size": 15}, loc="upper left") 

775 ax.set_title("Axial coverage - azimuth (theta, deg) vs zenith angle (r, deg)", size=20) 

776 

777 for h in lgnd.legend_handles: 

778 if h is None or not isinstance(h, matplotlib.lines.Line2D): 

779 continue 

780 size = 14 

781 marker = str(h.get_marker()) 

782 if "-" in marker: 

783 size += 5 

784 h.set_markersize(size) 

785 

786 plt.tight_layout() 

787 if saveFig: 

788 plt.savefig(saveFig) 

789 return fig