Coverage for tests / test_timer.py: 14%
282 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-26 08:43 +0000
1# This file is part of utils.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22import datetime
23import logging
24import os
25import os.path
26import pstats
27import tempfile
28import time
29import unittest
30from dataclasses import dataclass
32from astropy import units as u
34from lsst.utils.timer import duration_from_timeMethod, logInfo, logPairs, profile, time_this, timeMethod
# Logger passed to the ``timeMethod(logger=...)`` decorated functions below.
log = logging.getLogger("test_timer")

# Base name of this module; the tests compare it with the ``filename`` of
# captured log records to check where a message appears to originate.
THIS_FILE = os.path.split(__file__)[1]

# Only use this in a single test but needs to be associated
# with the function up front.
test_metadata: dict = {}
@dataclass
class Example1:
    """Minimal ``Task``-like object exposing ``log`` and ``metadata``.

    The decorated `sleeper` method is what the tests time.
    """

    # Logger attribute of the task-like object.
    log: logging.Logger
    # Mapping attribute of the task-like object.
    metadata: dict

    @timeMethod
    def sleeper(self, duration: float) -> None:
        """Block for ``duration`` seconds.

        Parameters
        ----------
        duration : `float`
            Number of seconds to sleep.
        """
        time.sleep(duration)
@timeMethod
def decorated_sleeper_nothing(self, duration: float) -> None:
    """Sleep for ``duration`` seconds under a bare ``timeMethod`` decorator.

    Parameters
    ----------
    self : `object`
        Object passed in the ``self`` position by the caller.
    duration : `float`
        Number of seconds to sleep.
    """
    time.sleep(duration)
@timeMethod(logger=log)
def decorated_sleeper_logger(self, duration: float) -> None:
    """Sleep for ``duration`` seconds; decorated with an explicit logger.

    Parameters
    ----------
    self : `object`
        Object passed in the ``self`` position by the caller.
    duration : `float`
        Number of seconds to sleep.
    """
    time.sleep(duration)
@timeMethod(logger=log, logLevel=logging.INFO)
def decorated_sleeper_logger_level(self, duration: float) -> None:
    """Sleep for ``duration`` seconds; decorated to log at ``INFO`` level.

    Parameters
    ----------
    self : `object`
        Object passed in the ``self`` position by the caller.
    duration : `float`
        Number of seconds to sleep.
    """
    time.sleep(duration)
@timeMethod(metadata=test_metadata)
def decorated_sleeper_metadata(self, duration: float) -> None:
    """Sleep for ``duration`` seconds; decorated with an external metadata dict.

    Parameters
    ----------
    self : `object`
        Object passed in the ``self`` position by the caller.
    duration : `float`
        Number of seconds to sleep.
    """
    time.sleep(duration)
