Coverage for tests / test_progress.py: 22%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28 

29import logging 

30import unittest 

31from contextlib import contextmanager 

32 

33import click 

34 

35from lsst.daf.butler.cli.progress import ClickProgressHandler 

36from lsst.daf.butler.cli.utils import clickResultMsg 

37from lsst.daf.butler.progress import Progress, ProgressHandler 

38 

39 

class MockProgressBar:
    """Stand-in for a real progress bar that records, in a list, every
    status value it would have displayed.

    The recorded values include the initial 0 as well as the final count
    reached once the wrapped iterable has been exhausted.

    Parameters
    ----------
    iterable : `Iterable`
        Iterable to wrap, or `None`.
    total : `int` or `None`
        Total value passed at progress-bar construction.
    """

    last = None
    """Most recently constructed instance of this class, for test code that
    has no other way to get hold of it.
    """

    def __init__(self, iterable, total):
        self.total = total
        self._iterable = iterable
        self._current = 0
        self.reported = [0]
        # Publish the new instance on the class so tests can inspect it.
        MockProgressBar.last = self

    def __iter__(self):
        for item in self._iterable:
            yield item
            # An element is only counted once the consumer comes back for
            # the next one, i.e. after it has finished with ``item``.
            self.update()

    def update(self, n: int = 1) -> None:
        """Advance the counter and record the new value.

        Parameters
        ----------
        n : `int`, optional
            Amount to add to the current count.
        """
        self._current += n
        self.reported.append(self._current)

75 

76 

class MockProgressHandler(ProgressHandler):
    """`ProgressHandler` implementation whose progress bars are
    `MockProgressBar` instances.
    """

    @contextmanager
    def get_progress_bar(self, iterable, desc, total, level):
        # ``desc`` and ``level`` are accepted but not used by the mock.
        mock_bar = MockProgressBar(iterable, total=total)
        yield mock_bar

85 

86 

class ClickProgressHandlerTestCase(unittest.TestCase):
    """Test enabling and disabling progress in click commands.

    It looks like click's testing harness doesn't ever actually let its
    progress bar generate output, so the best we can do is check that using it
    doesn't raise exceptions, and see if it looks like we're doing something
    based on what our own progress-object state is.
    """

    def setUp(self):
        # Import the submodule explicitly: a bare ``import click`` does not
        # guarantee that ``click.testing`` is loaded.
        from click.testing import CliRunner

        # Set up a mock handler by default.  Tests of click behavior will
        # rely on this when they check that inside a click command we never
        # end up with that mock.
        self.logger = logging.getLogger("test_progress")
        self.logger.setLevel(logging.INFO)
        Progress.set_handler(MockProgressHandler())
        self.runner = CliRunner()

    def tearDown(self):
        # ``last`` is an attribute of the bar class, so that is what has to
        # be reset to keep state from leaking between tests.
        MockProgressBar.last = None
        Progress.set_handler(None)
        self.logger.setLevel(logging.NOTSET)

    def get_cmd(self, level, enabled):
        """Return a click command that uses a progress bar and tests that it
        is or is not enabled, as given.

        Parameters
        ----------
        level : `int`
            Logging level passed to the `Progress` constructed inside the
            command.
        enabled : `bool`
            Expected result of ``Progress.is_enabled()`` inside the command.

        Returns
        -------
        cmd : `click.Command`
            Command whose body performs the checks; a failed assertion
            surfaces as a non-zero exit code in the runner's result.
        """

        @click.command()
        @ClickProgressHandler.option
        def cmd(progress):
            p = Progress("test_progress", level=level)
            with p.bar(range(5), desc="testing!") as bar:
                # Inside a click command the mock handler installed by
                # setUp must never be the one supplying the bar.
                self.assertNotIsInstance(bar, MockProgressBar)
                r = list(bar)
            self.assertEqual(r, list(range(5)))
            self.assertEqual(enabled, p.is_enabled())

        return cmd

    def test_click_disabled_by_default(self):
        """Test that progress is disabled by default in click commands."""
        result = self.runner.invoke(
            self.get_cmd(logging.INFO, enabled=False),
            [],
        )
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))

    def test_click_enabled(self):
        """Test turning on progress in click commands."""
        result = self.runner.invoke(
            self.get_cmd(logging.INFO, enabled=True),
            ["--progress"],
        )
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))

    def test_click_disabled_globally(self):
        """Test turning off progress in click commands."""
        result = self.runner.invoke(
            self.get_cmd(logging.INFO, enabled=False),
            ["--no-progress"],
        )
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))

    def test_click_disabled_by_log_level(self):
        """Test that progress reports below the current log level are disabled,
        even if progress is globally enabled.
        """
        result = self.runner.invoke(
            self.get_cmd(logging.DEBUG, enabled=False),
            ["--progress"],
        )
        self.assertEqual(result.exit_code, 0, clickResultMsg(result))

160 

161 

