Coverage for python / lsst / utils / iteration.py: 8%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:31 +0000

1# This file is part of utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11# 

12 

13"""Utilities relating to iterators.""" 

14 

15from __future__ import annotations 

16 

17__all__ = ["chunk_iterable", "ensure_iterable", "isplit", "sequence_to_string"] 

18 

19import itertools 

20from collections.abc import Iterable, Iterator, Mapping, Sequence 

21from typing import Any, TypeGuard, TypeVar 

22 

23 

def chunk_iterable(data: Iterable[Any], chunk_size: int = 1_000) -> Iterator[tuple[Any, ...]]:
    """Split an iterable into consecutive tuples of bounded length.

    Parameters
    ----------
    data : `~collections.abc.Iterable` [ `typing.Any` ]
        The iterable to be chunked. If a mapping is given, iterating it
        produces its keys, so the chunks contain keys.
    chunk_size : `int`, optional
        Maximum number of elements per chunk. The final chunk is shorter
        when the input is not an exact multiple of this size. Defaults to
        1_000.

    Yields
    ------
    chunk : `tuple`
        The next group of elements. A `tuple` is used rather than an
        iterator so that the caller can size and index it and can easily
        tell whether it is empty.
    """
    source = iter(data)
    while True:
        # Pull at most ``chunk_size`` items from the shared iterator.
        batch = tuple(itertools.islice(source, chunk_size))
        if not batch:
            # An empty batch means the input is exhausted.
            return
        yield batch

46 

47 

def ensure_iterable(a: Any) -> Iterable[Any]:
    """Ensure that the input is iterable.

    There are multiple cases, when the input is:

    - iterable, but not a `str` or Mapping -> iterate over elements
      (e.g. ``[i for i in a]``)
    - a `str` -> return single element iterable (e.g. ``[a]``)
    - a Mapping -> return single element iterable
    - not iterable -> return single element iterable (e.g. ``[a]``).

    Parameters
    ----------
    a : `~collections.abc.Iterable` or `str` or not iterable
        Argument to be converted to an iterable.

    Returns
    -------
    i : `~collections.abc.Iterable`
        Iterable version of the input value. This is a generator, so the
        input is only inspected when iteration starts.

    Notes
    -----
    Only the failure to obtain an iterator from ``a`` is treated as "not
    iterable". An exception raised while the elements of ``a`` are being
    produced propagates to the caller rather than being swallowed.
    """
    # Strings and mappings are technically iterable but are treated as
    # single values.
    if isinstance(a, (str, Mapping)):
        yield a
        return
    # Guard only the iter() call: wrapping the whole ``yield from`` would
    # hide errors raised part-way through iteration and then yield the
    # container itself after some of its elements.
    try:
        iterator = iter(a)
    except TypeError:
        yield a
        return
    yield from iterator

79 

80 

# Constrained type variable: ``isplit`` works on `str` or on `bytes`, and the
# separator and the yielded pieces have the same type as the input.
T = TypeVar("T", str, bytes)


def isplit(string: T, sep: T) -> Iterator[T]:
    """Split a string or bytes by separator returning a generator.

    Parameters
    ----------
    string : `str` or `bytes`
        The string to split into substrings.
    sep : `str` or `bytes`
        The separator to use to split the string. Must be the same
        type as ``string``. Must always be given and must not be empty.
        May be longer than one character.

    Yields
    ------
    subset : `str` or `bytes`
        The next subset extracted from the input until the next separator.
        The separator itself is never included.

    Raises
    ------
    TypeError
        Raised if ``string`` and ``sep`` are not of the same type.
    ValueError
        Raised if ``sep`` is empty.

    Notes
    -----
    This is a generator, so the argument checks run when iteration starts,
    not when the function is called.
    """
    if type(string) is not type(sep):
        raise TypeError(f"String and separator types must match ({type(string)} != {type(sep)})")
    if not sep:
        # An empty separator matches at every position and would never
        # advance the search; reject it as ``str.split`` does.
        raise ValueError("empty separator")
    # Skip the whole separator after each match, not just one character.
    step = len(sep)
    begin = 0
    while True:
        end = string.find(sep, begin)
        if end == -1:
            # No more separators: the remainder is the final piece.
            yield string[begin:]
            return
        yield string[begin:end]
        begin = end + step

110 

111 

def _extract_numeric_suffix(s: str) -> tuple[str, int | None]:
    """Extract the numeric suffix from a string.

    Returns the prefix and the numeric suffix as an integer, if present.

    For example:

        'node1' -> ('node', 1)
        'node' -> ('node', None)
        'node123abc' -> ('node123abc', None)

    Parameters
    ----------
    s : `str`
        The string to extract the numeric suffix from.

    Returns
    -------
    prefix : `str`
        The part of the string before the trailing digits. If there are no
        trailing digits this is the whole input string.
    suffix : `int` or `None`
        The trailing digits converted to an integer, or `None` if the
        string does not end in a digit.
    """
    index = len(s)
    # Walk backwards over the trailing digits. ``str.isdecimal`` is used
    # rather than ``str.isdigit`` because the latter also accepts characters
    # such as superscript digits that ``int`` refuses to parse.
    while index > 0 and s[index - 1].isdecimal():
        index -= 1
    suffix = s[index:]
    if not suffix:
        return s, None
    return s[:index], int(suffix)

142 

143 

