Coverage for tests / test_utils.py: 20%

235 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 11:03 +0000

1# This file is part of summit_utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22"""Test cases for utils.""" 

23 

24import copy 

25import datetime 

26import itertools 

27import unittest 

28from typing import Iterable 

29 

30import astropy.time 

31import astropy.units as u 

32import numpy as np 

33from astro_metadata_translator import makeObservationInfo 

34from astropy.coordinates import HeliocentricEclipticIAU76, SkyCoord 

35from astropy.time import Time 

36 

37import lsst.afw.detection as afwDetect 

38import lsst.afw.geom as afwGeom 

39import lsst.afw.image as afwImage 

40import lsst.geom as geom 

41import lsst.utils.tests 

42from lsst.obs.base import createInitialSkyWcsFromBoresight 

43from lsst.obs.base.makeRawVisitInfoViaObsInfo import MakeRawVisitInfoViaObsInfo 

44from lsst.obs.lsst import Latiss 

45from lsst.obs.lsst.translators.latiss import AUXTEL_LOCATION 

46from lsst.summit.utils.dateTime import getCurrentDayObsDatetime, getCurrentDayObsHumanStr, getCurrentDayObsInt 

47from lsst.summit.utils.utils import getFilterSeeingCorrection # deprecated 

48from lsst.summit.utils.utils import ( 

49 calcEclipticCoords, 

50 computeCcdExposureId, 

51 computeExposureId, 

52 fluxesFromFootprints, 

53 getAirmassSeeingCorrection, 

54 getBandpassSeeingCorrection, 

55 getExpPositionOffset, 

56 getFieldNameAndTileNumber, 

57 getQuantiles, 

58 getSunAngle, 

59 quickSmooth, 

60) 

61 

62 

63class ExpSkyPositionOffsetTestCase(lsst.utils.tests.TestCase): 

64 """A test case for testing sky position offsets for exposures.""" 

65 

66 def setUp(self): 

67 camera = Latiss.getCamera() 

68 self.assertTrue(len(camera) == 1) 

69 self.detector = camera[0] 

70 

71 self.viMaker = MakeRawVisitInfoViaObsInfo() 

72 self.mi = afwImage.MaskedImageF(0, 0) 

73 self.baseHeader = dict( 

74 boresight_airmass=1.5, 

75 temperature=15 * u.deg_C, 

76 observation_type="science", 

77 exposure_time=5 * u.ks, 

78 detector_num=32, 

79 location=AUXTEL_LOCATION, 

80 ) 

81 

82 def test_getExpPositionOffset(self): 

83 epsilon = 0.0001 

84 ra1s = [0, 45, 90] 

85 ra2s = copy.copy(ra1s) 

86 ra2s.extend([r + epsilon for r in ra1s]) 

87 ra1s = np.deg2rad(ra1s) 

88 ra2s = np.deg2rad(ra2s) 

89 

90 epsilon = 0.0001 

91 dec1s = [0, 45, 90] 

92 dec2s = copy.copy(dec1s) 

93 dec2s.extend([d + epsilon for d in dec1s[:-1]]) # skip last point as >90 not allowed for dec 

94 

95 rotAngle1 = geom.Angle(43.2, geom.degrees) # arbitrary non-zero 

96 rotAngle2 = geom.Angle(56.7, geom.degrees) 

97 

98 t1 = astropy.time.Time("2021-09-15T12:00:00", format="isot", scale="utc") 

99 t2 = astropy.time.Time("2021-09-15T12:01:00", format="isot", scale="utc") 

100 expTime = astropy.time.TimeDelta(20, format="sec") 

101 

102 header1 = copy.copy(self.baseHeader) 

103 header2 = copy.copy(self.baseHeader) 

104 header1["datetime_begin"] = astropy.time.Time(t1, format="isot", scale="utc") 

105 header2["datetime_begin"] = astropy.time.Time(t2, format="isot", scale="utc") 

106 

107 header1["datetime_end"] = astropy.time.Time(t1 + expTime, format="isot", scale="utc") 

108 header2["datetime_end"] = astropy.time.Time(t2 + expTime, format="isot", scale="utc") 

109 

110 obsInfo1 = makeObservationInfo(**header1) 

111 obsInfo2 = makeObservationInfo(**header2) 

112 

113 vi1 = self.viMaker.observationInfo2visitInfo(obsInfo1) 

114 vi2 = self.viMaker.observationInfo2visitInfo(obsInfo2) 

