Coverage for python / lsst / atmospec / nightAnalysis.py: 0%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:00 +0000

1# This file is part of atmospec. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from astropy.coordinates import Angle 

23import numpy as np 

24 

25import lsst.daf.butler as dafButler 

26from lsst.obs.lsst.translators.lsst import FILTER_DELIMITER 

27from .utils import isDispersedDataId 

28 

29__all__ = ["NightStellarSpectra", "getLineValue", "LINE_NAMES"] 

30 

31LINE_NAME_COLUMN = 0 

32 

33# mapping from the table's internal naming to something you can type 

34LINE_NAMES = {'H_alpha': '$H\\alpha$', 

35 'H_beta': '$H\\beta$', 

36 'H_gamma': '$H\\gamma$', 

37 'H_delta': '$H\\delta$', 

38 'H_epsilon': '$H\\epsilon', 

39 'O2_b': '$O_2(B)$', 

40 'O2_z': '$O_2(Z)$', 

41 'O2_y': '$O_2(Y)$', 

42 'O2': '$O_2$', 

43 'water': '$H_2 O$', # this actually maps to more than one line! Needs fixing upstream 

44 } 

45 

46LINE_NAMES_REVERSED = {v: k for k, v in LINE_NAMES.items()} 

47 

48 

49def _getRowNum(table, lineName): 

50 """Surely there's a better/quicker way to do this""" 

51 try: 

52 actualName = LINE_NAMES[lineName] 

53 except KeyError: 

54 raise RuntimeError(f"Unknown line name {lineName}") from None # error chaining unhelpful here 

55 

56 for rowNum, row in enumerate(table.iterrows()): 

57 if row[LINE_NAME_COLUMN] == actualName: 

58 return rowNum 

59 # NB: this function is not combined with getLineValue() because 

60 # the None needs to checked for, as dereferencing the table with 

61 # t[colName][None] returns column itself 

62 return None 

63 

64 

65def getLineValue(table, lineName, columnName, nanForMissingValues=True): 

66 """Surely there's a better/quicker way to do this""" 

67 

68 rowNum = _getRowNum(table, lineName) 

69 if not rowNum: 

70 if nanForMissingValues: 

71 return np.nan 

72 raise ValueError(f"Line {lineName} not found in table") 

73 

74 return table[columnName][rowNum] 

75 

76 

77class NightStellarSpectra: 

78 """Class for holding the results for a night's observations of a given star 

79 """ 

80 

81 def __init__(self, butler, dayObs, targetName, *, instrument="LATISS", ignoreSeqNums=[], 

82 collections=None): 

83 self.dayObs = int(dayObs) 

84 self.targetName = targetName 

85 

86 if isinstance(butler, dafButler.Butler): 

87 self.butler = dafButler.Butler(butler=butler, instrument=instrument, collections=collections) 

88 else: 

89 self.butler = dafButler.Butler(butler, instrument=instrument, collections=collections) 

90 self._loadExtractions(ignoreSeqNums) 

91 # xxx maybe just load everything and then call removeSeqNums()? 

92 

93 def isDispersed(self, seqNum): 

94 """Check if this observation is dispersed.""" 

95 dataId = {"day_obs": self.dayObs, "seq_num": seqNum} 

96 return isDispersedDataId(dataId, self.butler) 

97 

98 def _readOneExtractionFile(self, seqNum): 

99 datasetType = "extraction" 

100 try: 

101 return self.butler.get(datasetType, 

102 seq_num=seqNum, 

103 day_obs=self.dayObs) 

104 except LookupError: 

105 return None 

106 

107 def _loadExtractions(self, ignoreSeqNums): 

108 # Get all the observations for the night. 

109 where = "exposure.day_obs = dayObs and exposure.target_name = targetName" 

110 records = self.butler.registry.queryDimensionRecords("exposure", 

111 where=where, 

112 bind={"dayObs": self.dayObs, 

113 "targetName": self.targetName}, 

114 ) 

115 allSeqNums = [r.seq_num for r in records] 

116 self.seqNums = [] 

117 nIgnored = 0 

118 

119 for seqNum in allSeqNums: 

120 if self.isDispersed(seqNum): 

121 if seqNum not in ignoreSeqNums: 

122 self.seqNums.append(seqNum) 

123 else: 

124 nIgnored += 1 

125 

126 msg = (f"Found {len(self.seqNums)+nIgnored} dispersed observations of " 

127 + f"{self.targetName} on {self.dayObs} in registry") 