class TestTimeMethod(unittest.TestCase):
    """Test the time method decorator."""

    def testLogPairs(self):
        """Check ``logPairs`` log output, metadata updates and stack level."""
        # Test the non-obj case.
        logger = logging.getLogger("test")
        pairs = (("name1", 0), ("name2", 1))
        metadata = {}
        with self.assertLogs(level=logging.INFO) as cm:
            logPairs(None, pairs, logLevel=logging.INFO, logger=logger, metadata=metadata)
        self.assertEqual(len(cm.output), 1, cm.output)
        self.assertTrue(cm.output[0].endswith("name1=0; name2=1"), cm.output)
        self.assertEqual(cm.records[0].filename, THIS_FILE, "log message should originate from here")
        self.assertEqual(metadata, {"name1": [0], "name2": [1]})

        # Call it again with an explicit stack level.
        # Force it to come from lsst.utils.
        with self.assertLogs(level=logging.INFO) as cm:
            logPairs(None, pairs, logLevel=logging.INFO, logger=logger, metadata=metadata, stacklevel=0)
        self.assertEqual(cm.records[0].filename, "timer.py")

        # Check that the log message is filtered by default.
        with self.assertLogs(level=logging.INFO) as cm:
            logPairs(None, pairs, logger=logger, metadata=metadata)
            # Sentinel so that assertLogs sees at least one record.
            logger.info("Message")
        self.assertEqual(len(cm.records), 1)

    def testLogInfo(self):
        """Check ``logInfo`` metadata keys, accumulation and stack level."""
        metadata = {}
        logger = logging.getLogger("testLogInfo")
        with self.assertLogs(level=logging.INFO) as cm:
            logInfo(None, prefix="Prefix", metadata=metadata, logger=logger, logLevel=logging.INFO)
        self.assertEqual(cm.records[0].filename, THIS_FILE)
        for key in ("PrefixUtc", "PrefixMaxResidentSetSize", "nodeName"):
            self.assertIn(key, metadata)
        self.assertEqual(metadata["__version__"], 1)

        # Again with no log output.
        logInfo(None, prefix="Prefix", metadata=metadata)
        self.assertEqual(len(metadata["PrefixUtc"]), 2)

        # With an explicit stacklevel.
        with self.assertLogs(level=logging.INFO) as cm:
            logInfo(
                None, prefix="Prefix", metadata=metadata, logger=logger, logLevel=logging.INFO, stacklevel=0
            )
        self.assertEqual(cm.records[0].filename, "timer.py")

    def assertTimer(self, duration, task):
        """Run ``task.sleeper`` repeatedly and check what was recorded.

        Parameters
        ----------
        duration : `float`
            Number of seconds passed to each ``task.sleeper`` call.
        task : `object`
            Object with a ``sleeper`` method and, optionally, non-`None`
            ``log`` and ``metadata`` attributes.
        """
        # Call it twice to test the "add" functionality.
        task.sleeper(duration)
        task.sleeper(duration)
        counter = 2

        # ``getattr`` with a default covers both a missing attribute and one
        # that is explicitly set to `None`.
        has_logger = getattr(task, "log", None) is not None
        has_metadata = getattr(task, "metadata", None) is not None

        if has_logger:
            counter += 1
            with self.assertLogs("timer.task", level=logging.DEBUG) as cm:
                task.sleeper(duration)
            self.assertEqual(cm.records[0].filename, THIS_FILE, "log message should originate from here")

        if has_metadata:
            self.assertEqual(len(task.metadata["sleeperStartUserTime"]), counter)

            start = datetime.datetime.fromisoformat(task.metadata["sleeperStartUtc"][1])
            end = datetime.datetime.fromisoformat(task.metadata["sleeperEndUtc"][1])
            # total_seconds() includes the ``days`` component, so a negative
            # interval (end before start) is not mistaken for a long one.
            delta_sec = (end - start).total_seconds()
            self.assertGreaterEqual(delta_sec, duration)

    def testTaskLike(self):
        """Test timer on something that looks like a Task."""
        # Call with different parameters.
        parameters = (
            (logging.getLogger("task"), {}),
            (logging.getLogger("task"), None),
            (None, {}),
            (None, None),
        )

        duration = 0.1
        for log, metadata in parameters:
            with self.subTest(log=repr(log), metadata=repr(metadata)):
                task = Example1(log=log, metadata=metadata)
                self.assertTimer(duration, task)
                # Called for every combination, including ``metadata=None``;
                # the value is only checked when metadata was recorded.
                exampleDuration = duration_from_timeMethod(task.metadata, "sleeper")
                if metadata is not None:
                    self.assertGreater(exampleDuration, 0)

    def testDecorated(self):
        """Test timeMethod on non-Task like instances."""
        duration = 0.1

        # The "self" object shouldn't be usable but this should do nothing
        # and not crash.
        decorated_sleeper_nothing(self, duration)

        # Use a function decorated for logging.
        with self.assertLogs("timer.test_timer", level=logging.DEBUG) as cm:
            decorated_sleeper_logger(self, duration)
        self.assertEqual(cm.records[0].filename, THIS_FILE, "log message should originate from here")

        # And adjust the log level
        with self.assertLogs("timer.test_timer", level=logging.INFO):
            decorated_sleeper_logger_level(self, duration)

        # Use a function decorated for metadata.
        self.assertEqual(len(test_metadata), 0)
        with self.assertLogs("timer.test_timer", level=logging.DEBUG) as cm:
            # Check that we only get a single log message and nothing from
            # timeMethod itself.
            decorated_sleeper_metadata(self, duration)
            logging.getLogger("timer.test_timer").debug("sentinel")
        self.assertEqual(len(cm.output), 1)
        self.assertIn("decorated_sleeper_metadataStartUserTime", test_metadata)

    def testDisabled(self):
        """Test that setting the appropriate envvar disables the decorator."""
        # ``import unittest`` alone does not load the ``unittest.mock``
        # submodule, so import it explicitly rather than relying on some
        # other module having imported it first.
        from unittest import mock

        duration = 0.1

        with mock.patch.dict(os.environ, {"LSST_UTILS_DISABLE_TIMER": "1"}, clear=True):
            import decorator_test.disable_timer

            # For an empty decorator, we have to check for the attribute that
            # `functools.wraps` attaches to the function.
            self.assertFalse(hasattr(decorator_test.disable_timer.sleep_and_nothing, "__wrapped__"))

            # For a decorator with kwargs, no logs should be emitted.
            with self.assertNoLogs("timer.disable_timer", level=logging.INFO):
                decorator_test.disable_timer.sleep_and_log(self, duration)