115 expInfo1 = afwImage.ExposureInfo() 

116 expInfo1.setVisitInfo(vi1) 

117 expInfo2 = afwImage.ExposureInfo() 

118 expInfo2.setVisitInfo(vi2) 

119 

120 for ra1, dec1, ra2, dec2 in itertools.product(ra1s, dec1s, ra2s, dec2s): 

121 pos1 = geom.SpherePoint(ra1, dec1, geom.degrees) 

122 pos2 = geom.SpherePoint(ra2, dec2, geom.degrees) 

123 

124 wcs1 = createInitialSkyWcsFromBoresight(pos1, rotAngle1, self.detector, flipX=True) 

125 wcs2 = createInitialSkyWcsFromBoresight(pos2, rotAngle2, self.detector, flipX=True) 

126 

127 exp1 = afwImage.ExposureF(self.mi, expInfo1) 

128 exp2 = afwImage.ExposureF(self.mi, expInfo2) 

129 

130 exp1.setWcs(wcs1) 

131 exp2.setWcs(wcs2) 

132 

133 result = getExpPositionOffset(exp1, exp2) 

134 

135 deltaRa = ra1 - ra2 

136 deltaDec = dec1 - dec2 

137 

138 self.assertAlmostEqual(result.deltaRa.asDegrees(), deltaRa, 6) 

139 self.assertAlmostEqual(result.deltaDec.asDegrees(), deltaDec, 6) 

140 

141 

142class MiscUtilsTestCase(lsst.utils.tests.TestCase): 

143 def setUp(self) -> None: 

144 return super().setUp() 

145 

146 def test_getFieldNameAndTileNumber(self): 

147 field, num = getFieldNameAndTileNumber("simple") 

148 self.assertEqual(field, "simple") 

149 self.assertIsNone(num) 

150 

151 field, num = getFieldNameAndTileNumber("_simple") 

152 self.assertEqual(field, "_simple") 

153 self.assertIsNone(num) 

154 

155 field, num = getFieldNameAndTileNumber("simple_321") 

156 self.assertEqual(field, "simple") 

157 self.assertEqual(num, 321) 

158 

159 field, num = getFieldNameAndTileNumber("_simple_321") 

160 self.assertEqual(field, "_simple") 

161 self.assertEqual(num, 321) 

162 

163 field, num = getFieldNameAndTileNumber("test_321a_123") 

164 self.assertEqual(field, "test_321a") 

165 self.assertEqual(num, 123) 

166 

167 field, num = getFieldNameAndTileNumber("test_321a_123_") 

168 self.assertEqual(field, "test_321a_123_") 

169 self.assertIsNone(num) 

170 

171 field, num = getFieldNameAndTileNumber("test_321a_123a") 

172 self.assertEqual(field, "test_321a_123a") 

173 self.assertIsNone(num) 

174 

175 field, num = getFieldNameAndTileNumber("test_321a:asd_asd-dsa_321") 

176 self.assertEqual(field, "test_321a:asd_asd-dsa") 

177 self.assertEqual(num, 321) 

178 

179 def test_getAirmassSeeingCorrection(self): 

180 for airmass in (1.1, 2.0, 20.0): 

181 correction = getAirmassSeeingCorrection(airmass) 

182 self.assertGreater(correction, 0.01) 

183 self.assertLess(correction, 1.0) 

184 

185 correction = getAirmassSeeingCorrection(1) 

186 self.assertEqual(correction, 1.0) 

187 

188 with self.assertRaises(ValueError): 

189 getAirmassSeeingCorrection(0.5) 

190 

191 def test_getFilterSeeingCorrection(self): 

192 for filterName in ("SDSSg_65mm", "SDSSr_65mm", "SDSSi_65mm"): 

193 with self.assertWarns(FutureWarning): 

194 correction = getFilterSeeingCorrection(filterName) 

195 self.assertGreater(correction, 0.5) 

196 self.assertLess(correction, 1.5) 

197 

198 def test_getBandpassSeeingCorrection(self): 

199 for filterName in ( 

200 "SDSSg_65mm", 

201 "SDSSr_65mm", 

202 "SDSSi_65mm", 

203 "u_02", 

204 "g_01", 

205 "r_03", 

206 "i_06", 

207 "z_03", 

208 "y_04", 

209 ): 

210 correction = getBandpassSeeingCorrection(filterName) 

211 self.assertGreater(correction, 0.5) 

212 self.assertLess(correction, 1.5) 

213 

