Coverage for python / lsst / atmospec / nightAnalysis.py: 0%
117 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:46 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:46 +0000
1# This file is part of atmospec.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from astropy.coordinates import Angle
23import numpy as np
25import lsst.daf.butler as dafButler
26from lsst.obs.lsst.translators.lsst import FILTER_DELIMITER
27from .utils import isDispersedDataId
29__all__ = ["NightStellarSpectra", "getLineValue", "LINE_NAMES"]
31LINE_NAME_COLUMN = 0
33# mapping from the table's internal naming to something you can type
34LINE_NAMES = {'H_alpha': '$H\\alpha$',
35 'H_beta': '$H\\beta$',
36 'H_gamma': '$H\\gamma$',
37 'H_delta': '$H\\delta$',
38 'H_epsilon': '$H\\epsilon',
39 'O2_b': '$O_2(B)$',
40 'O2_z': '$O_2(Z)$',
41 'O2_y': '$O_2(Y)$',
42 'O2': '$O_2$',
43 'water': '$H_2 O$', # this actually maps to more than one line! Needs fixing upstream
44 }
46LINE_NAMES_REVERSED = {v: k for k, v in LINE_NAMES.items()}
49def _getRowNum(table, lineName):
50 """Surely there's a better/quicker way to do this"""
51 try:
52 actualName = LINE_NAMES[lineName]
53 except KeyError:
54 raise RuntimeError(f"Unknown line name {lineName}") from None # error chaining unhelpful here
56 for rowNum, row in enumerate(table.iterrows()):
57 if row[LINE_NAME_COLUMN] == actualName:
58 return rowNum
59 # NB: this function is not combined with getLineValue() because
60 # the None needs to checked for, as dereferencing the table with
61 # t[colName][None] returns column itself
62 return None
65def getLineValue(table, lineName, columnName, nanForMissingValues=True):
66 """Surely there's a better/quicker way to do this"""
68 rowNum = _getRowNum(table, lineName)
69 if not rowNum:
70 if nanForMissingValues:
71 return np.nan
72 raise ValueError(f"Line {lineName} not found in table")
74 return table[columnName][rowNum]
77class NightStellarSpectra:
78 """Class for holding the results for a night's observations of a given star
79 """
81 def __init__(self, butler, dayObs, targetName, *, instrument="LATISS", ignoreSeqNums=[],
82 collections=None):
83 self.dayObs = int(dayObs)
84 self.targetName = targetName
86 if isinstance(butler, dafButler.Butler):
87 self.butler = dafButler.Butler(butler=butler, instrument=instrument, collections=collections)
88 else:
89 self.butler = dafButler.Butler(butler, instrument=instrument, collections=collections)
90 self._loadExtractions(ignoreSeqNums)
91 # xxx maybe just load everything and then call removeSeqNums()?
93 def isDispersed(self, seqNum):
94 """Check if this observation is dispersed."""
95 dataId = {"day_obs": self.dayObs, "seq_num": seqNum}
96 return isDispersedDataId(dataId, self.butler)
98 def _readOneExtractionFile(self, seqNum):
99 datasetType = "extraction"
100 try:
101 return self.butler.get(datasetType,
102 seq_num=seqNum,
103 day_obs=self.dayObs)
104 except LookupError:
105 return None
107 def _loadExtractions(self, ignoreSeqNums):
108 # Get all the observations for the night.
109 where = "exposure.day_obs = dayObs and exposure.target_name = targetName"
110 records = self.butler.registry.queryDimensionRecords("exposure",
111 where=where,
112 bind={"dayObs": self.dayObs,
113 "targetName": self.targetName},
114 )
115 allSeqNums = [r.seq_num for r in records]
116 self.seqNums = []
117 nIgnored = 0
119 for seqNum in allSeqNums:
120 if self.isDispersed(seqNum):
121 if seqNum not in ignoreSeqNums:
122 self.seqNums.append(seqNum)
123 else:
124 nIgnored += 1
126 msg = (f"Found {len(self.seqNums)+nIgnored} dispersed observations of "
127 + f"{self.targetName} on {self.dayObs} in registry")
128 if nIgnored:
129 msg += f" of which {nIgnored} were skipped."
130 print(msg)
132 # Force sorted order and remove any potential duplicates.
133 self.seqNums = sorted(set(self.seqNums))
135 self.data = {}
136 successes = []
137 for seqNum in self.seqNums:
138 linesTable = self._readOneExtractionFile(seqNum)
139 if linesTable:
140 self.data[seqNum] = linesTable
141 successes.append(seqNum)
142 self.seqNums = successes
143 print(f"Loaded extractions for {len(self.data.keys())} of the above")
144 return
146 def removeSeqNums(self, seqNums):
147 for seqNum in seqNums:
148 if seqNum in self.seqNums:
149 assert seqNum in self.data.keys()
150 self.seqNums.remove(seqNum)
151 self.data.pop(seqNum)
153 def _getExposureRecords(self):
154 """Retrieve the exposure records for the relevant exposures.
156 Returns
157 -------
158 records : `dict` [`int`, `DimensionRecord`]
159 The matching records, indexed by sequence number.
160 """
161 where = "exposure.day_obs = dayObs"
162 records = self.butler.registry.queryDimensionRecords("exposure",
163 where=where,
164 bind={"dayObs": self.dayObs},
165 ).order_by("seq_num")
166 seqNums = set(self.seqNums) # Use set for faster lookup.
168 # Store in dict indexed by sequence number. This allows the caller
169 # to choose their own ordering and removes any potential duplicates.
170 return {r.seq_num: r for r in records if r.seq_num in seqNums}
172 def getFilterDisperserSet(self):
173 # Doing a query per seqNum is going to be slow, so query for the
174 # whole night and filter.
175 records = self._getExposureRecords()
176 fullFilters = {records[seq_num].physical_filter for seq_num in self.seqNums}
177 return fullFilters
179 def getFilterSet(self):
180 fullFilters = self.getFilterDisperserSet()
181 return {filt.split(FILTER_DELIMITER)[0] for filt in fullFilters}
183 def getDisperserSet(self):
184 fullFilters = self.getFilterDisperserSet()
185 return {filt.split(FILTER_DELIMITER)[1] for filt in fullFilters}
187 def getLineValue(self, seqNum, lineName, columnName):
188 # just convenient to be able to call this on a class instance as well
189 # as the free floating function - nothing deep happening here
190 return getLineValue(self.data[seqNum], lineName, columnName)
192 def getLineValues(self, lineName, columnName):
193 return [getLineValue(self.data[seqNum], lineName, columnName) if seqNum in self.data else np.nan
194 for seqNum in self.seqNums]
196 def getAllTableLines(self, includeIntermittentLines=True):
197 lineSets = []
198 for seqNum, table in self.data.items():
199 lineSets.append(set(table.columns[LINE_NAME_COLUMN].data))
201 if includeIntermittentLines:
202 ret = set.union(*lineSets)
203 else:
204 ret = set.intersection(*lineSets)
206 # we get byte strings back, so translate, and also lookup in
207 # line name dictionary for convenience
208 lines = [text.decode('utf-8') for text in ret]
209 lineNames = [LINE_NAMES_REVERSED[line] for line in lines]
210 return lineNames
212 def getObsTimes(self):
213 # TODO: Add option to subtract int part and multiply up
214 # to make it a human-readable small number
215 records = self._getExposureRecords()
216 dates = [records[seq_num].timespan.begin for seq_num in self.seqNums]
217 return [d.mjd for d in dates]
219 def getAirmasses(self):
220 records = self._getExposureRecords()
221 zenith_angles = [records[seq_num].zenith_angle for seq_num in self.seqNums]
222 angles = [Angle(za, unit="deg") for za in zenith_angles]
223 return [1/np.cos(za.radian) for za in angles]
225 def printObservationTable(self):
226 records = self._getExposureRecords()
227 for seq_num in self.seqNums:
228 r = records[seq_num]
229 print(r.seq_num, r.physical_filter)