Coverage for python / lsst / summit / utils / starTracker / starTracker.py: 0%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 09:02 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-07 09:02 +0000
1# This file is part of summit_utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import datetime
23import os
24from dataclasses import dataclass
26import numpy as np
27from astropy.io import fits
28from PIL import Image
30import lsst.afw.image as afwImage
31import lsst.daf.base as dafBase
32from lsst.afw.image import ExposureInfo, VisitInfo
33from lsst.summit.utils.dateTime import dayObsIntToString
35__all__ = (
36 "KNOWN_CAMERAS",
37 "narrowCam",
38 "wideCam",
39 "fastCam",
40 "StarTrackerCamera",
41 "tifToExp",
42 "fitsToExp",
43 "openFile",
44 "dayObsToDateTime",
45 "isStreamingModeFile",
46 "dayObsSeqNumFromFilename",
47 "dayObsSeqNumFrameNumFromFilename",
48 "getRawDataDirForDayObs",
49)
51KNOWN_CAMERAS = ("narrow", "wide", "fast")
54@dataclass(frozen=True, kw_only=True, slots=True)
55class StarTrackerCamera:
56 """A frozen dataclass for StarTracker camera configs"""
58 cameraType: str
59 suffix: str
60 suffixWithSpace: str
61 doAstrometry: bool
62 cameraNumber: int
63 snr: float
64 minPix: int
65 brightSourceFraction: float
66 scaleError: float
67 doSmoothPlot: bool
70narrowCam = StarTrackerCamera(
71 cameraType="narrow",
72 suffix="_narrow",
73 suffixWithSpace=" narrow",
74 doAstrometry=True,
75 cameraNumber=102,
76 snr=5,
77 minPix=25,
78 brightSourceFraction=0.95,
79 scaleError=5,
80 doSmoothPlot=True,
81)
82wideCam = StarTrackerCamera(
83 cameraType="wide",
84 suffix="_wide",
85 suffixWithSpace=" wide",
86 doAstrometry=True,
87 cameraNumber=101,
88 snr=5,
89 minPix=25,
90 brightSourceFraction=0.8,
91 scaleError=5,
92 doSmoothPlot=True,
93)
94fastCam = StarTrackerCamera(
95 cameraType="fast",
96 suffix="_fast",
97 suffixWithSpace=" fast",
98 doAstrometry=True,
99 cameraNumber=103,
100 snr=2.5,
101 minPix=10,
102 brightSourceFraction=0.95,
103 scaleError=60,
104 doSmoothPlot=False,
105)
108def tifToExp(filename: str) -> afwImage.Exposure:
109 """Open a tif image as an exposure.
111 Opens the file, sets a blank mask plane, and converts the data to
112 `np.float32` and returns an exposure, currently with no visitInfo.
114 TODO: DM-38422 Once we have a way of getting the expTime, set that,
115 and the frequency at which the images were taken.
117 Parameters
118 ----------
119 filename : `str`
120 The full path to the file to load.
122 Returns
123 -------
124 exp : `lsst.afw.image.Exposure`
125 The exposure.
126 """
127 im = Image.open(filename)
128 imageData = im.getdata()
129 data = np.asarray(imageData, dtype=np.float32)
130 data = data.reshape(im.height, im.width)
131 img = afwImage.ImageF(data)
132 maskedIm = afwImage.MaskedImageF(img)
133 exp = afwImage.ExposureF(maskedIm)
134 return exp
137def fitsToExp(filename: str) -> afwImage.Exposure:
138 """Open a fits file as an exposure.
140 Parameters
141 ----------
142 filename : `str`
143 The full path to the file to load.
145 Returns
146 -------
147 exp : `lsst.afw.image.Exposure`
148 The exposure.
149 """
150 with fits.open(filename) as f:
151 header = f[0].header
152 data = f[1].data
154 data = np.asarray(data, dtype=np.float32)
155 img = afwImage.ImageF(data)
156 maskedIm = afwImage.MaskedImageF(img)
158 viDict = {}
159 viDict["exposureTime"] = header.get("EXPTIME")
161 # set the midpoint of BEG and END as the DATE
162 begin = datetime.datetime.fromisoformat(header.get("DATE-BEG"))
163 end = datetime.datetime.fromisoformat(header.get("DATE-END"))
164 mid = begin + (end - begin) / 2
165 newTime = dafBase.DateTime(mid.isoformat(), dafBase.DateTime.Timescale.TAI)
166 viDict["date"] = newTime
168 vi = VisitInfo(**viDict)
169 expInfo = ExposureInfo(visitInfo=vi)
170 exp = afwImage.ExposureF(maskedIm, exposureInfo=expInfo)
171 return exp
174def openFile(filename: str) -> afwImage.Exposure:
175 """Open a file as an exposure, based on the file type.
177 Parameters
178 ----------
179 filename : `str`
180 The full path to the file to load.
182 Returns
183 -------
184 exp : `lsst.afw.image.Exposure`
185 The exposure.
186 """
187 if filename.endswith(".tif"):
188 return tifToExp(filename)
189 elif filename.endswith(".fits"):
190 return fitsToExp(filename)
191 else:
192 raise ValueError("File type not recognized")
195def dayObsToDateTime(dayObs: int) -> datetime.datetime:
196 """Convert a dayObs to a datetime.
198 Parameters
199 ----------
200 dayObs : `int`
201 The dayObs.
203 Returns
204 -------
205 datetime : `datetime`
206 The datetime.
207 """
208 return datetime.datetime.strptime(dayObsIntToString(dayObs), "%Y-%m-%d")
211def isStreamingModeFile(filename: str) -> bool:
212 """Check if a filename is a streaming mode file.
214 Parameters
215 ----------
216 filename : `str`
217 The filename.
219 Returns
220 -------
221 isStreaming : `bool`
222 Whether the file is a streaming mode file.
223 """
224 # non-streaming filenames are like GC103_O_20240304_000009.fits
225 # streaming filenames are like GC103_O_20240304_000007_0001316.fits
226 # which is <camNum>_O_<dayObs>_<seqNum>_<streamSeqNum>.fits
227 # so 5 sections means streaming, 4 means normal
228 return os.path.basename(filename).count("_") == 4
231def dayObsSeqNumFromFilename(filename: str) -> tuple[int, int] | tuple[None, None]:
232 """Get the dayObs and seqNum from a filename.
234 If the file is a streaming mode file (`None`, `None`) is returned.
236 Parameters
237 ----------
238 filename : `str`
239 The filename.
241 Returns
242 -------
243 dayObs : `int` or `None`
244 The dayObs.
245 seqNum : `int` or `None`
246 The seqNum.
247 """
248 # filenames are like GC101_O_20221114_000005.fits
249 filename = os.path.basename(filename) # in case we're passed a full path
251 # these must not be processed like normal files as they're a part of a long
252 # series, so return None, None even if that potentially causes problems
253 # elsewhere, that code needs to deal with that.
254 if isStreamingModeFile(filename):
255 return None, None
257 # this is a regular file
258 parts = filename.split("_")
259 _, _, dayObs, seqNumAndSuffix = parts
260 seqNum = seqNumAndSuffix.removesuffix(".fits")
262 return int(dayObs), int(seqNum)
265def dayObsSeqNumFrameNumFromFilename(filename: str) -> tuple[int, int, int]:
266 """Get the dayObs, seqNum and frameNum from a filename.
268 If the file is not a streaming mode file then a `ValueError` is raised.
270 Parameters
271 ----------
272 filename : `str`
273 The filename.
275 Returns
276 -------
277 dayObs : `int`
278 The dayObs.
279 seqNum : `int`
280 The seqNum.
281 frameNum : `int`
282 The frameNum.
284 Raises
285 ------
286 ValueError
287 Raised if the file is not a streaming mode file.
288 """
289 # filenames are like GC103_O_20240308_000169_0000321.fits
290 # which follows the pattern <camNum>_O_<dayObs>_<seqNum>_<frameNum>.fits
291 filename = os.path.basename(filename) # in case we're passed a full path
293 if not isStreamingModeFile(filename):
294 raise ValueError(f"{filename} is not a streaming mode file")
296 # this is a regular file
297 parts = filename.split("_")
298 _, _, dayObs, seqNum, frameNumAndSuffix = parts
299 frameNum = frameNumAndSuffix.removesuffix(".fits")
301 return int(dayObs), int(seqNum), int(frameNum)
304def getRawDataDirForDayObs(rootDataPath: str, camera: StarTrackerCamera, dayObs: int) -> str:
305 """Get the raw data dir for a given dayObs.
307 Parameters
308 ----------
309 rootDataPath : `str`
310 The root data path.
311 camera : `lsst.summit.utils.starTracker.StarTrackerCamera`
312 The camera to get the raw data for.
313 dayObs : `int`
314 The dayObs.
315 """
316 camNum = camera.cameraNumber
317 dayObsDateTime = datetime.datetime.strptime(str(dayObs), "%Y%m%d")
318 dirSuffix = (
319 f"GenericCamera/{camNum}/{dayObsDateTime.year}/" f"{dayObsDateTime.month:02}/{dayObsDateTime.day:02}/"
320 )
321 return os.path.join(rootDataPath, dirSuffix)