Coverage for tests / test_timer.py: 14%

282 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:31 +0000

1# This file is part of utils. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22import datetime 

23import logging 

24import os 

25import os.path 

26import pstats 

27import tempfile 

28import time 

29import unittest 

30from dataclasses import dataclass 

31 

32from astropy import units as u 

33 

34from lsst.utils.timer import duration_from_timeMethod, logInfo, logPairs, profile, time_this, timeMethod 

35 

36log = logging.getLogger("test_timer") 

37 

38THIS_FILE = os.path.basename(__file__) 

39 

40# Only use this in a single test but needs to be associated 

41# with the function up front. 

42test_metadata = {} 

43 

44 

45@dataclass 

46class Example1: 

47 """Test class with log and metadata properties, similar to ``Task``.""" 

48 

49 log: logging.Logger 

50 metadata: dict 

51 

52 @timeMethod 

53 def sleeper(self, duration: float) -> None: 

54 """Sleep for some time.""" 

55 time.sleep(duration) 

56 

57 

58@timeMethod 

59def decorated_sleeper_nothing(self, duration: float) -> None: 

60 """Test function that sleeps.""" 

61 time.sleep(duration) 

62 

63 

64@timeMethod(logger=log) 

65def decorated_sleeper_logger(self, duration: float) -> None: 

66 """Test function that sleeps and logs.""" 

67 time.sleep(duration) 

68 

69 

70@timeMethod(logger=log, logLevel=logging.INFO) 

71def decorated_sleeper_logger_level(self, duration: float) -> None: 

72 """Test function that logs at INFO.""" 

73 time.sleep(duration) 

74 

75 

76@timeMethod(metadata=test_metadata) 

77def decorated_sleeper_metadata(self, duration: float) -> None: 

78 """Test function that uses external metadata.""" 

79 time.sleep(duration) 

80 

81 

82class TestTimeMethod(unittest.TestCase): 

83 """Test the time method decorator.""" 

84 

85 def testLogPairs(self): 

86 # Test the non-obj case. 

87 logger = logging.getLogger("test") 

88 pairs = (("name1", 0), ("name2", 1)) 

89 metadata = {} 

90 with self.assertLogs(level=logging.INFO) as cm: 

91 logPairs(None, pairs, logLevel=logging.INFO, logger=logger, metadata=metadata) 

92 self.assertEqual(len(cm.output), 1, cm.output) 

93 self.assertTrue(cm.output[0].endswith("name1=0; name2=1"), cm.output) 

94 self.assertEqual(cm.records[0].filename, THIS_FILE, "log message should originate from here") 

95 self.assertEqual(metadata, {"name1": [0], "name2": [1]}) 

96 

97 # Call it again with an explicit stack level. 

98 # Force it to come from lsst.utils. 

99 with self.assertLogs(level=logging.INFO) as cm: 

100 logPairs(None, pairs, logLevel=logging.INFO, logger=logger, metadata=metadata, stacklevel=0) 

101 self.assertEqual(cm.records[0].filename, "timer.py") 

102 

103 # Check that the log message is filtered by default. 

104 with self.assertLogs(level=logging.INFO) as cm: 

105 logPairs(None, pairs, logger=logger, metadata=metadata) 

106 logger.info("Message") 

107 self.assertEqual(len(cm.records), 1) 

108 

109 def testLogInfo(self): 

110 metadata = {} 

111 logger = logging.getLogger("testLogInfo") 

112 with self.assertLogs(level=logging.INFO) as cm: 

113 logInfo(None, prefix="Prefix", metadata=metadata, logger=logger, logLevel=logging.INFO) 

114 self.assertEqual(cm.records[0].filename, THIS_FILE) 

115 self.assertIn("PrefixUtc", metadata) 

116 self.assertIn("PrefixMaxResidentSetSize", metadata) 

117 self.assertIn("nodeName", metadata) 

118 self.assertEqual(metadata["__version__"], 1) 

119 

120 # Again with no log output. 

121 logInfo(None, prefix="Prefix", metadata=metadata) 

122 self.assertEqual(len(metadata["PrefixUtc"]), 2) 

123 

124 # With an explicit stacklevel. 

125 with self.assertLogs(level=logging.INFO) as cm: 

126 logInfo( 

127 None, prefix="Prefix", metadata=metadata, logger=logger, logLevel=logging.INFO, stacklevel=0 

128 ) 

129 self.assertEqual(cm.records[0].filename, "timer.py") 

130 

131 def assertTimer(self, duration, task): 

132 # Call it twice to test the "add" functionality. 

133 task.sleeper(duration) 

134 task.sleeper(duration) 

