Coverage for python / lsst / summit / utils / nightReport.py: 11%
357 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 09:44 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import datetime
23import logging
24import pickle
25from collections.abc import Iterable
26from dataclasses import dataclass
27from typing import Any
29import matplotlib
30import matplotlib.pyplot as plt
31import numpy as np
32from astro_metadata_translator import ObservationInfo
33from matplotlib.dates import date2num
34from matplotlib.projections.polar import PolarAxes
35from matplotlib.pyplot import cm
37import lsst.daf.butler as dafButler
38from lsst.utils.iteration import ensure_iterable
40from .utils import getFieldNameAndTileNumber, obsInfoToDict
try:  # TODO: Remove post RFC-896: add humanize to rubin-env
    # humanize is optional: in this file it is only used to format the
    # durations printed by printShutterTimes and printObsGaps.
    from humanize.time import precisedelta

    HAVE_HUMANIZE = True
except ImportError:
    # log a python warning about the lack of humanize
    logging.warning("humanize not available, install it to get better time printing")
    HAVE_HUMANIZE = False
    # Fall back to repr so that calls to precisedelta() still return a string.
    precisedelta = repr  # type: ignore
__all__ = ["NightReport"]

# Target names which the plotting methods skip (plotPerObjectAirMass and
# makeAltAzCoveragePlot ``continue`` past any object found in this list).
CALIB_VALUES = [
    "FlatField position",
    "Park position",
    "azel_target",
    "slew_icrs",
    "DaytimeCheckout001",
    "DaytimeCheckout002",
]
# Number of distinct colors used with each marker: makeStarColorAndMarkerMap
# cycles through this many colors before moving to the next marker.
N_STARS_PER_SYMBOL = 6
# Markers used by makeStarColorAndMarkerMap, indexed by
# ``i // N_STARS_PER_SYMBOL``. Note that some symbols appear more than once.
MARKER_SEQUENCE = [
    "*",
    "o",
    "D",
    "P",
    "v",
    "^",
    "s",
    "o",
    "v",
    "^",
    "<",
    ">",
    "1",
    "2",
    "3",
    "4",
    "8",
    "s",
    "p",
    "P",
    "*",
    "h",
    "H",
    "+",
    "x",
    "X",
    "D",
    "d",
    "|",
    "_",
]
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a target name used by callers elsewhere - confirm.
SOUTHPOLESTAR = "HD 185975"
@dataclass
class ColorAndMarker:
    """Class for holding colors and marker symbols.

    Pairs the color and the marker used to plot one target; instances are
    created by ``NightReport.makeStarColorAndMarkerMap`` and read by the
    plotting methods via ``NightReport.cMap``.
    """

    # NOTE(review): annotated as ``str``, but makeStarColorAndMarkerMap passes
    # an element of the array returned by ``cm.rainbow(...)`` - confirm the
    # intended type.
    color: str
    # Marker symbol passed to the plotting calls; defaults to a star.
    marker: str = "*"
class NightReport:
    """Collect the exposure records for a single ``dayObs`` and provide
    helpers to query, print and plot them.

    The per-exposure data is held in ``self.data``, a dict keyed by seqNum.
    It is built by ``rebuild``, which merges the butler's exposure records
    with the observation info and raw header metadata of each exposure, and
    can be written to / restored from disk with ``save`` / ``loadFromFile``.
    """

    # Version number written into files by ``save`` and compared against the
    # stored value in ``_load``.
    _version = 1
