Coverage for python / lsst / summit / utils / starTracker / starTracker.py: 0%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 09:44 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import datetime 

23import os 

24from dataclasses import dataclass 

25 

26import numpy as np 

27from astropy.io import fits 

28from PIL import Image 

29 

30import lsst.afw.image as afwImage 

31import lsst.daf.base as dafBase 

32from lsst.afw.image import ExposureInfo, VisitInfo 

33from lsst.summit.utils.dateTime import dayObsIntToString 

34 

# Names exported by this module when it is star-imported.
__all__ = (
    "KNOWN_CAMERAS",
    "narrowCam",
    "wideCam",
    "fastCam",
    "StarTrackerCamera",
    "tifToExp",
    "fitsToExp",
    "openFile",
    "dayObsToDateTime",
    "isStreamingModeFile",
    "dayObsSeqNumFromFilename",
    "dayObsSeqNumFrameNumFromFilename",
    "getRawDataDirForDayObs",
)

# The camera type names; these match the ``cameraType`` values of the
# narrowCam, wideCam and fastCam configurations defined in this module.
KNOWN_CAMERAS = ("narrow", "wide", "fast")

52 

53 

54@dataclass(frozen=True, kw_only=True, slots=True) 

55class StarTrackerCamera: 

56 """A frozen dataclass for StarTracker camera configs""" 

57 

58 cameraType: str 

59 suffix: str 

60 suffixWithSpace: str 

61 doAstrometry: bool 

62 cameraNumber: int 

63 snr: float 

64 minPix: int 

65 brightSourceFraction: float 

66 scaleError: float 

67 doSmoothPlot: bool 

68 

69 

# Configuration for the "narrow" camera (camera number 102).
narrowCam = StarTrackerCamera(
    # identification
    cameraNumber=102,
    cameraType="narrow",
    suffix="_narrow",
    suffixWithSpace=" narrow",
    # processing parameters
    snr=5,
    minPix=25,
    brightSourceFraction=0.95,
    scaleError=5,
    doAstrometry=True,
    doSmoothPlot=True,
)

# Configuration for the "wide" camera (camera number 101).
wideCam = StarTrackerCamera(
    # identification
    cameraNumber=101,
    cameraType="wide",
    suffix="_wide",
    suffixWithSpace=" wide",
    # processing parameters
    snr=5,
    minPix=25,
    brightSourceFraction=0.8,
    scaleError=5,
    doAstrometry=True,
    doSmoothPlot=True,
)

# Configuration for the "fast" camera (camera number 103).
fastCam = StarTrackerCamera(
    # identification
    cameraNumber=103,
    cameraType="fast",
    suffix="_fast",
    suffixWithSpace=" fast",
    # processing parameters
    snr=2.5,
    minPix=10,
    brightSourceFraction=0.95,
    scaleError=60,
    doAstrometry=True,
    doSmoothPlot=False,
)

106 

107 

def tifToExp(filename: str) -> afwImage.Exposure:
    """Open a tif image as an exposure.

    Opens the file, sets a blank mask plane, and converts the data to
    `np.float32` and returns an exposure, currently with no visitInfo.

    TODO: DM-38422 Once we have a way of getting the expTime, set that,
    and the frequency at which the images were taken.

    Parameters
    ----------
    filename : `str`
        The full path to the file to load.

    Returns
    -------
    exp : `lsst.afw.image.Exposure`
        The exposure.
    """
    # Image.open() keeps the underlying file open until the image is
    # closed, so use it as a context manager to release the file handle as
    # soon as the pixel values have been copied into a numpy array.
    with Image.open(filename) as im:
        data = np.asarray(im.getdata(), dtype=np.float32)
        # getdata() is a flat sequence of pixels, so restore the 2D shape
        data = data.reshape(im.height, im.width)

    img = afwImage.ImageF(data)
    maskedIm = afwImage.MaskedImageF(img)
    exp = afwImage.ExposureF(maskedIm)
    return exp

135 

136 

def fitsToExp(filename: str) -> afwImage.Exposure:
    """Load a fits file and wrap its contents in an exposure.

    The pixel data are taken from HDU 1 and converted to `np.float32`. The
    header of HDU 0 supplies the visitInfo: ``EXPTIME`` is used as the
    exposure time, and the midpoint between ``DATE-BEG`` and ``DATE-END``
    is used as the date, which is constructed with the TAI timescale.

    Parameters
    ----------
    filename : `str`
        The full path to the file to load.

    Returns
    -------
    exp : `lsst.afw.image.Exposure`
        The exposure, with the exposure time and date set in its visitInfo.
    """
    with fits.open(filename) as hduList:
        primaryHeader = hduList[0].header
        pixels = np.asarray(hduList[1].data, dtype=np.float32)

    maskedImage = afwImage.MaskedImageF(afwImage.ImageF(pixels))

    # the date of the exposure is the midpoint of the begin and end times
    startTime = datetime.datetime.fromisoformat(primaryHeader.get("DATE-BEG"))
    endTime = datetime.datetime.fromisoformat(primaryHeader.get("DATE-END"))
    midTime = startTime + (endTime - startTime) / 2

    visitInfo = VisitInfo(
        exposureTime=primaryHeader.get("EXPTIME"),
        date=dafBase.DateTime(midTime.isoformat(), dafBase.DateTime.Timescale.TAI),
    )
    exposureInfo = ExposureInfo(visitInfo=visitInfo)
    return afwImage.ExposureF(maskedImage, exposureInfo=exposureInfo)

172 

173 

