Coverage for python / lsst / pipe / base / quantum_graph / aggregator / _progress.py: 21%
83 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-18 08:44 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <http://www.gnu.org/licenses/>.
28from __future__ import annotations
30__all__ = ("ProgressCounter", "ProgressManager", "make_worker_log")
32import logging
33import os
34import time
35from types import TracebackType
36from typing import Any, Self
38from lsst.utils.logging import TRACE, VERBOSE, LsstLogAdapter, PeriodicLogger, getLogger
40from ._config import AggregatorConfig
43class ProgressCounter:
44 """A progress tracker for an individual aspect of the aggregation process.
46 Parameters
47 ----------
48 parent : `ProgressManager`
49 The parent progress manager object.
50 description : `str`
51 Human-readable description of this aspect.
52 unit : `str`
53 Unit (in plural form) for the items being counted.
54 total : `int`, optional
55 Expected total number of items. May be set later.
56 """
58 def __init__(self, parent: ProgressManager, description: str, unit: str, total: int | None = None):
59 self._parent = parent
60 self.total = total
61 self._description = description
62 self._current = 0
63 self._unit = unit
64 self._bar: Any = None
66 def update(self, n: int) -> None:
67 """Report that ``n`` new items have been processed.
69 Parameters
70 ----------
71 n : `int`
72 Number of new items processed.
73 """
74 self._current += n
75 if self._parent.interactive:
76 if self._bar is None:
77 if n == self.total:
78 return
79 from tqdm import tqdm
81 self._bar = tqdm(desc=self._description, total=self.total, leave=False, unit=f" {self._unit}")
82 else:
83 self._bar.update(n)
84 if self._current == self.total:
85 self._bar.close()
86 self._parent._log_status()
88 def close(self) -> None:
89 """Close the counter, guaranteeing that `update` will not be called
90 again.
91 """
92 if self._bar is not None:
93 self._bar.close()
94 self._bar = None
96 def append_log_terms(self, msg: list[str]) -> None:
97 """Append a log message for this counter to a list if it is active.
99 Parameters
100 ----------
101 msg : `list` [ `str` ]
102 List of messages to concatenate into a single line and log
103 together, to be modified in-place.
104 """
105 if self.total is not None and self._current > 0 and self._current < self.total:
106 msg.append(f"{self._description} ({self._current} of {self.total} {self._unit})")
109class ProgressManager:
110 """A helper class for the provenance aggregator that handles reporting
111 progress to the user.
113 This includes both logging (including periodic logging) and optional
114 progress bars.
116 Parameters
117 ----------
118 log : `lsst.utils.logging.LsstLogAdapter`
119 LSST-customized logger.
120 config : `AggregatorConfig`
121 Configuration for the aggregator.
123 Notes
124 -----
125 This class is a context manager in order to manage the redirection of
126 logging when progress bars for interactive display are in use. The context
127 manager does nothing otherwise.
128 """
130 def __init__(self, log: LsstLogAdapter, config: AggregatorConfig):
131 self.start = time.time()
132 self.log = log
133 self.config = config
134 self._periodic_log = PeriodicLogger(self.log, config.log_status_interval)
135 self.scans = ProgressCounter(self, "scanning", "quanta")
136 self.writes = ProgressCounter(self, "writing", "quanta")
137 self.quantum_ingests = ProgressCounter(self, "ingesting outputs", "quanta")
138 self.interactive = config.interactive_status
140 def __enter__(self) -> Self:
141 if self.interactive:
142 from tqdm.contrib.logging import logging_redirect_tqdm
144 self._logging_redirect = logging_redirect_tqdm()
145 self._logging_redirect.__enter__()
146 return self
148 def __exit__(
149 self,
150 exc_type: type[BaseException] | None,
151 exc_value: BaseException | None,
152 traceback: TracebackType | None,
153 ) -> bool | None:
154 if self.interactive:
155 self._logging_redirect.__exit__(exc_type, exc_value, traceback)
156 return None
158 @property
159 def elapsed_time(self) -> float:
160 """The time in seconds since the start of the aggregator."""
161 return time.time() - self.start
163 def _log_status(self) -> None:
164 """Invoke the periodic logger with the current status."""
165 log_terms: list[str] = []
166 self.scans.append_log_terms(log_terms)
167 self.writes.append_log_terms(log_terms)
168 self.quantum_ingests.append_log_terms(log_terms)
169 self._periodic_log.log("Status after %0.1fs: %s.", self.elapsed_time, "; ".join(log_terms))
172def make_worker_log(name: str, config: AggregatorConfig) -> LsstLogAdapter:
173 """Make a logger for a worker.
175 Parameters
176 ----------
177 name : `str`
178 Name of the worker, to be used as part of the name for the logger.
179 config : `AggregatorConfig`
180 Configuration for the aggregator.
181 """
182 base_log = logging.getLogger(f"lsst.pipe.base.quantum_graph.aggregator.{name}")
183 base_log.propagate = False
184 log = getLogger(logger=base_log)
185 if config.worker_log_dir is not None:
186 os.makedirs(config.worker_log_dir, exist_ok=True)
187 match config.worker_log_level.upper():
188 case "VERBOSE":
189 log.setLevel(VERBOSE)
190 case "TRACE":
191 log.setLevel(TRACE)
192 case std:
193 log.setLevel(getattr(logging, std))
194 handler = logging.FileHandler(os.path.join(config.worker_log_dir, f"{name}.log"))
195 handler.setFormatter(
196 logging.Formatter("%(levelname)s %(asctime)s.%(msecs)03d %(message)s", "%Y-%m-%dT%H:%M:%S")
197 )
198 log.addHandler(handler)
199 else:
200 log.addHandler(logging.NullHandler())
201 return log