Coverage for tests / test_groups.py: 23%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:50 +0000

1# This file is part of astro_metadata_translator. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the LICENSE file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12import math 

13import os.path 

14import unittest 

15from collections.abc import MutableMapping, Sequence 

16from typing import Any 

17 

18from pydantic import BaseModel, ConfigDict 

19 

20from astro_metadata_translator import ObservationGroup, ObservationInfo 

21from astro_metadata_translator.serialize import group_to_fits, info_to_fits 

22from astro_metadata_translator.tests import read_test_file 

23 

24TESTDIR = os.path.abspath(os.path.dirname(__file__)) 

25 

26 

27class ModelWithObsGroup(BaseModel): 

28 """Pydantic model that contains an ObservationGroup.""" 

29 

30 model_config = ConfigDict( 

31 ser_json_inf_nan="constants", # Pydantic does not preserve NaN support from child models. 

32 ) 

33 

34 obs_group: ObservationGroup 

35 number: int 

36 

37 

38class ObservationGroupTestCase(unittest.TestCase): 

39 """Test grouping of observations.""" 

40 

41 datadir = os.path.join(TESTDIR, "data") 

42 

43 def setUp(self) -> None: 

44 self.decam_files = ( 

45 "fitsheader-decam.yaml", 

46 "fitsheader-decam-0160496.yaml", 

47 "fitsheader-decam-calexp-0412037_10.yaml", 

48 ) 

49 self.hsc_files = ("fitsheader-hsc-HSCA04090107.yaml", "fitsheader-hsc.yaml") 

50 

51 def _files_to_headers(self, files: Sequence[str]) -> Sequence[MutableMapping[str, Any]]: 

52 return [read_test_file(os.path.join(self.datadir, f)) for f in files] 

53 

54 def test_groups(self) -> None: 

55 headers = self._files_to_headers(self.decam_files) 

56 

57 obs_group = ObservationGroup(headers) 

58 self.assertEqual(len(obs_group), 3) 

59 self.assertEqual( 

60 str(obs_group), 

61 "[(DECam, 2013-09-01T06:02:55.754), (DECam, 2012-12-11T22:06:32.859)," 

62 " (DECam, 2015-02-20T00:47:21.127)]", 

63 ) 

64 

65 # Ensure that pedantic conversion is allowed. 

66 obs_group = ObservationGroup(headers, pedantic=True) 

67 

68 sorted_group = ObservationGroup(sorted(obs_group)) 

69 self.assertIsInstance(sorted_group, ObservationGroup) 

70 self.assertEqual(len(sorted_group), 3) 

71 self.assertEqual(sorted_group[0], obs_group[1]) 

72 

73 self.assertNotEqual(obs_group, sorted_group) 

74 obs_group.sort() 

75 self.assertEqual(obs_group, sorted_group) 

76 obs_group.reverse() 

77 self.assertEqual(obs_group[0], sorted_group[-1]) 

78 

79 newest = obs_group.newest() 

80 oldest = obs_group.oldest() 

81 self.assertEqual(newest, sorted_group[-1]) 

82 self.assertEqual(oldest, sorted_group[0]) 

83 

84 self.assertLess(oldest, newest) 

85 self.assertGreater(newest, oldest) 

86 

87 self.assertNotEqual(oldest, obs_group) 

88 self.assertNotEqual(obs_group, oldest) 

89 

90 # Add some headers and check that sorting still works 

91 obs_group.extend(self._files_to_headers(self.hsc_files)) 

92 self.assertEqual(len(obs_group), 5) 

93 self.assertEqual(obs_group.newest(), obs_group[3]) 

94 

95 instruments = obs_group.property_values("instrument") 

96 self.assertEqual(instruments, {"HSC", "DECam"}) 

97 

98 # Check that simplified form round trips 

99 self.assertEqual(ObservationGroup.from_simple(obs_group.to_simple()), obs_group) 

100 

101 # Delete some entries. 

102 first = obs_group.pop() 

103 self.assertIsInstance(first, ObservationInfo) 

104 self.assertEqual(len(obs_group), 4) 

105 

106 # Override an entry with type coercion. 

107 obs_group[0] = headers[-1] 

108 self.assertIsInstance(obs_group[0], ObservationInfo) 