def openFile(filename: str) -> afwImage.Exposure:
    """Open a file as an exposure, based on the file type.

    The file type is determined from the filename extension, ignoring
    case: ``.tif``/``.tiff`` files are loaded with `tifToExp` and ``.fits``
    files are loaded with `fitsToExp`.

    Parameters
    ----------
    filename : `str`
        The full path to the file to load.

    Returns
    -------
    exp : `lsst.afw.image.Exposure`
        The exposure.

    Raises
    ------
    ValueError
        Raised if the file extension is not one of the supported types.
    """
    # match on the lower-cased name so that e.g. ".TIF" and ".FITS" are
    # recognized, but always pass the original filename to the loaders
    lowerName = filename.lower()
    if lowerName.endswith((".tif", ".tiff")):
        return tifToExp(filename)
    if lowerName.endswith(".fits"):
        return fitsToExp(filename)
    raise ValueError(f"File type not recognized: {filename}")

193 

194 

def dayObsToDateTime(dayObs: int) -> datetime.datetime:
    """Turn an integer dayObs into a datetime object.

    The dayObs is first converted to its string form with
    `dayObsIntToString`, which is then parsed as a ``YYYY-MM-DD`` date.
    No time of day or timezone is supplied when parsing.

    Parameters
    ----------
    dayObs : `int`
        The dayObs.

    Returns
    -------
    datetime : `datetime.datetime`
        The datetime corresponding to the dayObs.
    """
    dayObsString = dayObsIntToString(dayObs)
    return datetime.datetime.strptime(dayObsString, "%Y-%m-%d")

209 

210 

def isStreamingModeFile(filename: str) -> bool:
    """Determine whether a filename belongs to a streaming mode file.

    Only the basename is inspected, so any directory components of the
    path are ignored.

    Parameters
    ----------
    filename : `str`
        The filename.

    Returns
    -------
    isStreaming : `bool`
        Whether the file is a streaming mode file.
    """
    # Normal filenames look like
    #     GC103_O_20240304_000009.fits
    # whereas streaming filenames look like
    #     GC103_O_20240304_000007_0001316.fits
    # i.e. <camNum>_O_<dayObs>_<seqNum>_<streamSeqNum>.fits, so a name made
    # of five underscore-separated sections is a streaming mode file.
    baseName = os.path.basename(filename)
    numSections = len(baseName.split("_"))
    return numSections == 5

229 

230 

def dayObsSeqNumFromFilename(filename: str) -> tuple[int, int] | tuple[None, None]:
    """Extract the dayObs and seqNum from a filename.

    Filenames are expected to look like ``GC101_O_20221114_000005.fits``.
    Streaming mode files are not parsed: for those, (`None`, `None`) is
    returned.

    Parameters
    ----------
    filename : `str`
        The filename. May be a full path, only the basename is used.

    Returns
    -------
    dayObs : `int` or `None`
        The dayObs.
    seqNum : `int` or `None`
        The seqNum.
    """
    baseName = os.path.basename(filename)  # drop any leading directories

    # Streaming mode files are part of a long series and must not be
    # treated like normal files, so signal that with None, None. Dealing
    # with that is the responsibility of the calling code.
    if isStreamingModeFile(baseName):
        return None, None

    # a normal file: <camNum>_O_<dayObs>_<seqNum>.fits
    _, _, dayObsStr, seqNumStr = baseName.split("_")
    return int(dayObsStr), int(seqNumStr.removesuffix(".fits"))

263 

264 

def dayObsSeqNumFrameNumFromFilename(filename: str) -> tuple[int, int, int]:
    """Extract the dayObs, seqNum and frameNum from a streaming filename.

    Filenames are expected to look like
    ``GC103_O_20240308_000169_0000321.fits``, i.e. to follow the pattern
    ``<camNum>_O_<dayObs>_<seqNum>_<frameNum>.fits``.

    Parameters
    ----------
    filename : `str`
        The filename. May be a full path, only the basename is used.

    Returns
    -------
    dayObs : `int`
        The dayObs.
    seqNum : `int`
        The seqNum.
    frameNum : `int`
        The frameNum.

    Raises
    ------
    ValueError
        Raised if the file is not a streaming mode file.
    """
    baseName = os.path.basename(filename)  # drop any leading directories

    if not isStreamingModeFile(baseName):
        raise ValueError(f"{baseName} is not a streaming mode file")

    # a streaming mode file, so there are exactly five sections to unpack
    _, _, dayObsStr, seqNumStr, frameNumStr = baseName.split("_")
    return int(dayObsStr), int(seqNumStr), int(frameNumStr.removesuffix(".fits"))

302 

303 

def getRawDataDirForDayObs(rootDataPath: str, camera: StarTrackerCamera, dayObs: int) -> str:
    """Build the path of the raw data directory for a camera and dayObs.

    Parameters
    ----------
    rootDataPath : `str`
        The root data path.
    camera : `lsst.summit.utils.starTracker.StarTrackerCamera`
        The camera to get the raw data for.
    dayObs : `int`
        The dayObs, as an integer of the form YYYYMMDD.

    Returns
    -------
    rawDataDir : `str`
        The directory, of the form
        ``<rootDataPath>/GenericCamera/<cameraNumber>/<year>/<MM>/<DD>/``,
        including the trailing separator.
    """
    cameraNumber = camera.cameraNumber
    date = datetime.datetime.strptime(str(dayObs), "%Y%m%d")

    # month and day are zero-padded to two digits
    components = (
        "GenericCamera",
        f"{cameraNumber}",
        f"{date.year}",
        f"{date.month:02}",
        f"{date.day:02}",
    )
    relativeDir = "/".join(components) + "/"
    return os.path.join(rootDataPath, relativeDir)