Coverage for tests / test_iteration.py: 28%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 08:37 +0000

1# This file is part of utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import itertools 

23import unittest 

24 

25from lsst.utils.iteration import chunk_iterable, ensure_iterable, isplit, sequence_to_string 

26 

27 

class IterationTestCase(unittest.TestCase):
    """Tests for the helpers imported from `lsst.utils.iteration`.

    Exercises `ensure_iterable`, `chunk_iterable`, `isplit` and
    `sequence_to_string`.
    """

    def testIterable(self):
        """Check `ensure_iterable` on scalars, strings, mappings and
        collections.
        """
        test_data = (
            # The boolean records whether the value is expected to be
            # iterated over element by element (True) or to come back
            # as a single item (False).
            (0, False),
            ("hello", False),
            ({1: 2, 3: 4}, False),
            ([0, 1, 2], True),
            (["hello", "world"], True),
            ({"a", "b", "c"}, True),
        )

        for data, is_iterable in test_data:
            # subTest reports which input failed and lets the remaining
            # inputs still be checked.
            with self.subTest(data=data):
                iterator = ensure_iterable(data)
                # Single items are compared against a one-element list.
                expected = data if is_iterable else [data]
                # zip_longest pads the shorter side with None, so a length
                # mismatch shows up as a failed comparison.
                for original, from_iterable in itertools.zip_longest(expected, iterator):
                    self.assertEqual(original, from_iterable)

    def testChunking(self):
        """Chunk iterables."""
        simple = list(range(101))
        out = []
        n_chunks = 0
        for chunk in chunk_iterable(simple, chunk_size=10):
            out.extend(chunk)
            n_chunks += 1
        # Concatenating the chunks must reproduce the input in order.
        self.assertEqual(out, simple)
        # 101 items in chunks of 10: ten full chunks plus one remainder.
        self.assertEqual(n_chunks, 11)

        test_dict = dict.fromkeys(range(101), 1)
        n_chunks = 0
        for chunk in chunk_iterable(test_dict, chunk_size=45):
            # Subtract 1 for each key in chunk
            for k in chunk:
                test_dict[k] -= 1
            n_chunks += 1
        # Should have matched every key
        self.assertEqual(sum(test_dict.values()), 0)
        # 101 keys in chunks of 45: 45 + 45 + 11.
        self.assertEqual(n_chunks, 3)

    def testIsplit(self):
        """Check that `isplit` agrees with ``split`` for `str` and `bytes`."""
        # Test compatibility with str.split
        seps = ("\n", " ", "d")
        input_str = "ab\ncd ef\n"

        for sep in seps:
            for text in (input_str, input_str.encode()):
                # The separator must be the same type as the text.
                test_sep = sep.encode() if isinstance(text, bytes) else sep
                with self.subTest(text=text, sep=test_sep):
                    isp = list(isplit(text, sep=test_sep))
                    ssp = text.split(test_sep)
                    self.assertEqual(isp, ssp)

    def test_empty_list(self):
        """Test that an empty list returns an empty string."""
        self.assertEqual(sequence_to_string([]), "")

    def test_single_element(self):
        """Test a list with a single element returns it as a string."""
        self.assertEqual(sequence_to_string([5]), "5")

    def test_consecutive_numbers(self):
        """Test a list of consecutive numbers with stride 1."""
        self.assertEqual(sequence_to_string([1, 2, 3, 4, 5]), "1..5")

    def test_non_consecutive_numbers(self):
        """Test a list of non-consecutive numbers."""
        self.assertEqual(sequence_to_string([1, 3, 5, 7]), "1..7:2")

    def test_mixed_sequences(self):
        """Test a list with both consecutive and non-consecutive numbers."""
        self.assertEqual(sequence_to_string([1, 2, 3, 5, 7, 8, 9]), "1..3^5^7..9")

    def test_stride_greater_than_one(self):
        """Test sequences where the stride is greater than one."""
        self.assertEqual(sequence_to_string([2, 4, 6, 8]), "2..8:2")

    def test_negative_numbers(self):
        """Test a list that includes negative numbers."""
        self.assertEqual(sequence_to_string([-5, -4, -3, -1, 0, 1]), "-5..-3^-1..1")

    def test_duplicate_numbers(self):
        """Test a list with duplicate numbers."""
        self.assertEqual(sequence_to_string([1, 2, 2, 3, 4]), "1..4")

    def test_strings_with_common_prefix(self):
        """Test a list of strings with a common prefix."""
        self.assertEqual(sequence_to_string(["node1", "node2", "node3"]), "node1..node3")

    def test_strings_without_common_prefix(self):
        """Test a list of strings without a common prefix."""
        self.assertEqual(sequence_to_string(["alpha", "beta", "gamma"]), "alpha^beta^gamma")

    def test_large_sequence(self):
        """Test a large sequence of consecutive numbers."""
        large_list = list(range(1, 101))
        self.assertEqual(sequence_to_string(large_list), "1..100")

    def test_mixed_types(self):
        """Test a list with mixed data types."""
        with self.assertRaises(TypeError):
            sequence_to_string([1, "2", 3])

    def test_wrong_types(self):
        """Test that passing floats raises an appropriate exception."""
        with self.assertRaises(TypeError):
            sequence_to_string([1, 2, 3, 1.2])

    def test_stride_calculation(self):
        """Test that the stride is correctly calculated for single strides."""
        self.assertEqual(sequence_to_string([1, 3, 5, 7, 9]), "1..9:2")

    def test_single_value_sequences(self):
        """Test sequences that should remain as single values."""
        self.assertEqual(sequence_to_string([1, 3, 6, 10]), "1^3^6^10")

    def test_overlapping_ranges(self):
        """Test a list with several duplicate values."""
        self.assertEqual(sequence_to_string([1, 2, 2, 3, 4, 4, 5]), "1..5")

    def test_unordered_input(self):
        """Test that the function correctly handles unordered input."""
        self.assertEqual(sequence_to_string([5, 3, 1, 4, 2]), "1..5")

    def test_large_stride(self):
        """Test sequences with a large stride."""
        self.assertEqual(sequence_to_string([10, 20, 30, 40]), "10..40:10")

    def test_single_character_strings(self):
        """Test a list of single-character strings."""
        self.assertEqual(sequence_to_string(["a", "b", "c"]), "a^b^c")

    def test_strings_with_prefix(self):
        """Test prefixed strings whose numbering has a gap."""
        self.assertEqual(
            sequence_to_string(
                [
                    "node1",
                    "node2",
                    "node3",
                    "node5",
                ]
            ),
            "node1..node3^node5",
        )

178 

179 

if __name__ == "__main__":
    # Run every test case in this module when it is executed as a script.
    unittest.main()