135 counter = 2 

136 

137 has_logger = getattr(task, "log", None) is not None and task.log is not None 

138 has_metadata = getattr(task, "metadata", None) is not None and task.metadata is not None 

139 

140 if has_logger: 

141 counter += 1 

142 with self.assertLogs("timer.task", level=logging.DEBUG) as cm: 

143 task.sleeper(duration) 

144 self.assertEqual(cm.records[0].filename, THIS_FILE, "log message should originate from here") 

145 

146 if has_metadata: 

147 self.assertEqual(len(task.metadata["sleeperStartUserTime"]), counter) 

148 

149 start = datetime.datetime.fromisoformat(task.metadata["sleeperStartUtc"][1]) 

150 end = datetime.datetime.fromisoformat(task.metadata["sleeperEndUtc"][1]) 

151 delta = end - start 

152 delta_sec = delta.seconds + (delta.microseconds / 1e6) 

153 self.assertGreaterEqual(delta_sec, duration) 

154 

155 def testTaskLike(self): 

156 """Test timer on something that looks like a Task.""" 

157 # Call with different parameters. 

158 parameters = ( 

159 (logging.getLogger("task"), {}), 

160 (logging.getLogger("task"), None), 

161 (None, {}), 

162 (None, None), 

163 ) 

164 

165 duration = 0.1 

166 for log, metadata in parameters: 

167 with self.subTest(log=repr(log), metadata=repr(metadata)): 

168 task = Example1(log=log, metadata=metadata) 

169 self.assertTimer(duration, task) 

170 exampleDuration = duration_from_timeMethod(task.metadata, "sleeper") 

171 if metadata is not None: 

172 self.assertGreater(exampleDuration, 0) 

173 

174 def testDecorated(self): 

175 """Test timeMethod on non-Task like instances.""" 

176 duration = 0.1 

177 

178 # The "self" object shouldn't be usable but this should do nothing 

179 # and not crash. 

180 decorated_sleeper_nothing(self, duration) 

181 

182 # Use a function decorated for logging. 

183 with self.assertLogs("timer.test_timer", level=logging.DEBUG) as cm: 

184 decorated_sleeper_logger(self, duration) 

185 self.assertEqual(cm.records[0].filename, THIS_FILE, "log message should originate from here") 

186 

187 # And adjust the log level 

188 with self.assertLogs("timer.test_timer", level=logging.INFO): 

189 decorated_sleeper_logger_level(self, duration) 

190 

191 # Use a function decorated for metadata. 

192 self.assertEqual(len(test_metadata), 0) 

193 with self.assertLogs("timer.test_timer", level=logging.DEBUG) as cm: 

194 # Check that we only get a single log message and nothing from 

195 # timeMethod itself. 

196 decorated_sleeper_metadata(self, duration) 

197 logging.getLogger("timer.test_timer").debug("sentinel") 

198 self.assertEqual(len(cm.output), 1) 

199 self.assertIn("decorated_sleeper_metadataStartUserTime", test_metadata) 

200 

201 def testDisabled(self): 

202 """Test that setting the appropriate envvar disables the decorator.""" 

203 duration = 0.1 

204 

205 with unittest.mock.patch.dict(os.environ, {"LSST_UTILS_DISABLE_TIMER": "1"}, clear=True): 

206 import decorator_test.disable_timer 

207 

208 # For an empty decorator, we have to check for the attribute that 

209 # `functools.wraps` attaches to the function. 

210 self.assertFalse(hasattr(decorator_test.disable_timer.sleep_and_nothing, "__wrapped__")) 

211 

212 # For a decorator with kwargs, no logs should be emitted. 

213 with self.assertNoLogs("timer.disable_timer", level=logging.INFO): 

214 decorator_test.disable_timer.sleep_and_log(self, duration) 

215 

216 

217class TimerTestCase(unittest.TestCase): 

218 """Test the timer functionality.""" 

219 

220 def testTimer(self): 

221 with self.assertLogs(level="DEBUG") as cm: 

222 with time_this(): 

223 pass 

224 self.assertEqual(cm.records[0].name, "timer") 

225 self.assertEqual(cm.records[0].levelname, "DEBUG") 

226 self.assertEqual(cm.records[0].filename, THIS_FILE) 

227 

228 with self.assertLogs(level="DEBUG") as cm: 

229 with time_this(prefix=None): 

230 pass 

231 self.assertEqual(cm.records[0].name, "root") 

232 self.assertEqual(cm.records[0].levelname, "DEBUG") 

233 self.assertIn("Took", cm.output[0]) 

234 self.assertNotIn(": Took", cm.output[0]) 