214 def test_quickSmooth(self): 

215 # just test that it runs and returns the right shape. It's a wrapper on 

216 # scipy.ndimage.gaussian_filter we can trust that it does what it 

217 # should, and we just test the interface hasn't bitrotted on either end 

218 data = np.zeros((100, 100), dtype=np.float32) 

219 data = quickSmooth(data, 5.0) 

220 self.assertEqual(data.shape, (100, 100)) 

221 

222 def test_getCurrentDayObsDatetime(self): 

223 """Just a type check and a basic sanity check on the range. 

224 

225 Setting days=3 as the tolerance just because of timezones and who knows 

226 what really. 

227 """ 

228 dt = getCurrentDayObsDatetime() 

229 self.assertIsInstance(dt, datetime.date) 

230 self.assertLess(dt, datetime.date.today() + datetime.timedelta(days=3)) 

231 self.assertGreater(dt, datetime.date.today() - datetime.timedelta(days=3)) 

232 

233 def test_getCurrentDayObsInt(self): 

234 """Just a type check and a basic sanity check on the range.""" 

235 dayObs = getCurrentDayObsInt() 

236 self.assertIsInstance(dayObs, int) 

237 self.assertLess(dayObs, 21000101) 

238 self.assertGreater(dayObs, 19700101) 

239 

240 def test_getCurrentDayObsHumanStr(self): 

241 """Just a basic formatting check.""" 

242 dateStr = getCurrentDayObsHumanStr() 

243 self.assertIsInstance(dateStr, str) 

244 self.assertEqual(len(dateStr), 10) 

245 self.assertRegex(dateStr, r"\d{4}-\d{2}-\d{2}") 

246 

247 def test_getSunAngle(self): 

248 """Just a basic sanity check on the range.""" 

249 testTime = Time("2021-09-15T16:00:00", format="isot", scale="utc") 

250 sunAngle = getSunAngle(testTime) 

251 self.assertIsInstance(sunAngle, float) 

252 self.assertLess(sunAngle, 90.01) 

253 self.assertGreater(sunAngle, -90.01) 

254 

255 sunAngle = getSunAngle() 

256 self.assertIsInstance(sunAngle, float) 

257 self.assertLess(sunAngle, 90.01) 

258 self.assertGreater(sunAngle, -90.01) 

259 

260 

261class QuantileTestCase(lsst.utils.tests.TestCase): 

262 def setUp(self) -> None: 

263 return super().setUp() 

264 

265 def test_quantiles(self): 

266 # We understand that our algorithm gives very large rounding error 

267 # compared to the generic numpy method. But still test it. 

268 np.random.seed(1234) 

269 dataRanges = [(50, 1, -1), (100_000, 5_000, -2), (5_000_000, 10_000, -2), (50_000, 100_000, -3)] 

270 colorRanges = [2, 256, 999] # [very few, nominal, lots and an odd number] 

271 for nColors, (mean, width, decimal) in itertools.product(colorRanges, dataRanges): 

272 data = np.random.normal(mean, width, (100, 100)) 

273 data[10, 10] = np.nan # check we're still nan-safe 

274 if np.nanmax(data) - np.nanmin(data) > 300_000: 

275 with self.assertLogs(level="WARNING") as cm: 

276 edges1 = getQuantiles(data, nColors) 

277 self.assertIn("Data range is very large", cm.output[0]) 

278 else: 

279 with self.assertNoLogs(level="WARNING") as cm: 

280 edges1 = getQuantiles(data, nColors) 

281 edges2 = np.nanquantile(data, np.linspace(0, 1, nColors + 1)) # must check with nanquantile 

282 np.testing.assert_almost_equal(edges1, edges2, decimal=decimal) 

283 

284 

285class ImageBasedTestCase(lsst.utils.tests.TestCase): 

286 def test_fluxFromFootprint(self): 

287 image = afwImage.Image( 

288 np.arange(8100, dtype=np.int32).reshape(90, 90), xy0=lsst.geom.Point2I(10, 12), dtype="I" 

289 ) 

290 

291 radius = 3 

292 spans = afwGeom.SpanSet.fromShape(radius, afwGeom.Stencil.CIRCLE, offset=(27, 30)) 

293 footprint1 = afwDetect.Footprint(spans) 

294 

295 # The extracted footprint should be the same as the product of the 

296 # spans and the overlapped bow with the image 

297 truth1 = spans.asArray() * image.array[15:22, 14:21] 

298 

299 radius = 3 

