Coverage for python / lsst / summit / extras / monitoring.py: 16%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 08:54 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 08:54 +0000
1# This file is part of summit_extras.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from time import sleep
23from typing import Any
25import numpy as np
27import lsst.afw.cameraGeom.utils as cgUtils
28import lsst.afw.display as afwDisplay
29import lsst.afw.image as afwImage
30import lsst.geom as geom
31from lsst.pex.exceptions import NotFoundError
32from lsst.summit.utils.bestEffort import BestEffortIsr
33from lsst.summit.utils.butlerUtils import (
34 getExpIdFromDayObsSeqNum,
35 getExpRecordFromDataId,
36 getMostRecentDataId,
37 makeDefaultLatissButler,
38)
40# TODO: maybe add option to create display and return URL?
class Monitor:
    """Real-time AuxTel display driver.

    Scans the butler repo for new images and sends each one, after
    running bestEffortIsr, to a Firefly display. Largely superseded by
    RubinTV and retained mainly for engineering workflows.

    Parameters
    ----------
    fireflyDisplay : `lsst.afw.display.Display`
        A Firefly display instance to mtv images to.
    **kwargs
        Additional keyword arguments forwarded to
        `lsst.summit.utils.bestEffort.BestEffortIsr`.
    """

    # Polling interval, in seconds, between checks for a new image.
    cadence = 1
    # If True, display the bestEffortIsr output; otherwise display the raw.
    runIsr = True

    def __init__(self, fireflyDisplay: afwDisplay.Display, **kwargs: Any):
        self.butler = makeDefaultLatissButler()
        self.display = fireflyDisplay
        self.bestEffort = BestEffortIsr(**kwargs)
        # Not read anywhere in this class; kept for callers that set it.
        self.writeQuickLookImages = None
        self.overlayAmps = False  # do the overlay?
        # If True, run() resets the exposure's XY0 before displaying it.
        self.measureFromChipCenter = False

    def _getLatestImageDataIdAndExpId(self) -> tuple[dict, int]:
        """Return the dataId and expId of the most recent raw image.

        Returns
        -------
        dataId : `dict`
            The most recent raw dataId in the butler repo.
        expId : `int`
            The corresponding exposure id.
        """
        dataId = getMostRecentDataId(self.butler)
        expId = getExpIdFromDayObsSeqNum(self.butler, dataId)["exposure"]
        return dataId, expId

    def _calcImageStats(self, exp: afwImage.Exposure) -> list[str]:
        """Compute a short set of image statistic strings.

        Parameters
        ----------
        exp : `lsst.afw.image.Exposure`
            The exposure to summarise.

        Returns
        -------
        elements : `list` [`str`]
            Human-readable statistic lines (median, then mean), each
            formatted to two decimal places, for overlay on the display.
        """
        pixels = exp.image.array
        return [
            f"Median={np.median(pixels):.2f}",
            f"Mean={np.mean(pixels):.2f}",
        ]

    def _makeImageInfoText(self, dataId: dict, exp: afwImage.Exposure, asList: bool = False) -> list | str:
        """Build the overlay text describing a displayed exposure.

        The text consists of one ``key:value`` entry per dataId item, the
        target name (or the observation type for bias/dark/flat images
        and images without a target name), the exposure time, the
        physical filter label and the image statistics.

        Parameters
        ----------
        dataId : `dict`
            The dataId for the exposure.
        exp : `lsst.afw.image.Exposure`
            The exposure being displayed.
        asList : `bool`, optional
            If `True` return a list of lines suitable for individual
            placement on the display. Otherwise return a single
            space-joined string suitable for a window title.

        Returns
        -------
        info : `list` [`str`] or `str`
            The assembled info text.
        """
        # TODO: add the following to the display:
        # az, el, zenith angle
        # main source centroid
        # PSF
        # num saturated pixels (or maybe just an isSaturated bool)
        # main star max ADU (post isr)

        expRecord = getExpRecordFromDataId(self.butler, dataId)
        imageType = expRecord.observation_type

        # Calibration frames have no meaningful target, so only look up the
        # target name for other image types. A missing or non-string target
        # name is tolerated rather than raising.
        obj = None
        if imageType.upper() not in ("BIAS", "DARK", "FLAT"):
            obj = getattr(expRecord, "target_name", None)
            if isinstance(obj, str):
                obj = obj.replace(" ", "")

        # dataId done per line for vertical display
        elements = [f"{k}:{v}" for k, v in dataId.items()]

        # Fall back to the observation type when there is no target name.
        elements.append(f"{obj}" if obj else f"{imageType}")

        expTime = exp.getInfo().getVisitInfo().getExposureTime()
        filt = exp.filter.physicalLabel

        elements.append(f"{expTime}s exp")
        elements.append(f"{filt}")

        elements.extend(self._calcImageStats(exp))

        if asList:
            return elements
        return " ".join(elements)

    def _printImageInfo(self, elements: list[str]) -> None:
        """Stamp info text lines onto the current display.

        Parameters
        ----------
        elements : `list` [`str`]
            Lines of overlay text to place on the display, stacked
            vertically just below the title bar.
        """
        size = 3
        top = 3850  # just under title for size=3
        xnom = -600  # 0 is the left edge of the image
        vSpacing = 100  # about right for size=3, can make f(size) if needed

        # TODO: add a with buffering and a .flush()
        # Also maybe a sleep as it seems buggy
        for i, item in enumerate(elements):
            # Each successive line is placed one vSpacing lower.
            y = top - (i * vSpacing)
            # Shift x by an amount proportional to the text length.
            x = xnom + (size * 18.5 * len(item) // 2)
            self.display.dot(str(item), x, y, size, ctype="red", fontFamily="courier")

    def run(self, durationInSeconds: int = -1) -> None:
        """Run the monitor, displaying new images as they are taken.

        Polls the butler repo at ``self.cadence`` seconds and pushes
        each newly seen exposure to the Firefly display, optionally
        running bestEffortIsr first.

        Parameters
        ----------
        durationInSeconds : `int`, optional
            How long to run for. Use ``-1`` to run effectively forever.

        Notes
        -----
        The duration is converted to a fixed number of polling iterations
        (``durationInSeconds // self.cadence``). Each iteration sleeps for
        ``self.cadence`` seconds, but time spent fetching, processing and
        displaying an image is not counted, so the wall-clock run time
        can exceed ``durationInSeconds``.
        """
        if durationInSeconds == -1:
            nLoops = int(1e9)
        else:
            nLoops = int(durationInSeconds // self.cadence)

        lastDisplayed = -1
        for _ in range(nLoops):
            try:
                dataId, expId = self._getLatestImageDataIdAndExpId()

                # Nothing new since the last displayed image: wait and poll again.
                if lastDisplayed == expId:
                    sleep(self.cadence)
                    continue

                if self.runIsr:
                    exp = self.bestEffort.getExposure(dataId)
                else:
                    exp = self.butler.get("raw", dataId=dataId)

                # TODO: add logic to deal with amp overlay and chip center
                # being mutually exclusive
                if self.measureFromChipCenter:  # after writing only!
                    exp.setXY0(geom.PointI(-2036, -2000))

                print(f"Displaying {dataId}...")
                imageInfoText = self._makeImageInfoText(dataId, exp, asList=True)
                # asList=True yields a list; narrow the `list | str` return
                # type before it is joined and passed on.
                assert isinstance(imageInfoText, list)
                # too long of a title breaks Java FITS i/o
                fireflyTitle = " ".join(imageInfoText)[:67]
                try:
                    self.display.scale("asinh", "zscale")
                    self.display.mtv(exp, title=fireflyTitle)
                except Exception as e:  # includes JSONDecodeError, HTTPError, anything else
                    print(f"Caught error {e}, skipping this image")  # TODO: try again maybe?
                    # NOTE(review): execution continues after a failed mtv, so
                    # the overlays below are still drawn and the image is
                    # still recorded as displayed - confirm this is intended.

                if self.overlayAmps:
                    cgUtils.overlayCcdBoxes(exp.getDetector(), display=self.display, isTrimmed=True)

                self._printImageInfo(imageInfoText)
                lastDisplayed = expId

            except NotFoundError as e:  # NotFoundError when filters aren't defined
                print(f"Skipped displaying due to {e}")
            sleep(self.cadence)