235 self.assertNotIn("; ", cm.output[0]) 

236 self.assertEqual(cm.records[0].filename, THIS_FILE) 

237 

238 # Report memory usage. 

239 with self.assertLogs(level="DEBUG") as cm: 

240 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True): 

241 pass 

242 self.assertEqual(cm.records[0].name, "root") 

243 self.assertEqual(cm.records[0].levelname, "DEBUG") 

244 self.assertIn("Took", cm.output[0]) 

245 self.assertIn("memory", cm.output[0]) 

246 self.assertIn("delta", cm.output[0]) 

247 self.assertIn("peak delta", cm.output[0]) 

248 self.assertIn("byte", cm.output[0]) 

249 

250 # Request memory usage but with log level that will not issue it. 

251 with self.assertLogs(level="INFO") as cm: 

252 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True): 

253 pass 

254 # Ensure that a log message is issued. 

255 _log = logging.getLogger() 

256 _log.info("info") 

257 self.assertEqual(cm.records[0].name, "root") 

258 self.assertEqual(cm.records[0].levelname, "INFO") 

259 all = "\n".join(cm.output) 

260 self.assertNotIn("Took", all) 

261 self.assertNotIn("memory", all) 

262 

263 # Report memory usage including child processes. 

264 with self.assertLogs(level="DEBUG") as cm: 

265 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_child=True): 

266 pass 

267 self.assertEqual(cm.records[0].name, "root") 

268 self.assertEqual(cm.records[0].levelname, "DEBUG") 

269 self.assertIn("Took", cm.output[0]) 

270 self.assertIn("memory", cm.output[0]) 

271 self.assertIn("delta", cm.output[0]) 

272 self.assertIn("peak delta", cm.output[0]) 

273 self.assertIn("byte", cm.output[0]) 

274 

275 # Report memory usage, use non-default, but a valid memory unit. 

276 with self.assertLogs(level="DEBUG") as cm: 

277 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_unit=u.kilobyte): 

278 pass 

279 self.assertEqual(cm.records[0].name, "root") 

280 self.assertEqual(cm.records[0].levelname, "DEBUG") 

281 self.assertIn("Took", cm.output[0]) 

282 self.assertIn("memory", cm.output[0]) 

283 self.assertIn("delta", cm.output[0]) 

284 self.assertIn("peak delta", cm.output[0]) 

285 self.assertIn("kbyte", cm.output[0]) 

286 

287 # Report memory usage, use an invalid memory unit. 

288 with self.assertLogs(level="DEBUG") as cm: 

289 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_unit=u.gram): 

290 pass 

291 self.assertEqual(cm.records[0].name, "lsst.utils.timer") 

292 self.assertEqual(cm.records[0].levelname, "WARNING") 

293 self.assertIn("Invalid", cm.output[0]) 

294 self.assertIn("byte", cm.output[0]) 

295 self.assertEqual(cm.records[1].name, "root") 

296 self.assertEqual(cm.records[1].levelname, "DEBUG") 

297 self.assertIn("Took", cm.output[1]) 

298 self.assertIn("memory", cm.output[1]) 

299 self.assertIn("delta", cm.output[1]) 

300 self.assertIn("peak delta", cm.output[1]) 

301 self.assertIn("byte", cm.output[1]) 

302 

303 # Change logging level 

304 with self.assertLogs(level="INFO") as cm: 

305 with time_this(level=logging.INFO, prefix=None): 

306 pass 

307 self.assertEqual(cm.records[0].name, "root") 

308 self.assertIn("Took", cm.output[0]) 

309 self.assertIn("seconds", cm.output[0]) 

310 

311 # Use a new logger with a message. 

312 msg = "Test message %d" 

313 test_num = 42 

314 logname = "test" 

315 with self.assertLogs(level="DEBUG") as cm: 

316 with time_this(log=logging.getLogger(logname), msg=msg, args=(42,), prefix=None): 

317 pass 

318 self.assertEqual(cm.records[0].name, logname) 

319 self.assertIn("Took", cm.output[0]) 

320 self.assertIn(msg % test_num, cm.output[0]) 

321 

322 # Now with kwargs. 

323 msg = "Test message %d" 

324 test_num = 42 

325 logname = "test" 

326 kwargs = {"extra_info": "extra", "value": 3} 

327 with self.assertLogs(level="DEBUG") as cm: 

328 with time_this( 

329 log=logging.getLogger(logname), msg=msg, args=(test_num,), kwargs=kwargs, prefix=None 

330 ): 

331 pass 

332 self.assertEqual(cm.records[0].name, logname) 