def _add_pair_to_name(val_name: list[str], val0: int | str, val1: int | str, stride: int = 1) -> None:
    """Append a compact label for the run ``val0`` .. ``val1`` to
    ``val_name``.

    The label takes one of three forms:

    - a single value, when the two ends are the same;
    - ``first..last`` (followed by ``:stride`` when the stride is greater
      than one), when the two ends span a range;
    - ``first^last``, when the two ends are simply listed side by side.

    Two strings span a range when they share a prefix and both end in a
    number. Two integers span a range unless they are exactly one stride
    apart, in which case they are listed side by side.

    Parameters
    ----------
    val_name : `list` [ `str` ]
        The list that receives the formatted label. Modified in place.
    val0 : `int` or `str`
        The first value of the run.
    val1 : `int` or `str`
        The last value of the run.
    stride : `int`, optional
        The step between consecutive members of the run. Defaults to 1.
    """
    if isinstance(val0, str) and isinstance(val1, str):
        stem0, number0 = _extract_numeric_suffix(val0)
        stem1, number1 = _extract_numeric_suffix(val1)
        numbered = stem0 == stem1 and number0 is not None and number1 is not None
        if numbered:
            # Same prefix, both numbered: compare the numbers, not the text.
            single = number0 == number1
            spans_range = not single
        else:
            single = val0 == val1
            spans_range = False
    else:
        single = val0 == val1
        # Only integers further apart than one stride are written as a range.
        spans_range = (
            not single and isinstance(val0, int) and isinstance(val1, int) and val1 != val0 + stride
        )

    if single:
        label = str(val0)
    elif spans_range:
        label = f"{val0}..{val1}"
        if stride > 1:
            label = f"{label}:{stride}"
    else:
        label = f"{val0}^{val1}"
    val_name.append(label)

192 

193 

194def _is_list_of_ints(values: list[int | str]) -> TypeGuard[list[int]]: 

195 """Check if a list is composed entirely of integers. 

196 

197 Parameters 

198 ---------- 

199 values : `list` [`int` or `str`]: 

200 The list of values to check. 

201 

202 Returns 

203 ------- 

204 is_ints : `bool` 

205 True if all values are integers, False otherwise. 

206 """ 

207 return all(isinstance(v, int) for v in values) 

208 

209 

def sequence_to_string(values: Sequence[int | str]) -> str:
    """Convert a list of integers or strings into a compact string
    representation by merging consecutive values or sequences.

    This function takes a list of integers or strings, removes duplicates,
    sorts them, identifies sequences where consecutive numbers differ by a
    consistent stride, or strings with common prefixes, and returns a string
    that compactly represents these sequences. Consecutive numbers are merged
    into ranges, and strings with common prefixes are handled to produce a
    concise representation.

    Parameters
    ----------
    values : `list` [ `int` or `str` ]
        A list of items to be compacted. Must all be of the same type.

    Returns
    -------
    sequence_as_string : `str`
        A compact string representation of the input list. An empty input
        gives an empty string.

    Raises
    ------
    TypeError
        Raised if the items are not all integers or not all strings.

    Notes
    -----
    - The function handles both integers and strings.
    - For strings with common prefixes, only the differing suffixes are
      considered.
    - The stride is determined as the minimum difference between
      consecutive numbers. For strings the stride is always 1.
    - Strings without common prefixes are listed individually.
    - Strings are sorted as text, so ``node10`` sorts before ``node9``.

    Examples
    --------
    >>> sequence_to_string([1, 2, 3, 5, 7, 8, 9])
    '1..3^5^7..9'
    >>> sequence_to_string(["node1", "node2", "node3"])
    'node1..node3'
    >>> sequence_to_string([10, 20, 30, 40])
    '10..40:10'
    """
    if not values:
        return ""

    unique = set(values)

    # Validate before sorting: sorting a mixture of integers and strings
    # fails inside ``sorted`` with an uninformative comparison error, which
    # would make the explicit message below unreachable.
    all_ints = all(isinstance(item, int) for item in unique)
    if not all_ints and not all(isinstance(item, str) for item in unique):
        types = {type(item) for item in unique}
        raise TypeError(f"All items in the input list must be either integers or strings, got {types}")

    ordered: list[int | str] = sorted(unique)

    # Determine the stride for integers. The values are sorted and distinct
    # so every gap is at least 1.
    stride = 1
    if len(ordered) > 1 and _is_list_of_ints(ordered):
        stride = min(after - before for before, after in zip(ordered, ordered[1:]))

    val_name: list[str] = []
    # val0 and val1 are the first and last members of the run being built.
    val0 = ordered[0]
    val1 = val0
    for val in ordered[1:]:
        if isinstance(val, int):
            assert isinstance(val1, int)  # Narrow the type for checkers.
            extends_run = val == val1 + stride
        else:
            assert isinstance(val1, str)  # Narrow the type for checkers.
            prefix1, num_suffix1 = _extract_numeric_suffix(val1)
            prefix, num_suffix = _extract_numeric_suffix(val)
            extends_run = (
                prefix1 == prefix
                and num_suffix1 is not None
                and num_suffix is not None
                and num_suffix == num_suffix1 + stride
            )
        if extends_run:
            val1 = val
        else:
            # The run is broken: emit it and start a new one at ``val``.
            _add_pair_to_name(val_name, val0, val1, stride)
            val0 = val
            val1 = val

    # Emit the final run.
    _add_pair_to_name(val_name, val0, val1, stride)

    return "^".join(val_name)