Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _progress.py: 21%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-24 08:19 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ("ProgressCounter", "ProgressManager", "make_worker_log") 

31 

32import logging 

33import os 

34import time 

35from types import TracebackType 

36from typing import Any, Self 

37 

38from lsst.utils.logging import TRACE, VERBOSE, LsstLogAdapter, PeriodicLogger, getLogger 

39 

40from ._config import AggregatorConfig 

41 

42 

43class ProgressCounter: 

44 """A progress tracker for an individual aspect of the aggregation process. 

45 

46 Parameters 

47 ---------- 

48 parent : `ProgressManager` 

49 The parent progress manager object. 

50 description : `str` 

51 Human-readable description of this aspect. 

52 unit : `str` 

53 Unit (in plural form) for the items being counted. 

54 total : `int`, optional 

55 Expected total number of items. May be set later. 

56 """ 

57 

58 def __init__(self, parent: ProgressManager, description: str, unit: str, total: int | None = None): 

59 self._parent = parent 

60 self.total = total 

61 self._description = description 

62 self._current = 0 

63 self._unit = unit 

64 self._bar: Any = None 

65 

66 def update(self, n: int) -> None: 

67 """Report that ``n`` new items have been processed. 

68 

69 Parameters 

70 ---------- 

71 n : `int` 

72 Number of new items processed. 

73 """ 

74 self._current += n 

75 if self._parent.interactive: 

76 if self._bar is None: 

77 if n == self.total: 

78 return 

79 from tqdm import tqdm 

80 

81 self._bar = tqdm(desc=self._description, total=self.total, leave=False, unit=f" {self._unit}") 

82 else: 

83 self._bar.update(n) 

84 if self._current == self.total: 

85 self._bar.close() 

86 self._parent._log_status() 

87 

88 def close(self) -> None: 

89 """Close the counter, guaranteeing that `update` will not be called 

90 again. 

91 """ 

92 if self._bar is not None: 

93 self._bar.close() 

94 self._bar = None 

95 

96 def append_log_terms(self, msg: list[str]) -> None: 

97 """Append a log message for this counter to a list if it is active. 

98 

99 Parameters 

100 ---------- 

101 msg : `list` [ `str` ] 

102 List of messages to concatenate into a single line and log 

103 together, to be modified in-place. 

104 """ 

105 if self.total is not None and self._current > 0 and self._current < self.total: 

106 msg.append(f"{self._description} ({self._current} of {self.total} {self._unit})") 

107 

108 

109class ProgressManager: 

110 """A helper class for the provenance aggregator that handles reporting 

111 progress to the user. 

112 

113 This includes both logging (including periodic logging) and optional 

114 progress bars. 

115 

116 Parameters 

117 ---------- 

118 log : `lsst.utils.logging.LsstLogAdapter` 

119 LSST-customized logger. 

120 config : `AggregatorConfig` 

121 Configuration for the aggregator. 

122 

123 Notes 

124 ----- 

125 This class is a context manager in order to manage the redirection of 

126 logging when progress bars for interactive display are in use. The context 

127 manager does nothing otherwise. 

128 """ 

129 

130 def __init__(self, log: LsstLogAdapter, config: AggregatorConfig): 

131 self.start = time.time() 

132 self.log = log 

133 self.config = config 

134 self._periodic_log = PeriodicLogger(self.log, config.log_status_interval) 

135 self.scans = ProgressCounter(self, "scanning", "quanta") 

136 self.writes = ProgressCounter(self, "writing", "quanta") 

137 self.quantum_ingests = ProgressCounter(self, "ingesting outputs", "quanta") 

138 self.interactive = config.interactive_status 

139 

140 def __enter__(self) -> Self: 

141 if self.interactive: 

142 from tqdm.contrib.logging import logging_redirect_tqdm 

143 

144 self._logging_redirect = logging_redirect_tqdm() 

145 self._logging_redirect.__enter__() 

146 return self 

147 

148 def __exit__( 

149 self, 

150 exc_type: type[BaseException] | None, 

151 exc_value: BaseException | None, 

152 traceback: TracebackType | None, 

153 ) -> bool | None: 

154 if self.interactive: 

155 self._logging_redirect.__exit__(exc_type, exc_value, traceback) 

156 return None 

157 

158 @property 

159 def elapsed_time(self) -> float: 

160 """The time in seconds since the start of the aggregator.""" 

161 return time.time() - self.start 

162 

163 def _log_status(self) -> None: 

164 """Invoke the periodic logger with the current status.""" 

165 log_terms: list[str] = [] 

166 self.scans.append_log_terms(log_terms) 

167 self.writes.append_log_terms(log_terms) 

168 self.quantum_ingests.append_log_terms(log_terms) 

169 self._periodic_log.log("Status after %0.1fs: %s.", self.elapsed_time, "; ".join(log_terms)) 

170 

171 

172def make_worker_log(name: str, config: AggregatorConfig) -> LsstLogAdapter: 

173 """Make a logger for a worker. 

174 

175 Parameters 

176 ---------- 

177 name : `str` 

178 Name of the worker, to be used as part of the name for the logger. 

179 config : `AggregatorConfig` 

180 Configuration for the aggregator. 

181 """ 

182 base_log = logging.getLogger(f"lsst.pipe.base.quantum_graph.aggregator.{name}") 

183 base_log.propagate = False 

184 log = getLogger(logger=base_log) 

185 if config.worker_log_dir is not None: 

186 os.makedirs(config.worker_log_dir, exist_ok=True) 

187 match config.worker_log_level.upper(): 

188 case "VERBOSE": 

189 log.setLevel(VERBOSE) 

190 case "TRACE": 

191 log.setLevel(TRACE) 

192 case std: 

193 log.setLevel(getattr(logging, std)) 

194 handler = logging.FileHandler(os.path.join(config.worker_log_dir, f"{name}.log")) 

195 handler.setFormatter( 

196 logging.Formatter("%(levelname)s %(asctime)s.%(msecs)03d %(message)s", "%Y-%m-%dT%H:%M:%S") 

197 ) 

198 log.addHandler(handler) 

199 else: 

200 log.addHandler(logging.NullHandler()) 

201 return log