class TimerTestCase(unittest.TestCase):
    """Test the timer functionality."""

    def _assert_memory_report(self, cm, index=0, unit="byte"):
        """Assert that a captured record is a DEBUG memory report from root.

        Parameters
        ----------
        cm : `object`
            Context object returned by ``assertLogs``, providing ``records``
            and ``output``.
        index : `int`, optional
            Position of the record to inspect.
        unit : `str`, optional
            Unit text expected to appear in the formatted message.
        """
        record = cm.records[index]
        text = cm.output[index]
        self.assertEqual(record.name, "root")
        self.assertEqual(record.levelname, "DEBUG")
        for fragment in ("Took", "memory", "delta", "peak delta", unit):
            self.assertIn(fragment, text)

    def testTimer(self):
        """Exercise ``time_this`` with its various logging options."""
        with self.assertLogs(level="DEBUG") as cm:
            with time_this():
                pass
        self.assertEqual(cm.records[0].name, "timer")
        self.assertEqual(cm.records[0].levelname, "DEBUG")
        self.assertEqual(cm.records[0].filename, THIS_FILE)

        with self.assertLogs(level="DEBUG") as cm:
            with time_this(prefix=None):
                pass
        self.assertEqual(cm.records[0].name, "root")
        self.assertEqual(cm.records[0].levelname, "DEBUG")
        self.assertIn("Took", cm.output[0])
        self.assertNotIn(": Took", cm.output[0])
        self.assertNotIn("; ", cm.output[0])
        self.assertEqual(cm.records[0].filename, THIS_FILE)

        # Report memory usage.
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True):
                pass
        self._assert_memory_report(cm)

        # Request memory usage but with log level that will not issue it.
        with self.assertLogs(level="INFO") as cm:
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True):
                pass
            # Ensure that a log message is issued.
            _log = logging.getLogger()
            _log.info("info")
        self.assertEqual(cm.records[0].name, "root")
        self.assertEqual(cm.records[0].levelname, "INFO")
        # Avoid shadowing the ``all`` builtin.
        combined_output = "\n".join(cm.output)
        self.assertNotIn("Took", combined_output)
        self.assertNotIn("memory", combined_output)

        # Report memory usage including child processes.
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_child=True):
                pass
        self._assert_memory_report(cm)

        # Report memory usage, use non-default, but a valid memory unit.
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_unit=u.kilobyte):
                pass
        self._assert_memory_report(cm, unit="kbyte")

        # Report memory usage, use an invalid memory unit.
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True, mem_unit=u.gram):
                pass
        self.assertEqual(cm.records[0].name, "lsst.utils.timer")
        self.assertEqual(cm.records[0].levelname, "WARNING")
        self.assertIn("Invalid", cm.output[0])
        self.assertIn("byte", cm.output[0])
        # The timing report follows the warning.
        self._assert_memory_report(cm, index=1)

        # Change logging level
        with self.assertLogs(level="INFO") as cm:
            with time_this(level=logging.INFO, prefix=None):
                pass
        self.assertEqual(cm.records[0].name, "root")
        self.assertIn("Took", cm.output[0])
        self.assertIn("seconds", cm.output[0])

        # Use a new logger with a message.
        msg = "Test message %d"
        test_num = 42
        logname = "test"
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(log=logging.getLogger(logname), msg=msg, args=(test_num,), prefix=None):
                pass
        self.assertEqual(cm.records[0].name, logname)
        self.assertIn("Took", cm.output[0])
        self.assertIn(msg % test_num, cm.output[0])

        # Now with kwargs.
        kwargs = {"extra_info": "extra", "value": 3}
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(
                log=logging.getLogger(logname), msg=msg, args=(test_num,), kwargs=kwargs, prefix=None
            ):
                pass
        self.assertEqual(cm.records[0].name, logname)
        self.assertIn("Took", cm.output[0])
        self.assertIn(msg % test_num, cm.output[0])
        self.assertIn("extra_info='extra'", cm.output[0])
        self.assertIn("value=3", cm.output[0])

        # Prefix the logger.
        prefix = "prefix"
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(prefix=prefix):
                pass
        self.assertEqual(cm.records[0].name, prefix)
        self.assertIn("Took", cm.output[0])

        # Prefix explicit logger.
        with self.assertLogs(level="DEBUG") as cm:
            with time_this(log=logging.getLogger(logname), prefix=prefix):
                pass
        self.assertEqual(cm.records[0].name, f"{prefix}.{logname}")

        # Trigger a problem.
        with self.assertLogs(level="DEBUG") as cm:
            with self.assertRaises(RuntimeError):
                with time_this(log=logging.getLogger(logname), prefix=prefix):
                    # The "%s" checks that the exception text is reported
                    # verbatim in the log message.
                    raise RuntimeError("A problem %s")
        self.assertEqual(cm.records[0].name, f"{prefix}.{logname}")
        self.assertIn("A problem %s", cm.records[0].message)
        self.assertEqual(cm.records[0].levelname, "DEBUG")

    def test_time_this_return(self):
        """Test that the context manager returns the duration."""
        # Return duration but not memory usage.
        with self.assertNoLogs(level="INFO"):
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=False) as timer:
                time.sleep(0.01)
        self.assertGreater(timer.duration, 0.0)
        self.assertIsNone(timer.mem_current_usage)

        # Ask for memory usage that will be calculated.
        with self.assertLogs(level="DEBUG"):
            # DEBUG messages are captured here, so the memory delta is
            # expected to be reported as a float.
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True) as timer:
                time.sleep(0.01)
        self.assertGreater(timer.duration, 0.0)
        self.assertIsInstance(timer.mem_current_delta, float)

        # Ask for memory usage but will not be calculated.
        with self.assertNoLogs(level="WARNING"):
            # mem usage will be requested but not calculated.
            with time_this(level=logging.DEBUG, prefix=None, mem_usage=True) as timer:
                time.sleep(0.01)
        self.assertGreater(timer.duration, 0.0)
        self.assertIsNone(timer.mem_current_usage)

        # Require memory usage is returned in context manager.
        with self.assertNoLogs(level="WARNING"):
            with time_this(level=logging.DEBUG, prefix=None, force_mem_usage=True) as timer:
                time.sleep(0.01)
        self.assertGreater(timer.duration, 0.0)
        self.assertIsInstance(timer.mem_current_usage, float)

    def test_structlog(self):
        """Test that the timer works with structlog loggers."""
        try:
            import structlog
            from structlog.testing import capture_logs
        except ImportError:
            self.skipTest("structlog is not installed")

        msg = "Test message %d"
        test_num = 42
        kwargs = {"extra_info": "extra", "value": 3}

        with capture_logs() as cap:
            slog = structlog.get_logger("structlog_timer")
            with time_this(log=slog, msg=msg, args=(test_num,), kwargs=kwargs):
                pass
        self.assertEqual(cap[0]["value"], 3)
        self.assertEqual(cap[0]["extra_info"], "extra")
        self.assertGreaterEqual(cap[0]["duration"], 0.0)
        self.assertIn("Test message 42", cap[0]["event"])
class ProfileTestCase(unittest.TestCase):
    """Exercise the ``profile`` context manager."""

    def test_profile(self):
        """Profile once without and once with an output file name."""
        logger = logging.getLogger("profile")

        # With no file name the context manager is expected to yield `None`.
        with profile(None) as profiler:
            pass
        self.assertIsNone(profiler)

        with tempfile.NamedTemporaryFile() as stats_file:
            path = stats_file.name
            with self.assertLogs("profile", level=logging.INFO) as logs, profile(path, logger) as profiler:
                pass
            self.assertEqual(len(logs.output), 2)
            self.assertIsNotNone(profiler)
            self.assertTrue(os.path.exists(path))
            # The written file must be loadable as profiling statistics.
            loaded = pstats.Stats(path)
            self.assertIsInstance(loaded, pstats.Stats)
# Run the tests in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()