class MockedProgressHandlerTestCase(unittest.TestCase):
    """Test that the interface layer for progress reporting works by using
    mock handler and progress bar objects.
    """

    def setUp(self):
        self.logger = logging.getLogger("test_progress")
        self.logger.setLevel(logging.INFO)
        Progress.set_handler(MockProgressHandler())
        self.progress = Progress("test_progress")

    def tearDown(self):
        # ``last`` is an attribute of the bar class; resetting it here keeps
        # a bar created by one test from being inspected by the next.
        MockProgressBar.last = None
        Progress.set_handler(None)
        self.logger.setLevel(logging.NOTSET)

    def test_bar_iterable(self):
        """Test using `Progress.bar` to wrap an iterable."""
        iterable = list(range(5))
        with self.progress.bar(iterable) as bar:
            r = list(bar)
        self.assertEqual(r, iterable)
        # The mock reports the initial 0, then one value per element.
        self.assertEqual(iterable + [len(iterable)], bar.reported)

    def test_bar_update(self):
        """Test using `Progress.bar` with manual updates."""
        with self.progress.bar(total=10) as bar:
            for _ in range(5):
                bar.update(2)
        self.assertEqual(list(range(0, 12, 2)), bar.reported)

    def test_iter_chunks_fully_sized(self):
        """Test using `Progress.iter_chunks` with a sized iterable of sized
        chunks.
        """
        iterable = [list(range(2)), list(range(3))]
        seen = []
        for chunk in self.progress.iter_chunks(iterable):
            seen.extend(chunk)
        self.assertEqual(seen, iterable[0] + iterable[1])
        self.assertEqual(MockProgressBar.last.reported, [0, 2, 5])
        self.assertEqual(MockProgressBar.last.total, 5)

    def test_iter_chunks_with_total(self):
        """Test using `Progress.iter_chunks` with total provided and
        sized chunks.
        """
        iterable = [list(range(2)), list(range(3))]
        seen = []
        for chunk in self.progress.iter_chunks(iter(iterable), total=5):
            seen.extend(chunk)
        self.assertEqual(seen, iterable[0] + iterable[1])
        self.assertEqual(MockProgressBar.last.reported, [0, 2, 5])
        self.assertEqual(MockProgressBar.last.total, 5)

    def test_iter_chunks_total_false(self):
        """Test using `Progress.iter_chunks` with total=False and non-sized
        chunks.  This should display progress with the number of
        chunks.
        """
        iterable = [iter(range(2)), iter(range(3))]
        seen = []
        for chunk in self.progress.iter_chunks(iterable, total=False):
            seen.extend(chunk)
        self.assertEqual(seen, list(range(2)) + list(range(3)))
        self.assertEqual(MockProgressBar.last.reported, [0, 1, 2])
        self.assertEqual(MockProgressBar.last.total, 2)

    def test_iter_chunks_not_sized(self):
        """Test using `Progress.iter_chunks` with an unsized iterable."""
        iterable = [iter(range(2)), iter(range(3))]
        seen = []
        for chunk in self.progress.iter_chunks(iter(iterable)):
            seen.extend(chunk)
        self.assertEqual(seen, list(range(2)) + list(range(3)))
        self.assertEqual(MockProgressBar.last.reported, [0, 1, 2])
        self.assertIsNone(MockProgressBar.last.total)

    def test_iter_item_chunks_fully_sized(self):
        """Test using `Progress.iter_item_chunks` with a sized iterable of
        sized chunks.
        """
        mapping = {"x": list(range(2)), "y": list(range(3))}
        seen = {}
        for key, chunk in self.progress.iter_item_chunks(mapping.items()):
            seen[key] = chunk
        self.assertEqual(seen, mapping)
        self.assertEqual(MockProgressBar.last.reported, [0, 2, 5])
        self.assertEqual(MockProgressBar.last.total, 5)

    def test_iter_item_chunks_with_total(self):
        """Test using `Progress.iter_item_chunks` with total provided and
        sized chunks.
        """
        mapping = {"x": list(range(2)), "y": list(range(3))}
        seen = {}
        for key, chunk in self.progress.iter_item_chunks(iter(mapping.items()), total=5):
            seen[key] = chunk
        self.assertEqual(seen, mapping)
        self.assertEqual(MockProgressBar.last.reported, [0, 2, 5])
        self.assertEqual(MockProgressBar.last.total, 5)

    def test_iter_item_chunks_total_false(self):
        """Test using `Progress.iter_item_chunks` with total=False and
        non-sized chunks.  This should display progress with the number of
        chunks.
        """
        mapping = {"x": iter(range(2)), "y": iter(range(3))}
        seen = {}
        for key, chunk in self.progress.iter_item_chunks(mapping.items(), total=False):
            # The chunks are one-shot iterators, so materialize them here.
            seen[key] = list(chunk)
        self.assertEqual(seen, {"x": list(range(2)), "y": list(range(3))})
        self.assertEqual(MockProgressBar.last.reported, [0, 1, 2])
        self.assertEqual(MockProgressBar.last.total, 2)

    def test_iter_item_chunks_not_sized(self):
        """Test using `Progress.iter_item_chunks` with an unsized iterable of
        non-sized chunks.
        """
        mapping = {"x": iter(range(2)), "y": iter(range(3))}
        seen = {}
        for key, chunk in self.progress.iter_item_chunks(iter(mapping.items())):
            # The chunks are one-shot iterators, so materialize them here.
            seen[key] = list(chunk)
        self.assertEqual(seen, {"x": list(range(2)), "y": list(range(3))})
        self.assertEqual(MockProgressBar.last.reported, [0, 1, 2])
        self.assertIsNone(MockProgressBar.last.total)

288 

289 

# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()