109 

110 with self.assertRaises(ValueError): 

111 # None is not allowed here. 

112 ObservationGroup([None, *headers]) 

113 

114 with self.assertRaises(ValueError): 

115 # Passing in a bad translator class. 

116 ObservationGroup(headers, translator_class=[]) 

117 

118 def test_sort_key(self) -> None: 

119 """Sorting with keys.""" 

120 headers = self._files_to_headers(self.decam_files) 

121 

122 obs_group = ObservationGroup(headers) 

123 

124 # Force sorting. 

125 obs_group.sort() 

126 self.assertIsNotNone(obs_group._sorted) 

127 

128 def by_airmass(value: ObservationInfo) -> float: 

129 am = value.boresight_airmass 

130 if am is None: 

131 return 0.0 

132 return am 

133 

134 obs_group.sort(key=by_airmass, reverse=True) 

135 self.assertEqual(obs_group[0].boresight_airmass, 1.67) 

136 

137 def test_fits_group(self) -> None: 

138 headers = self._files_to_headers(self.decam_files) 

139 

140 obs_group = ObservationGroup(headers) 

141 cards, comments = group_to_fits(obs_group) 

142 

143 expected = { 

144 "INSTRUME": "DECam", 

145 "TIMESYS": "TAI", 

146 "DATE-OBS": "2012-12-11T22:07:07.859", 

147 "MJD-OBS": 56272.92161874134, 

148 "DATE-BEG": "2012-12-11T22:07:07.859", 

149 "MJD-BEG": 56272.92161874134, 

150 "DATE-END": "2015-02-20T00:50:11.000", 

151 "MJD-END": 57073.034849537034, 

152 "DATE-AVG": "2014-01-15T23:28:39.430", 

153 "MJD-AVG": 56672.97823413919, 

154 } 

155 self.assertEqual(cards, expected) 

156 

157 def test_fits_info(self) -> None: 

158 header = self._files_to_headers(self.decam_files)[0] 

159 obs_group = ObservationGroup([header]) 

160 cards, comments = info_to_fits(obs_group[0]) 

161 

162 expected = { 

163 "INSTRUME": "DECam", 

164 "TIMESYS": "TAI", 

165 "MJD-AVG": 56536.25417681625, 

166 "MJD-END": 56536.25591435185, 

167 "MJD-OBS": 56536.25243928065, 

168 "MJD-BEG": 56536.25243928065, 

169 "DATE-OBS": "2013-09-01T06:03:30.754", 

170 "DATE-BEG": "2013-09-01T06:03:30.754", 

171 "DATE-AVG": "2013-09-01T06:06:00.877", 

172 "DATE-END": "2013-09-01T06:08:31.000", 

173 } 

174 self.assertEqual(cards, expected) 

175 

176 def test_pydantic_roundtrip(self) -> None: 

177 """Test that ObservationGroup can round trip using Pydantic APIs.""" 

178 headers = self._files_to_headers(self.decam_files) 

179 

180 obsinfo = ObservationInfo(detector_name="detA", boresight_airmass=math.nan) 

181 headers.append(obsinfo) 

182 

183 obs_group = ObservationGroup(headers) 

184 j_str = obs_group.model_dump_json() 

185 new_group = ObservationGroup.model_validate_json(j_str) 

186 self.assertEqual(new_group, obs_group) 

187 self.assertTrue(math.isnan(new_group[-1].boresight_airmass)) 

188 

189 # Now try a model containing a group. 

190 test_model = ModelWithObsGroup(obs_group=new_group, number=42) 

191 j_str = test_model.model_dump_json() 

192 new_model = ModelWithObsGroup.model_validate_json(j_str) 

193 self.assertEqual(new_model, test_model) 

194 self.assertTrue(math.isnan(new_model.obs_group[-1].boresight_airmass)) 

195 

196 def test_iteration_semantics(self) -> None: 

197 """Test that iterating the group yields ObservationInfo members.""" 

198 headers = self._files_to_headers(self.decam_files) 

199 obs_group = ObservationGroup(headers) 

200 self.assertEqual(list(obs_group), obs_group._members) 

201 

202 

203if __name__ == "__main__": 

204 unittest.main()