333 self.assertIn("Took", cm.output[0]) 

334 self.assertIn(msg % test_num, cm.output[0]) 

335 self.assertIn("extra_info='extra'", cm.output[0]) 

336 self.assertIn("value=3", cm.output[0]) 

337 

338 # Prefix the logger. 

339 prefix = "prefix" 

340 with self.assertLogs(level="DEBUG") as cm: 

341 with time_this(prefix=prefix): 

342 pass 

343 self.assertEqual(cm.records[0].name, prefix) 

344 self.assertIn("Took", cm.output[0]) 

345 

346 # Prefix explicit logger. 

347 with self.assertLogs(level="DEBUG") as cm: 

348 with time_this(log=logging.getLogger(logname), prefix=prefix): 

349 pass 

350 self.assertEqual(cm.records[0].name, f"{prefix}.{logname}") 

351 

352 # Trigger a problem. 

353 with self.assertLogs(level="DEBUG") as cm: 

354 with self.assertRaises(RuntimeError): 

355 with time_this(log=logging.getLogger(logname), prefix=prefix): 

356 raise RuntimeError("A problem %s") 

357 self.assertEqual(cm.records[0].name, f"{prefix}.{logname}") 

358 self.assertIn("A problem %s", cm.records[0].message) 

359 self.assertEqual(cm.records[0].levelname, "DEBUG") 

360 

361 def test_time_this_return(self): 

362 """Test that the context manager returns the duration.""" 

363 # Return duration but not memory usage. 

364 with self.assertNoLogs(level="INFO"): 

365 with time_this(level=logging.DEBUG, prefix=None, mem_usage=False) as timer: 

366 time.sleep(0.01) 

367 self.assertGreater(timer.duration, 0.0) 

368 self.assertIsNone(timer.mem_current_usage) 

369 

370 # Ask for memory usage that will be calculated. 

371 with self.assertLogs(level="DEBUG"): 

372 # mem usage will be requested but not calculated. 

373 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True) as timer: 

374 time.sleep(0.01) 

375 self.assertGreater(timer.duration, 0.0) 

376 self.assertIsInstance(timer.mem_current_delta, float) 

377 

378 # Ask for memory usage but will not be calculated. 

379 with self.assertNoLogs(level="WARNING"): 

380 # mem usage will be requested but not calculated. 

381 with time_this(level=logging.DEBUG, prefix=None, mem_usage=True) as timer: 

382 time.sleep(0.01) 

383 self.assertGreater(timer.duration, 0.0) 

384 self.assertIsNone(timer.mem_current_usage) 

385 

386 # Require memory usage is returned in context manager. 

387 with self.assertNoLogs(level="WARNING"): 

388 with time_this(level=logging.DEBUG, prefix=None, force_mem_usage=True) as timer: 

389 time.sleep(0.01) 

390 self.assertGreater(timer.duration, 0.0) 

391 self.assertIsInstance(timer.mem_current_usage, float) 

392 

393 def test_structlog(self): 

394 """Test that the timer works with structlog loggers.""" 

395 try: 

396 import structlog 

397 from structlog.testing import capture_logs 

398 except ImportError: 

399 self.skipTest("structlog is not installed") 

400 

401 msg = "Test message %d" 

402 test_num = 42 

403 kwargs = {"extra_info": "extra", "value": 3} 

404 

405 with capture_logs() as cap: 

406 slog = structlog.get_logger("structlog_timer") 

407 with time_this(log=slog, msg=msg, args=(test_num,), kwargs=kwargs): 

408 pass 

409 self.assertEqual(cap[0]["value"], 3) 

410 self.assertEqual(cap[0]["extra_info"], "extra") 

411 self.assertGreaterEqual(cap[0]["duration"], 0.0) 

412 self.assertIn("Test message 42", cap[0]["event"]) 

413 

414 

415class ProfileTestCase(unittest.TestCase): 

416 """Test profiling decorator.""" 

417 

418 def test_profile(self): 

419 logger = logging.getLogger("profile") 

420 

421 with profile(None) as prof: 

422 pass 

423 self.assertIsNone(prof) 

424 

425 with tempfile.NamedTemporaryFile() as tmp: 

426 with self.assertLogs("profile", level=logging.INFO) as cm: 

427 with profile(tmp.name, logger) as prof: 

428 pass 

429 self.assertEqual(len(cm.output), 2) 

430 self.assertIsNotNone(prof) 

431 self.assertTrue(os.path.exists(tmp.name)) 

432 self.assertIsInstance(pstats.Stats(tmp.name), pstats.Stats) 

433 

434 

435if __name__ == "__main__": 

436 unittest.main()