Coverage for tests / test_efdUtils.py: 30%

144 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 19:02 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Test cases for utils.""" 

23 

24import asyncio 

25import datetime 

26import unittest 

27 

28import astropy 

29import pandas as pd 

30from astropy.time import Time 

31from utils import getVcr 

32 

33import lsst.utils.tests 

34from lsst.summit.utils.dateTime import ( 

35 astropyToEfdTimestamp, 

36 efdTimestampToAstropy, 

37 getDayObsEndTime, 

38 getDayObsStartTime, 

39) 

40from lsst.summit.utils.efdUtils import ( 

41 clipDataToEvent, 

42 getEfdData, 

43 getMostRecentRowWithDataBefore, 

44 getTopics, 

45 makeEfdClient, 

46) 

47from lsst.summit.utils.tmaUtils import TMAEvent, TMAState 

48 

# Probe for the optional EFD client package; the flag is consulted by the
# skip decorator on the test case so the module still imports without it.
try:
    import lsst_efd_client
except ImportError:
    HAS_EFD_CLIENT = False
else:
    HAS_EFD_CLIENT = True

# Shared VCR instance used to decorate the test case and its methods.
vcr = getVcr()

56 

57 

@unittest.skipIf(not HAS_EFD_CLIENT, "No EFD client available")
@vcr.use_cassette()
class EfdUtilsTestCase(lsst.utils.tests.TestCase):
    """Tests for the helpers imported from ``lsst.summit.utils.efdUtils``.

    The whole case is skipped when ``lsst_efd_client`` cannot be imported.
    Every method that talks to the EFD is wrapped in ``vcr.use_cassette()``
    (presumably so that recorded HTTP responses are replayed instead of
    hitting a live server - confirm against ``utils.getVcr``).
    """

    @classmethod
    @vcr.use_cassette()
    def setUpClass(cls):
        """Create the shared EFD client and the fixtures used by the tests.

        Raises
        ------
        unittest.SkipTest
            Raised if ``makeEfdClient`` raises `RuntimeError`, so that the
            whole case is skipped rather than failing.
        """
        try:
            cls.client = makeEfdClient(testing=True)
        except RuntimeError:
            raise unittest.SkipTest("Could not instantiate an EFD client")
        cls.dayObs = 20230531
        # topics used by the tests: a state-transition topic and a densely
        # sampled time-series topic
        cls.axisTopic = "lsst.sal.MTMount.logevent_azimuthMotionState"
        cls.timeSeriesTopic = "lsst.sal.MTMount.azimuth"
        # a short, hand-built tracking event used for the clipping and
        # padding tests
        cls.event = TMAEvent(
            dayObs=cls.dayObs,
            seqNum=27,
            type=TMAState.TRACKING,
            endReason=TMAState.SLEWING,
            duration=0.47125244140625,
            begin=Time(1685578353.2265284, scale="utc", format="unix"),
            end=Time(1685578353.6977808, scale="utc", format="unix"),
            blockInfos=None,
            version=0,
            _startRow=254,
            _endRow=255,
        )

    @vcr.use_cassette()
    def tearDown(self):
        """Close the client's influx connection, if it has one."""
        # NOTE(review): the client is created once in setUpClass but its
        # influx_client is closed here after *every* test - confirm that this
        # is intended rather than belonging in tearDownClass.
        loop = asyncio.get_event_loop()
        if self.client.influx_client is not None:
            loop.run_until_complete(self.client.influx_client.close())

    @vcr.use_cassette()
    def test_makeEfdClient(self):
        """Check that ``makeEfdClient`` returned an ``EfdClient`` instance."""
        self.assertIsInstance(self.client, lsst_efd_client.efd_helper.EfdClient)

    @vcr.use_cassette()
    def test_getTopics(self):
        """Check wildcard and case-sensitivity handling in ``getTopics``."""
        topics = getTopics(self.client, "lsst.sal.MTMount*")
        self.assertIsInstance(topics, list)
        self.assertGreater(len(topics), 0)

        topics = getTopics(self.client, "*fake.topics.does.not.exist*")
        self.assertIsInstance(topics, list)
        self.assertEqual(len(topics), 0)

        # check we can find the mount with a preceding wildcard
        topics = getTopics(self.client, "*mTmoUnt*")
        self.assertIsInstance(topics, list)
        self.assertGreater(len(topics), 0)

        # check it fails if we don't allow case insensitivity
        topics = getTopics(self.client, "*mTmoUnt*", caseSensitive=True)
        self.assertIsInstance(topics, list)
        self.assertEqual(len(topics), 0)

    @vcr.use_cassette()
    def test_getEfdData(self):
        """Check each way of specifying the query window for ``getEfdData``.

        Covers the dayObs, begin/timespan, begin/end and event interfaces,
        the pre/post padding options, and the argument combinations that are
        expected to raise `ValueError`.
        """
        dayStart = getDayObsStartTime(self.dayObs)
        dayEnd = getDayObsEndTime(self.dayObs)
        oneDay = datetime.timedelta(hours=24)

        # test the dayObs interface
        dayObsData = getEfdData(self.client, self.axisTopic, dayObs=self.dayObs)
        self.assertIsInstance(dayObsData, pd.DataFrame)

        # test the starttime interface
        dayStartData = getEfdData(self.client, self.axisTopic, begin=dayStart, timespan=oneDay)
        self.assertIsInstance(dayStartData, pd.DataFrame)

        # check they're equal
        self.assertTrue(dayObsData.equals(dayStartData))

        # test the starttime interface with an endtime
        dayStartEndData = getEfdData(self.client, self.axisTopic, begin=dayStart, end=dayEnd)
        self.assertTrue(dayObsData.equals(dayStartEndData))

        # test event
        # note that here we're going to clip to an event and pad things, so
        # we want to use the timeSeriesTopic not the states, so that there's
        # plenty of rows to test the padding is actually working
        eventData = getEfdData(self.client, self.timeSeriesTopic, event=self.event)
        self.assertIsInstance(eventData, pd.DataFrame)

        # test padding options
        padded = getEfdData(self.client, self.timeSeriesTopic, event=self.event, prePadding=1, postPadding=2)
        self.assertGreater(len(padded), len(eventData))
        startTimeDiff = efdTimestampToAstropy(eventData.iloc[0]["private_efdStamp"]) - efdTimestampToAstropy(
            padded.iloc[0]["private_efdStamp"]
        )
        endTimeDiff = efdTimestampToAstropy(padded.iloc[-1]["private_efdStamp"]) - efdTimestampToAstropy(
            eventData.iloc[-1]["private_efdStamp"]
        )

        self.assertGreater(startTimeDiff.sec, 0)
        self.assertLess(startTimeDiff.sec, 1.1)  # padding isn't super exact, so give a little wiggle room
        self.assertGreater(endTimeDiff.sec, 0)
        self.assertLess(endTimeDiff.sec, 2.1)  # padding isn't super exact, so give a little wiggle room

        # Each invalid call gets its own assertRaises context: inside a single
        # context the first raise exits the block, so any later calls would
        # never be executed (and hence never actually be tested).
        # NOTE(review): the bad-topic call at the end was previously never
        # reached, so it may issue a request that is missing from the existing
        # cassette - re-record the cassette if this test starts failing there.
        with self.assertRaises(ValueError):
            # not enough info to constrain
            _ = getEfdData(self.client, self.axisTopic)
        with self.assertRaises(ValueError):
            # dayObs supplied and a start time is not allowed
            _ = getEfdData(self.client, self.axisTopic, dayObs=self.dayObs, begin=dayStart)
        with self.assertRaises(ValueError):
            # dayObs supplied and a stop time is not allowed
            _ = getEfdData(self.client, self.axisTopic, dayObs=self.dayObs, end=dayEnd)
        with self.assertRaises(ValueError):
            # dayObs supplied and timespan is not allowed
            _ = getEfdData(self.client, self.axisTopic, dayObs=self.dayObs, timespan=oneDay)
        with self.assertRaises(ValueError):
            # begin alone is not allowed
            _ = getEfdData(self.client, self.axisTopic, begin=self.dayObs)
        with self.assertRaises(ValueError):
            # good query, except the topic doesn't exist
            _ = getEfdData(self.client, "badTopic", begin=dayStart, end=dayEnd)

    @vcr.use_cassette()
    def test_raiseIfTopicNotInSchema(self):
        """Check the ``raiseIfTopicNotInSchema`` switch of ``getEfdData``."""
        dayStart = getDayObsStartTime(self.dayObs)
        dayEnd = getDayObsEndTime(self.dayObs)

        badTopic = "lsst.sal.nonExistentTopic"
        # test this does not raise
        _ = getEfdData(self.client, badTopic, begin=dayStart, end=dayEnd, raiseIfTopicNotInSchema=False)

        with self.assertRaises(ValueError):
            # test this does raise, as raiseIfTopicNotInSchema defaults to True
            _ = getEfdData(self.client, badTopic, begin=dayStart, end=dayEnd)

    @vcr.use_cassette()
    def test_getMostRecentRowWithDataBefore(self):
        """Check a row is returned and that it is stamped before the query time."""
        time = Time(1687845854.736784, scale="utc", format="unix")
        rowData = getMostRecentRowWithDataBefore(
            self.client, "lsst.sal.MTM1M3.logevent_forceActuatorState", time
        )
        self.assertIsInstance(rowData, pd.Series)

        stateTime = efdTimestampToAstropy(rowData["private_efdStamp"])
        self.assertLess(stateTime, time)

    def test_efdTimestampToAstropy(self):
        """Check a float timestamp converts to an ``astropy.time.Time``."""
        time = efdTimestampToAstropy(1687845854.736784)
        self.assertIsInstance(time, astropy.time.Time)

    def test_astropyToEfdTimestamp(self):
        """Check an ``astropy.time.Time`` converts to a float timestamp."""
        time = Time(1687845854.736784, scale="utc", format="unix")
        efdTimestamp = astropyToEfdTimestamp(time)
        self.assertIsInstance(efdTimestamp, float)

    @vcr.use_cassette()
    def test_clipDataToEvent(self):
        """Check ``clipDataToEvent`` with and without pre/post padding."""
        # get 10 mins of data either side of the event we'll clip to
        duration = datetime.timedelta(seconds=10 * 60)
        queryBegin = self.event.begin - duration
        queryEnd = self.event.end + duration
        dayObsData = getEfdData(self.client, "lsst.sal.MTMount.azimuth", begin=queryBegin, end=queryEnd)

        # clip the data, and check it's shorter, non-zero, and falls in the
        # right time range
        clippedData = clipDataToEvent(dayObsData, self.event)

        self.assertIsInstance(clippedData, pd.DataFrame)
        self.assertGreater(len(clippedData), 0)
        self.assertLess(len(clippedData), len(dayObsData))

        dataStart = efdTimestampToAstropy(clippedData.iloc[0]["private_efdStamp"])
        dataEnd = efdTimestampToAstropy(clippedData.iloc[-1]["private_efdStamp"])

        self.assertGreaterEqual(dataStart, self.event.begin)
        self.assertLessEqual(dataEnd, self.event.end)

        # test the pre/post padding options
        clippedPaddedData = clipDataToEvent(dayObsData, self.event, prePadding=1, postPadding=2)
        self.assertIsInstance(clippedPaddedData, pd.DataFrame)
        self.assertGreater(len(clippedPaddedData), 0)
        self.assertLess(len(clippedPaddedData), len(dayObsData))
        self.assertGreater(len(clippedPaddedData), len(clippedData))

        paddedDataStart = efdTimestampToAstropy(clippedPaddedData.iloc[0]["private_efdStamp"])
        paddedDataEnd = efdTimestampToAstropy(clippedPaddedData.iloc[-1]["private_efdStamp"])
        self.assertLessEqual(paddedDataStart, dataStart)
        self.assertGreaterEqual(paddedDataEnd, dataEnd)

        # Get the minimum and maximum timestamps before padding
        startTimeUnpadded = clippedData["private_efdStamp"].min()
        endTimeUnpadded = clippedData["private_efdStamp"].max()

        # Get the minimum and maximum timestamps after padding
        startTimePadded = clippedPaddedData["private_efdStamp"].min()
        endTimePadded = clippedPaddedData["private_efdStamp"].max()

        # Check that the difference between the min times and max times is
        # approximately equal to the padding. Not exact as data sampling is
        # not infinite.
        self.assertAlmostEqual(startTimeUnpadded - startTimePadded, 1, delta=0.1)
        self.assertAlmostEqual(endTimePadded - endTimeUnpadded, 2, delta=0.1)

258 

259 

class TestMemory(lsst.utils.tests.MemoryTestCase):
    # Adds nothing of its own; all behaviour is inherited from
    # lsst.utils.tests.MemoryTestCase. Presumably this is the standard LSST
    # resource-leak check - confirm against the lsst.utils.tests docs.
    pass

262 

263 

def setup_module(module):
    """Call ``lsst.utils.tests.init()`` for this module.

    The name matches pytest's module-level setup hook convention; the
    ``module`` argument is not used here.
    """
    lsst.utils.tests.init()

266 

267 

if __name__ == "__main__":
    # Direct execution as a script: initialise the LSST test utilities and
    # then hand over to the unittest runner.
    lsst.utils.tests.init()
    unittest.main()