128 if nIgnored: 

129 msg += f" of which {nIgnored} were skipped." 

130 print(msg) 

131 

132 # Force sorted order and remove any potential duplicates. 

133 self.seqNums = sorted(set(self.seqNums)) 

134 

135 self.data = {} 

136 successes = [] 

137 for seqNum in self.seqNums: 

138 linesTable = self._readOneExtractionFile(seqNum) 

139 if linesTable: 

140 self.data[seqNum] = linesTable 

141 successes.append(seqNum) 

142 self.seqNums = successes 

143 print(f"Loaded extractions for {len(self.data.keys())} of the above") 

144 return 

145 

146 def removeSeqNums(self, seqNums): 

147 for seqNum in seqNums: 

148 if seqNum in self.seqNums: 

149 assert seqNum in self.data.keys() 

150 self.seqNums.remove(seqNum) 

151 self.data.pop(seqNum) 

152 

153 def _getExposureRecords(self): 

154 """Retrieve the exposure records for the relevant exposures. 

155 

156 Returns 

157 ------- 

158 records : `dict` [`int`, `DimensionRecord`] 

159 The matching records, indexed by sequence number. 

160 """ 

161 where = "exposure.day_obs = dayObs" 

162 records = self.butler.registry.queryDimensionRecords("exposure", 

163 where=where, 

164 bind={"dayObs": self.dayObs}, 

165 ).order_by("seq_num") 

166 seqNums = set(self.seqNums) # Use set for faster lookup. 

167 

168 # Store in dict indexed by sequence number. This allows the caller 

169 # to choose their own ordering and removes any potential duplicates. 

170 return {r.seq_num: r for r in records if r.seq_num in seqNums} 

171 

172 def getFilterDisperserSet(self): 

173 # Doing a query per seqNum is going to be slow, so query for the 

174 # whole night and filter. 

175 records = self._getExposureRecords() 

176 fullFilters = {records[seq_num].physical_filter for seq_num in self.seqNums} 

177 return fullFilters 

178 

179 def getFilterSet(self): 

180 fullFilters = self.getFilterDisperserSet() 

181 return {filt.split(FILTER_DELIMITER)[0] for filt in fullFilters} 

182 

183 def getDisperserSet(self): 

184 fullFilters = self.getFilterDisperserSet() 

185 return {filt.split(FILTER_DELIMITER)[1] for filt in fullFilters} 

186 

187 def getLineValue(self, seqNum, lineName, columnName): 

188 # just convenient to be able to call this on a class instance as well 

189 # as the free floating function - nothing deep happening here 

190 return getLineValue(self.data[seqNum], lineName, columnName) 

191 

192 def getLineValues(self, lineName, columnName): 

193 return [getLineValue(self.data[seqNum], lineName, columnName) if seqNum in self.data else np.nan 

194 for seqNum in self.seqNums] 

195 

196 def getAllTableLines(self, includeIntermittentLines=True): 

197 lineSets = [] 

198 for seqNum, table in self.data.items(): 

199 lineSets.append(set(table.columns[LINE_NAME_COLUMN].data)) 

200 

201 if includeIntermittentLines: 

202 ret = set.union(*lineSets) 

203 else: 

204 ret = set.intersection(*lineSets) 

205 

206 # we get byte strings back, so translate, and also lookup in 

207 # line name dictionary for convenience 

208 lines = [text.decode('utf-8') for text in ret] 

209 lineNames = [LINE_NAMES_REVERSED[line] for line in lines] 

210 return lineNames 

211 

212 def getObsTimes(self): 

213 # TODO: Add option to subtract int part and multiply up 

214 # to make it a human-readable small number 

215 records = self._getExposureRecords() 

216 dates = [records[seq_num].timespan.begin for seq_num in self.seqNums] 

217 return [d.mjd for d in dates] 

218 

219 def getAirmasses(self): 

220 records = self._getExposureRecords() 

221 zenith_angles = [records[seq_num].zenith_angle for seq_num in self.seqNums] 

222 angles = [Angle(za, unit="deg") for za in zenith_angles] 

223 return [1/np.cos(za.radian) for za in angles] 

224 

225 def printObservationTable(self): 

226 records = self._getExposureRecords() 

227 for seq_num in self.seqNums: 

228 r = records[seq_num] 

229 print(r.seq_num, r.physical_filter)