300 spans = afwGeom.SpanSet.fromShape(radius, afwGeom.Stencil.CIRCLE, offset=(44, 49)) 

301 footprint2 = afwDetect.Footprint(spans) 

302 truth2 = spans.asArray() * image.array[34:41, 31:38] 

303 

304 allFootprints = [footprint1, footprint2] 

305 footprintSet = afwDetect.FootprintSet(image.getBBox()) 

306 footprintSet.setFootprints(allFootprints) 

307 

308 # check it can accept a footprintSet, and single and iterables of 

309 # footprints 

310 with self.assertRaises(TypeError): 

311 fluxesFromFootprints(10, image) 

312 

313 with self.assertRaises(TypeError): 

314 fluxesFromFootprints([8, 6, 7, 5, 3, 0, 9], image) 

315 

316 # check the footPrintSet 

317 fluxes = fluxesFromFootprints(footprintSet, image) 

318 expectedLength = len(footprintSet.getFootprints()) 

319 self.assertEqual(len(fluxes), expectedLength) # always one flux per footprint 

320 self.assertIsInstance(fluxes, Iterable) 

321 self.assertAlmostEqual(fluxes[0], np.sum(truth1)) 

322 self.assertAlmostEqual(fluxes[1], np.sum(truth2)) 

323 

324 # check the list of footprints 

325 fluxes = fluxesFromFootprints(allFootprints, image) 

326 expectedLength = 2 

327 self.assertEqual(len(fluxes), expectedLength) # always one flux per footprint 

328 self.assertIsInstance(fluxes, Iterable) 

329 self.assertAlmostEqual(fluxes[0], np.sum(truth1)) 

330 self.assertAlmostEqual(fluxes[1], np.sum(truth2)) 

331 

332 # ensure that subtracting the image median from fluxes leave image 

333 # pixels untouched 

334 oldImageArray = copy.deepcopy(image.array) 

335 fluxes = fluxesFromFootprints(footprintSet, image, subtractImageMedian=True) 

336 np.testing.assert_array_equal(image.array, oldImageArray) 

337 

338 

339class IdTestCase(lsst.utils.tests.TestCase): 

340 def test_exposure_id(self): 

341 self.assertEqual(computeExposureId("latiss", "O", 20240402, 35), 2024040200035) 

342 self.assertEqual(computeExposureId("LATISS", "C", 20240402, 35), 2024040200035) 

343 self.assertEqual(computeExposureId("LSSTComCamSim", "S", 20240402, 35), 7024040200035) 

344 with self.assertRaises(ValueError): 

345 computeExposureId("bad_instrument", "O", 20240402, 35) 

346 

347 def test_ccdexposure_id(self): 

348 self.assertEqual(computeCcdExposureId("latiss", 2024040200035, 0), 5205 * (2**23) + 35 * 256 + 0) 

349 with self.assertRaises(ValueError): 

350 computeCcdExposureId("latiss", 20240402000035, 1) 

351 with self.assertRaises(ValueError): 

352 computeCcdExposureId("LsstComCam", 20240402000035, 9) 

353 with self.assertRaises(ValueError): 

354 computeCcdExposureId("LsstCam", 20240402000035, 205) 

355 self.assertEqual( 

356 computeCcdExposureId("LSSTComCamSim", 7024040200035, 2), 

357 5 * (2**37) + 5205 * (2**23) + 35 * 256 + 2, 

358 ) 

359 

360 def test_calcEclipticCoords(self): 

361 ras = [0, 1, 45, 90, 180, 270, 359.9] 

362 decs = [-90, -80, -45, -1, 0, 1, 45, 90] 

363 

364 for ra, dec in itertools.product(ras, decs): 

365 p = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, distance=1 * u.au, frame="hcrs") 

366 eclCoords = p.transform_to(HeliocentricEclipticIAU76) 

367 _lambda, beta = calcEclipticCoords(ra, dec) 

368 self.assertAlmostEqual(eclCoords.lat.deg, beta, 6) 

369 self.assertAlmostEqual(eclCoords.lon.deg, _lambda, 6) 

370 

371 

372class TestMemory(lsst.utils.tests.MemoryTestCase): 

373 pass 

374 

375 

376def setup_module(module): 

377 lsst.utils.tests.init() 

378 

379 

380if __name__ == "__main__": 380 ↛ 381line 380 didn't jump to line 381 because the condition on line 380 was never true

381 lsst.utils.tests.init() 

382 unittest.main()