Coverage for tests / test_utils.py: 28%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-06 08:33 +0000

1# This file is part of obs_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import unittest 

23from datetime import datetime 

24 

25import astropy.table 

26import numpy as np 

27 

28import lsst.geom as geom 

29import lsst.obs.base as obsBase 

30import lsst.utils.tests 

31from lsst.daf.base import PropertyList 

32from lsst.obs.base.utils import TableVStack, _store_str_header 

33from lsst.pipe.base._dataset_handle import InMemoryDatasetHandle 

34 

35 

36class BboxFromIrafTestCase(lsst.utils.tests.TestCase): 

37 """Demonstrate that we can correctly parse IRAF-style BBOXes.""" 

38 

39 def testValid(self): 

40 test_data = { 

41 "[1:1084,1:1024]": geom.Box2I(geom.PointI(0, 0), geom.PointI(1083, 1023)), 

42 "[0:0,0:0]": geom.Box2I(geom.PointI(-1, -1), geom.PointI(-1, -1)), 

43 } 

44 for val, result in test_data.items(): 

45 self.assertEqual(obsBase.bboxFromIraf(val), result) 

46 

47 def testInvalid(self): 

48 test_data = { 

49 "1:1084,1:1024": RuntimeError, 

50 "(1:1084,1:1024)": RuntimeError, 

51 ("1:1084", "1:1024"): TypeError, 

52 } 

53 for val, err in test_data.items(): 

54 self.assertRaises(err, obsBase.bboxFromIraf, val) 

55 

56 

57class TestProvenanceAdd(unittest.TestCase): 

58 """Tests relating to provenance infrastructure.""" 

59 

60 def test_truncation(self): 

61 """Test that long headers can be truncated.""" 

62 pl = PropertyList() 

63 

64 _store_str_header(pl, "LSST BUTLER RUN", "short") 

65 self.assertEqual(pl["LSST BUTLER RUN"], "short") 

66 

67 _store_str_header(pl, "LSST BUTLER RUN", "short", allow_long_headers=False) 

68 self.assertEqual(pl["LSST BUTLER RUN"], "short") 

69 

70 long = "a123456789b123456789c123456789d123456789e123456789f123456789" 

71 _store_str_header(pl, "LSST BUTLER INPUT 0 RUN", long) 

72 self.assertEqual(pl["LSST BUTLER INPUT 0 RUN"], long) 

73 

74 _store_str_header(pl, "LSST BUTLER INPUT 1 RUN", long, allow_long_headers=False) 

75 self.assertEqual(pl["LSST BUTLER INPUT 1 RUN"], "a123456789b123456789...e123456789f123456789") 

76 

77 key = "LSSTX BUTLER VERY LONG KEYWORD THAT IS AT THE LIMIT OF ALL" 

78 _store_str_header(pl, key, "abc", allow_long_headers=False) 

79 self.assertEqual(pl[key], "abc") 

80 _store_str_header(pl, key, "abcdefghi", allow_long_headers=False) 

81 self.assertEqual(pl[key], "ab...ghi") 

82 

83 with self.assertRaises(ValueError): 

84 _store_str_header(pl, key + "0", "abcdef", allow_long_headers=False) 

85 

86 

87class TestCalibDates(unittest.TestCase): 

88 """Tests relating to curated calibration dates.""" 

89 

90 def test_calib_dates(self): 

91 """Test file root creation and parsing.""" 

92 for valid_start, file_root, standard_iso in ( 

93 ("2025-04-30T12:25", "20250430T122500", "2025-04-30T12:25:00"), 

94 ("2025-04-30 12:25:50", "20250430T122550", "2025-04-30T12:25:50"), 

95 ("2025-04-30 12:25:50.123", "20250430T122550", "2025-04-30T12:25:50"), 

96 ("1970-01-01", "19700101T000000", "1970-01-01T00:00:00"), 

97 ("20100501T1223", "20100501T122300", "2010-05-01T12:23:00"), 

98 ): 

99 derived = obsBase.iso_date_to_curated_calib_file_root(valid_start) 

100 self.assertEqual(derived, file_root) 

101 self.assertEqual(datetime.fromisoformat(derived).isoformat(), standard_iso) 

102 # We know that fromisoformat is used internally by read_one_calib 

103 # so have explicit test here to make sure the date formats we 

104 # expect will work. 

105 self.assertEqual(datetime.fromisoformat(valid_start).isoformat(timespec="seconds"), standard_iso) 

106 

107 

108class TableVStackTestCase(unittest.TestCase): 

109 """Unit tests for lsst.pipe.base.utils.TableVStack.""" 

110 

111 def setUp(self): 

112 self.len_table1 = 5 

113 self.len_table2 = 6 

114 self.table1 = astropy.table.Table( 

115 { 

116 "x": np.arange(self.len_table1), 

117 "y": np.ma.masked_array( 

118 np.arange(1.0, 1.0 + self.len_table1), mask=np.arange(self.len_table1) < 3 

119 ), 

120 } 

121 ) 

122 self.table2 = astropy.table.Table( 

123 { 

124 "x": np.arange(self.len_table2), 

125 "y": np.ma.masked_array( 

126 np.arange(-2.0, -2.0 + self.len_table2), 

127 mask=(np.arange(self.len_table2) % 2) == 0, 

128 ), 

129 } 

130 ) 

131 self.extra_values1 = {"z": -self.table1["x"]} 

132 self.extra_values2 = {"z": -self.table2["x"]} 

133 

134 def test_vstack(self): 

135 handles = ( 

136 InMemoryDatasetHandle(self.table1, storageClass="ArrowAstropy"), 

137 InMemoryDatasetHandle(self.table2, storageClass="ArrowAstropy"), 

138 ) 

139 u_stack = TableVStack.vstack_handles( 

140 handles=handles, 

141 extra_values={ 

142 idx: extra_values for idx, extra_values in enumerate((self.extra_values1, self.extra_values2)) 

143 }, 

144 ) 

145 ap_stack = astropy.table.vstack((self.table1, self.table2)) 

146 self.assertEqual(len(u_stack), self.len_table1 + self.len_table2) 

147 self.assertEqual(len(u_stack), len(ap_stack)) 

148 

149 ap_stack["z"] = np.concatenate((self.extra_values1["z"], self.extra_values2["z"])) 

150 self.assertEqual(u_stack.colnames, ap_stack.colnames) 

151 for colname in u_stack.colnames: 

152 col_u = u_stack[colname] 

153 col_ap = ap_stack[colname] 

154 if (mask := getattr(col_u, "mask", None)) is not None: 

155 np.testing.assert_array_equal(mask, col_ap.mask) 

156 col_u = col_u.data 

157 col_ap = col_ap.data 

158 np.testing.assert_array_equal(col_u, col_ap) 

159 

160 

161class MemoryTester(lsst.utils.tests.MemoryTestCase): 

162 """Test for file leaks.""" 

163 

164 

165def setup_module(module): 

166 """Initialize pytest.""" 

167 lsst.utils.tests.init() 

168 

169 

170if __name__ == "__main__": 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true

171 lsst.utils.tests.init() 

172 unittest.main()