110 def __init__(self, butler: dafButler.Butler, dayObs: int, loadFromFile: str | None = None):
111 self._supressAstroMetadataTranslatorWarnings() # call early
112 self.log = logging.getLogger("lsst.summit.utils.NightReport")
113 self.butler = butler
114 self.dayObs = dayObs
115 self.data: dict = dict()
116 self._expRecordsLoaded: set = set() # set of the expRecords loaded
117 self._obsInfosLoaded: set = set() # set of the seqNums loaded
118 self.stars: list[str] = []
119 self.cMap: dict[str, ColorAndMarker] = {}
120 if loadFromFile is not None:
121 self._load(loadFromFile)
122 self.rebuild() # sets stars and cMap
124 def _supressAstroMetadataTranslatorWarnings(self) -> None:
125 """NB: must be called early"""
126 logging.basicConfig()
127 logger = logging.getLogger("lsst.obs.lsst.translators.latiss")
128 logger.setLevel(logging.ERROR)
129 logger = logging.getLogger("astro_metadata_translator.observationInfo")
130 logger.setLevel(logging.ERROR)
132 def save(self, filename: str) -> None:
133 """Save the internal data to a file.
135 Parameters
136 ----------
137 filename : `str`
138 The full name and path of the file to save to.
139 """
140 toSave = dict(
141 data=self.data,
142 _expRecordsLoaded=self._expRecordsLoaded,
143 _obsInfosLoaded=self._obsInfosLoaded,
144 dayObs=self.dayObs,
145 version=self._version,
146 )
147 with open(filename, "wb") as f:
148 pickle.dump(toSave, f, pickle.HIGHEST_PROTOCOL)
150 def _load(self, filename: str) -> None:
151 """Load the report data from a file.
153 Called on init if loadFromFile is not None. Should not be used directly
154 as other things are populated on load in the __init__.
156 Parameters
157 ----------
158 filename : `str`
159 The full name and path of the file to load from.
160 """
161 with open(filename, "rb") as f:
162 loaded = pickle.load(f)
163 self.data = loaded["data"]
164 self._expRecordsLoaded = loaded["_expRecordsLoaded"]
165 self._obsInfosLoaded = loaded["_obsInfosLoaded"]
166 dayObs = loaded["dayObs"]
167 loadedVersion = loaded.get("version", 0)
169 if dayObs != self.dayObs:
170 raise RuntimeError(f"Loaded data is for {dayObs} but current dayObs is {self.dayObs}")
171 if loadedVersion < self._version:
172 self.log.critical(
173 f"Loaded version is {loadedVersion} but current version is {self._version}."
174 " Check carefully for compatibility issues/regenerate your saved report!"
175 )
176 # update to the version on the instance in case the report is
177 # re-saved.
178 self._version = loadedVersion
179 assert len(self.data) == len(self._expRecordsLoaded)
180 assert len(self.data) == len(self._obsInfosLoaded)
181 self.log.info(f"Loaded {len(self.data)} records from {filename}")
183 @staticmethod
184 def _getSortedData(data: dict) -> dict:
185 """Get a sorted copy of the internal data."""
186 if list(data.keys()) == sorted(data.keys()):
187 return data
188 else:
189 return {k: data[k] for k in sorted(data.keys())}
191 def getExpRecordDictForDayObs(self, dayObs: int) -> dict:
192 """Get all the exposureRecords as dicts for the current dayObs.
194 Notes
195 -----
196 Runs in ~0.05s for 1000 records.
197 """
198 expRecords = self.butler.registry.queryDimensionRecords(
199 "exposure", where="exposure.day_obs=dayObs", bind={"dayObs": dayObs}, datasets="raw"
200 )
201 listExpRecords = list(expRecords)
202 records = {e.seq_num: e.toDict() for e in listExpRecords} # not guaranteed to be in order
203 for record in records.values():
204 target = record["target_name"] if record["target_name"] is not None else ""
205 if target:
206 shortTarget, _ = getFieldNameAndTileNumber(target, warn=False)
207 else:
208 shortTarget = ""
209 record["target_name_short"] = shortTarget
210 return self._getSortedData(records)
212 def getObsInfoAndMetadataForSeqNum(self, seqNum: int) -> tuple[ObservationInfo, dict]:
213 """Get the obsInfo and metadata for a given seqNum.
215 TODO: Once we have a summit repo containing all this info, remove this
216 method and all scraping of headers! Probably also remove the save/load
217 functionalty there too, as the whole init will go from many minutes to
218 under a second.
220 Parameters
221 ----------
222 seqNum : `int`
223 The seqNum.
225 Returns
226 -------
227 obsInfo : `astro_metadata_translator.ObservationInfo`
228 The obsInfo.
229 md : `dict`
230 The raw metadata.
232 Notes
233 -----
234 Very slow, as it has to load the whole file on object store repos
235 and access the file on regular filesystem repos.
236 """
237 dataId = {"day_obs": self.dayObs, "seq_num": seqNum, "detector": 0}
238 md = self.butler.get("raw.metadata", dataId)
239 return ObservationInfo(md), md.toDict()
241 def rebuild(self, full: bool = False) -> None:
242 """Scrape new data if there is any, otherwise is a no-op.
244 If full is True, then all data is reloaded.
246 Parameters
247 ----------
248 full : `bool`, optional
249 Do a full reload of all the data, removing any which is pre-loaded?
250 """
251 if full:
252 self.data = dict()
253 self._expRecordsLoaded = set()
254 self._obsInfosLoaded = set()
256 records = self.getExpRecordDictForDayObs(self.dayObs)
257 if len(records) == len(self.data): # nothing to do
258 self.log.info("No new records found")
259 # NB don't return here, because we need to rebuild the
260 # star maps etc if we came from a file.
261 else:
262 # still need to merge the new expRecordDicts into self.data
263 # but only these, as the other items have obsInfos merged into them
264 for seqNum in list(records.keys() - self._expRecordsLoaded):
265 self.data[seqNum] = records[seqNum]
266 self._expRecordsLoaded.add(seqNum)
268 # now load all the obsInfos
269 seqNums = list(records.keys())
270 obsInfosToLoad = set(seqNums) - self._obsInfosLoaded
271 if obsInfosToLoad:
272 self.log.info(f"Loading {len(obsInfosToLoad)} obsInfo(s)")
273 for i, seqNum in enumerate(obsInfosToLoad):
274 if (i + 1) % 200 == 0:
275 self.log.info(f"Loaded {i + 1} obsInfos")
276 obsInfo, metadata = self.getObsInfoAndMetadataForSeqNum(seqNum)
277 obsInfoDict = obsInfoToDict(obsInfo)
278 records[seqNum].update(obsInfoDict)
279 # _raw_metadata item will hopefully not be needed in the future
280 # but add it while we have it for free, as it has DIMM seeing
281 records[seqNum]["_raw_metadata"] = metadata
282 self._obsInfosLoaded.add(seqNum)
284 self.data = self._getSortedData(self.data) # make sure we stay sorted
285 self.stars = self.getObservedObjects()
286 self.cMap = self.makeStarColorAndMarkerMap(self.stars)
288 def getDatesForSeqNums(self) -> dict[int, datetime.datetime]:
289 """Get a dict of {seqNum: date} for the report.
291 Returns
292 -------
293 dates : `dict[int, datetime.datetime]`
294 Dict of {seqNum: date} for the current report.
295 """
296 return {
297 seqNum: self.data[seqNum]["timespan"].begin.to_datetime() for seqNum in sorted(self.data.keys())
298 }
300 def getObservedObjects(self, ignoreTileNum: bool = True) -> list[str]:
301 """Get a list of the observed objects for the night.
303 Repeated observations of individual imaging fields have _NNN appended
304 to the field name. Use ``ignoreTileNum`` to remove these, collapsing
305 the observations of the field to a single target name.
307 Parameters
308 ----------
309 ignoreTileNum : `bool`, optional
310 Remove the trailing _NNN tile number for imaging fields?
311 """
312 key = "target_name_short" if ignoreTileNum else "target_name"
313 allTargets = sorted({record[key] if record[key] is not None else "" for record in self.data.values()})
314 return allTargets
316 def getSeqNumsMatching(
317 self, invert: bool = False, subset: list[int] | None = None, **kwargs: str
318 ) -> list[int]:
319 """Get seqNums which match/don't match all kwargs provided, e.g.
321 report.getSeqNumsMatching(exposure_time=30,
322 target_name='ETA1 DOR')
324 Set invert=True to get all seqNums which don't match the provided
325 args, e.g. to find all seqNums which are not calibs
327 Subset allows for repeated filtering by passing in a set of seqNums
328 """
329 # copy data so we can pop, and restrict to subset if provided
330 local = {seqNum: rec for seqNum, rec in self.data.items() if (subset is None or seqNum in subset)}
332 # for each kwarg, filter out items which match/don't
333 for filtAttr, filtVal in kwargs.items():
334 toPop = [] # can't pop inside inner loop so collect
335 for seqNum, record in local.items():
336 v = record.get(filtAttr)
337 if invert:
338 if v == filtVal:
339 toPop.append(seqNum)
340 else:
341 if v != filtVal:
342 toPop.append(seqNum)
343 [local.pop(seqNum) for seqNum in toPop]
345 return sorted(local.keys())
347 def printAvailableKeys(self, sample: bool = False, includeRaw: bool = False) -> None:
348 """Print all the keys available to query on, optionally including the
349 full set of header keys.
351 Note that there is a big mix of quantities, some are int/float/string
352 but some are astropy quantities.
354 If sample is True, then a sample value for each key is printed too,
355 which is useful for dealing with types and seeing what each item
356 actually means.
357 """
358 for seqNum, recordDict in self.data.items(): # loop + break because we don't know the first seqNum
359 for k, v in recordDict.items():
360 if sample:
361 print(f"{k}: {v}")
362 else:
363 print(k)
364 if includeRaw:
365 print("\nRaw header keys in _raw_metadata:")
366 for k in recordDict["_raw_metadata"]:
367 print(k)
368 break
370 @staticmethod
371 def makeStarColorAndMarkerMap(stars: list[str]) -> dict[str, ColorAndMarker]:
372 """Create a color/marker map for a list of observed objects."""
373 markerMap = {}
374 # mypy doesn't recognize dynamically created colormap attributes
375 colors = cm.rainbow(np.linspace(0, 1, N_STARS_PER_SYMBOL)) # type: ignore
376 for i, star in enumerate(stars):
377 markerIndex = i // (N_STARS_PER_SYMBOL)
378 colorIndex = i % (N_STARS_PER_SYMBOL)
379 markerMap[star] = ColorAndMarker(colors[colorIndex], MARKER_SEQUENCE[markerIndex])
380 return markerMap
382 def calcShutterTimes(self) -> dict | None:
383 """Calculate the total time spent on science, engineering and readout.
385 Science and engineering time both include the time spent on readout,
386 such that if images were taken all night with no downtime and no slews
387 the efficiency would be 100%.
389 Returns
390 -------
391 timings : `dict`
392 Dictionary of the various calculated times, in seconds, and the
393 seqNums of the first and last observations used in the calculation.
394 """
395 firstObs = self.getObservingStartSeqNum(method="heuristic")
396 if not firstObs:
397 self.log.warning("No on-sky observations found.")
398 return None
399 lastObs = max(self.data.keys())
401 begin = self.data[firstObs]["datetime_begin"]
402 end = self.data[lastObs]["datetime_end"]
404 READOUT_TIME = 2.0
405 shutterOpenTime = sum([self.data[s]["exposure_time"] for s in range(firstObs, lastObs + 1)])
406 readoutTime = sum([READOUT_TIME for _ in range(firstObs, lastObs + 1)])
408 sciSeqNums = self.getSeqNumsMatching(observation_type="science")
409 scienceIntegration = sum([self.data[s]["exposure_time"] for s in sciSeqNums])
410 scienceTimeTotal = scienceIntegration.value + (len(sciSeqNums) * READOUT_TIME)
412 result: dict[str, float | int] = {}
413 result["firstObs"] = firstObs
414 result["lastObs"] = lastObs
415 result["startTime"] = begin
416 result["endTime"] = end
417 result["nightLength"] = (end - begin).sec # was a datetime.timedelta
418 result["shutterOpenTime"] = shutterOpenTime.value # was an Quantity
419 result["readoutTime"] = readoutTime
420 result["scienceIntegration"] = scienceIntegration.value # was an Quantity
421 result["scienceTimeTotal"] = scienceTimeTotal
423 return result
425 def printShutterTimes(self) -> None:
426 """Print out the shutter efficiency stats in a human-readable
427 format.
428 """
429 if not HAVE_HUMANIZE:
430 self.log.warning("Please install humanize to make this print as intended.")
431 timings = self.calcShutterTimes()
432 if not timings:
433 print("No on-sky observations found, so no shutter efficiency stats are available yet.")
434 return
436 print(
437 f"Observations started at: seqNum {timings['firstObs']:>3} at"
438 f" {timings['startTime'].to_datetime().strftime('%H:%M:%S')} TAI"
439 )
440 print(
441 f"Observations ended at: seqNum {timings['lastObs']:>3} at"
442 f" {timings['endTime'].to_datetime().strftime('%H:%M:%S')} TAI"
443 )
444 print(f"Total time on sky: {precisedelta(timings['nightLength'])}")
445 print()
446 print(f"Shutter open time: {precisedelta(timings['shutterOpenTime'])}")
447 print(f"Readout time: {precisedelta(timings['readoutTime'])}")
448 engEff = 100 * (timings["shutterOpenTime"] + timings["readoutTime"]) / timings["nightLength"]
449 print(f"Engineering shutter efficiency = {engEff:.1f}%")
450 print()
451 print(f"Science integration: {precisedelta(timings['scienceIntegration'])}")
452 sciEff = 100 * (timings["scienceTimeTotal"] / timings["nightLength"])
453 print(f"Science shutter efficiency = {sciEff:.1f}%")
455 def getTimeDeltas(self) -> dict[int, int]:
456 """Returns a dict, keyed by seqNum, of the time since the end of the
457 last integration. The time since does include the readout, so is always
458 greater than or equal to the readout time.
460 Returns
461 -------
462 timeGaps : `dict`
463 Dictionary of the time gaps, in seconds, keyed by seqNum.
464 """
465 seqNums = list(self.data.keys()) # need a list not a generator, and NB it might not be contiguous!
466 dts = [0] # first item is zero by definition
467 for i, seqNum in enumerate(seqNums[1:]):
468 dt = self.data[seqNum]["datetime_begin"] - self.data[(seqNums[i])]["datetime_end"]
469 dts.append(dt.sec)
471 return {s: dt for s, dt in zip(seqNums, dts)}
473 def printObsGaps(self, threshold: float | int = 100, includeCalibs: bool = False) -> None:
474 """Print out the gaps between observations in a human-readable format.
476 Prints the most recent gaps first.
478 Parameters
479 ----------
480 threshold : `float`, optional
481 The minimum time gap to print out, in seconds.
482 includeCalibs : `bool`, optional
483 If True, start at the lowest seqNum, otherwise start when the
484 night's observing started.
485 """
486 if not HAVE_HUMANIZE:
487 self.log.warning("Please install humanize to make this print as intended.")
488 dts = self.getTimeDeltas()
490 allSeqNums = list(self.data.keys())
491 if includeCalibs:
492 seqNums = allSeqNums
493 else:
494 firstObs = self.getObservingStartSeqNum(method="heuristic")
495 if not firstObs:
496 print("No on-sky observations found, so there can be no gaps in observing yet.")
497 return
498 # there is always a big gap before firstObs by definition so add 1
499 startPoint = allSeqNums.index(firstObs) + 1
500 seqNums = allSeqNums[startPoint:]
502 messages = []
503 for seqNum in reversed(seqNums):
504 dt = dts[seqNum]
505 if dt > threshold:
506 messages.append(f"seqNum {seqNum:3}: {precisedelta(dt)} gap")
508 if messages:
509 print(f"Gaps between observations greater than {threshold}s:")
510 for line in messages:
511 print(line)
513 def getObservingStartSeqNum(self, method: str = "safe") -> int | None:
514 """Get the seqNum at which on-sky observations started.
516 If no on-sky observations were taken ``None`` is returned.
518 Parameters
519 ----------
520 method : `str`
521 The calculation method to use. Options are:
522 - 'safe': Use the first seqNum with an observation_type that is
523 explicitly not a calibration or test. This is a safe way of
524 excluding the calibs, but will include observations where we
525 take some closed dome test images, or start observing too early,
526 and go back to taking calibs for a while before the night starts.
527 - 'heuristic': Use a heuristic to find the first seqNum. The
528 current heuristic is to find the first seqNum with an observation
529 type of CWFS, as we always do a CWFS focus before going on sky.
530 This does not work well for old days, because this wasn't always
531 the way data was taken. Note: may be updated in the future, at
532 which point this will be renamed ``cwfs``.
534 Returns
535 -------
536 startSeqNum : `int`
537 The seqNum of the start of the night's observing.
538 """
539 allowedMethods = ["heuristic", "safe"]
540 if method not in allowedMethods:
541 raise ValueError(f"Method must be one of {allowedMethods}")
543 if method == "safe":
544 # as of 20221211, the full set of observation_types ever seen is:
545 # acq, bias, cwfs, dark, engtest, flat, focus, science, stuttered,
546 # test, unknown
547 offSkyObsTypes = ["bias", "dark", "flat", "test", "unknown"]
548 for seqNum in sorted(self.data.keys()):
549 if self.data[seqNum]["observation_type"] not in offSkyObsTypes:
550 return seqNum
551 return None
553 if method == "heuristic":
554 # take the first cwfs image and return that
555 seqNums = self.getSeqNumsMatching(observation_type="cwfs")
556 if not seqNums:
557 self.log.warning("No cwfs images found, observing is assumed not to have started.")
558 return None
559 return min(seqNums)
560 return None
562 def printObsTable(self, **kwargs: Any) -> None:
563 """Print a table of the days observations.
565 Parameters
566 ----------
567 **kwargs : `dict`
568 Filter the observation table according to seqNums which match these
569 {k: v} pairs. For example, to only print out science observations
570 pass ``observation_type='science'``.
571 """
572 seqNums = self.data.keys() if not kwargs else self.getSeqNumsMatching(**kwargs)
573 seqNums = sorted(seqNums) # should always be sorted, but is a total disaster here if not
575 dts = self.getTimeDeltas()
576 lines = []
577 for seqNum in seqNums:
578 try:
579 expTime = self.data[seqNum]["exposure_time"].value
580 imageType = self.data[seqNum]["observation_type"]
581 target = self.data[seqNum]["target_name"]
582 deadtime = dts[seqNum]
583 filt = self.data[seqNum]["physical_filter"]
585 msg = f"{seqNum} {target} {expTime:.1f} {deadtime:.02f} {imageType} {filt}"
586 except Exception:
587 msg = f"Error parsing {seqNum}!"
588 lines.append(msg)
590 print(r"seqNum target expTime deadtime imageType filt")
591 print(r"------ ------ ------- -------- --------- ----")
592 for line in lines:
593 print(line)
595 def getExposureMidpoint(self, seqNum: int) -> datetime.datetime:
596 """Return the midpoint of the exposure as a float in MJD.
598 Parameters
599 ----------
600 seqNum : `int`
601 The seqNum to get the midpoint for.
603 Returns
604 -------
605 midpoint : `datetime.datetime`
606 The midpoint, as a python datetime object.
607 """
608 timespan = self.data[seqNum]["timespan"]
609 expTime = self.data[seqNum]["exposure_time"]
610 return ((timespan.begin) + expTime / 2).to_datetime()
612 def plotPerObjectAirMass(
613 self, objects: Iterable[str] | None = None, airmassOneAtTop: bool = True, saveFig: str = ""
614 ) -> matplotlib.figure.Figure | None:
615 """Plot the airmass for objects observed over the course of the night.
617 Parameters
618 ----------
619 objects : `list` [`str`], optional
620 The objects to plot. If not provided, all objects are plotted.
621 airmassOneAtTop : `bool`, optional
622 Put the airmass of 1 at the top of the plot, like astronomers
623 expect.
624 saveFig : `str`, optional
625 Save the figure to this file path?
627 Return
628 ------
629 fig : `matplotlib.figure.Figure` or `None`
630 The figure object or `None` if nothing was plotted.
631 """
632 if not objects:
633 objects = self.stars
635 if not objects: # there's genuinely nothing now
636 self.log.info("No objects to plot")
637 return None
639 objects = ensure_iterable(objects)
641 fig = plt.figure(figsize=(16, 12))
642 for star in objects:
643 if star in CALIB_VALUES:
644 continue
645 seqNums = self.getSeqNumsMatching(target_name_short=star)
646 airMasses = [self.data[seqNum]["boresight_airmass"] for seqNum in seqNums]
647 obsTimes = [self.getExposureMidpoint(seqNum) for seqNum in seqNums]
648 color = self.cMap[star].color
649 marker = self.cMap[star].marker
650 obsDates = date2num(obsTimes)
651 plt.plot(obsDates, airMasses, color=color, marker=marker, label=star, ms=10, ls="")
653 plt.ylabel("Airmass", fontsize=20)
654 plt.xlabel("Time (UTC)", fontsize=20)
655 plt.xticks(rotation=25, horizontalalignment="right")
657 ax = plt.gca()
658 xfmt = matplotlib.dates.DateFormatter("%m-%d %H:%M:%S")
659 ax.xaxis.set_major_formatter(xfmt)
661 if airmassOneAtTop:
662 ax.set_ylim(ax.get_ylim()[::-1])
664 plt.legend(bbox_to_anchor=(1, 1.025), prop={"size": 15}, loc="upper left")
666 plt.tight_layout()
667 if saveFig:
668 plt.savefig(saveFig)
669 return fig
671 def _makePolarPlot(
672 self,
673 azimuthsInDegrees: list[float],
674 zenithAngles: list[float],
675 marker: str = "*-",
676 title: str | None = None,
677 makeFig: bool = True,
678 color: str | None = None,
679 objName: str | None = None,
680 ) -> matplotlib.axes.Axes:
681 """Private method to actually do the polar plotting.
683 azimuthsInDegrees : `list` [`float`]
684 The azimuth values, in degrees.
685 zenithAngles : `list` [`float`]
686 The zenith angle values, but more generally, the values on the
687 radial axis, so can be in whatever units you want.
688 marker : `str`, optional
689 The marker to use.
690 title : `str`, optional
691 The plot title.
692 makeFig : `bool`, optional
693 Make a new figure?
694 color : `str`, optional
695 The marker color.
696 objName : `str`, optional
697 The object name, for the legend.
699 Returns
700 -------
701 ax : `matplotlib.axes.Axe`
702 The axes on which the plot was made.
703 """
704 if makeFig:
705 _ = plt.figure(figsize=(10, 10))
706 ax = plt.subplot(111, polar=True)
707 assert isinstance(ax, PolarAxes), "Expected a polar plot"
708 ax.plot([a * np.pi / 180 for a in azimuthsInDegrees], zenithAngles, marker, c=color, label=objName)
709 if title:
710 ax.set_title(title, va="bottom")
711 ax.set_theta_zero_location("N")
712 ax.set_theta_direction(-1)
713 ax.set_rlim(0, 90)
714 return ax
716 def makeAltAzCoveragePlot(
717 self, objects: Iterable[str] | None = None, withLines: bool = False, saveFig: str = ""
718 ) -> matplotlib.figure.Figure | None:
719 """Make a polar plot of the azimuth and zenith angle for each object.
721 Plots the azimuth on the theta axis, and zenith angle (not altitude!)
722 on the radius axis, such that 0 is at the centre, like you're looking
723 top-down on the telescope.
725 Parameters
726 ----------
727 objects : `list` [`str`], optional
728 The objects to plot. If not provided, all objects are plotted.
729 withLines : `bool`, optional
730 Connect the points with lines?
731 saveFig : `str`, optional
732 Save the figure to this file path?
734 Return
735 ------
736 fig : `matplotlib.figure.Figure` or `None`
737 The figure object, or `None` if nothing was plotted.
738 """
739 if not objects:
740 objects = self.stars
741 objects = ensure_iterable(objects)
743 if not objects: # there's genuinely nothing now
744 self.log.info("No objects to plot")
745 return None
747 fig = plt.figure(figsize=(16, 12))
749 ax = None
750 for obj in objects:
751 if obj in CALIB_VALUES:
752 continue
753 seqNums = self.getSeqNumsMatching(target_name_short=obj)
754 altAzes = [self.data[seqNum]["altaz_begin"] for seqNum in seqNums]
755 alts = [altAz.alt.deg for altAz in altAzes if altAz is not None]
756 azes = [altAz.az.deg for altAz in altAzes if altAz is not None]
757 assert len(alts) == len(azes)
758 if len(azes) == 0:
759 self.log.warning(f"Found no alt/az data for {obj}")
760 zens = [90 - alt for alt in alts]
761 color = self.cMap[obj].color
762 marker = self.cMap[obj].marker
763 if withLines:
764 marker += "-"
766 ax = self._makePolarPlot(
767 azes, zens, marker=marker, title=None, makeFig=False, color=color, objName=obj
768 )
770 if ax is None:
771 self.log.info("Only calibs taken so far, nothing to plot")
772 return None
774 lgnd = ax.legend(bbox_to_anchor=(1.05, 1), prop={"size": 15}, loc="upper left")
775 ax.set_title("Axial coverage - azimuth (theta, deg) vs zenith angle (r, deg)", size=20)
777 for h in lgnd.legend_handles:
778 if h is None or not isinstance(h, matplotlib.lines.Line2D):
779 continue
780 size = 14
781 marker = str(h.get_marker())
782 if "-" in marker:
783 size += 5
784 h.set_markersize(size)
786 plt.tight_layout()
787 if saveFig:
788 plt.savefig(saveFig)
789 return fig