Coverage for python / lsst / dax / apdb / monitor.py: 26%
139 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 10:35 +0000
1# This file is part of dax_apdb.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (http://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <http://www.gnu.org/licenses/>.
22from __future__ import annotations
24__all__ = ["LoggingMonHandler", "MonAgent", "MonService"]
26import contextlib
27import json
28import logging
29import os
30import time
31import warnings
32from abc import ABC, abstractmethod
33from collections.abc import Iterable, Iterator, Mapping
34from typing import TYPE_CHECKING, Any
36from lsst.utils.classes import Singleton
38if TYPE_CHECKING:
39 from contextlib import AbstractContextManager
41_TagsType = Mapping[str, str | int]
44_CONFIG_ENV = "DAX_APDB_MONITOR_CONFIG"
45"""Name of the envvar specifying service configuration."""
48class MonHandler(ABC):
49 """Interface for handlers of the monitoring records.
51 Handlers are responsible for delivering monitoring records to their final
52 destination, for example log file or time-series database.
53 """
55 @abstractmethod
56 def handle(
57 self, name: str, timestamp: float, tags: _TagsType, values: Mapping[str, Any], agent_name: str
58 ) -> None:
59 """Handle one monitoring record.
61 Parameters
62 ----------
63 name : `str`
64 Record name, arbitrary string.
65 timestamp : `str`
66 Time in seconds since UNIX epoch when record originated.
67 tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
68 Tags associated with the record, may be empty.
69 values : `~collections.abc.Mapping` [`str`, `Any`]
70 Values associated with the record, usually never empty.
71 agent_name `str`
72 Name of a client agent that produced this record.
73 """
74 raise NotImplementedError()
77class MonAgent:
78 """Client-side interface for adding monitoring records to the monitoring
79 service.
81 Parameters
82 ----------
83 name : `str`
84 Client agent name, this is used for filtering of the records by the
85 service and is also passed to monitoring handler as ``agent_name``.
86 """
88 def __init__(self, name: str = ""):
89 self._name = name
90 self._service = MonService()
92 def add_record(
93 self,
94 name: str,
95 *,
96 values: Mapping[str, Any],
97 tags: Mapping[str, str | int] | None = None,
98 timestamp: float | None = None,
99 ) -> None:
100 """Send one record to monitoring service.
102 Parameters
103 ----------
104 name : `str`
105 Record name, arbitrary string.
106 values : `~collections.abc.Mapping` [`str`, `Any`]
107 Values associated with the record, usually never empty.
108 tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
109 Tags associated with the record, may be empty.
110 timestamp : `str`
111 Time in seconds since UNIX epoch when record originated.
112 """
113 self._service._add_record(
114 agent_name=self._name,
115 record_name=name,
116 tags=tags,
117 values=values,
118 timestamp=timestamp,
119 )
121 def context_tags(self, tags: _TagsType) -> AbstractContextManager[None]:
122 """Context manager that adds a set of tags to all records created
123 inside the context.
125 Parameters
126 ----------
127 tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
128 Tags associated with the records.
130 Notes
131 -----
132 All calls to `add_record` that happen inside the corresponding context
133 will add tags specified in this call. Tags specified in `add_record`
134 will override matching tag names that are passed to this method. On
135 exit from context a previous tag context is restored (which may be
136 empty).
137 """
138 return self._service.context_tags(tags)
141class MonFilter:
142 """Filter for the names associated with client agents.
144 Parameters
145 ----------
146 rule : `str`
147 String specifying filtering rule for a single name, or catch-all rule.
148 The rule consist of the agent name prefixed by minus or optional plus
149 sign. Catch-all rule uses name "any". If the rule starts with minus
150 sign then matching agent will be rejected. Otherwise matching agent
151 is accepted.
152 """
154 def __init__(self, rule: str):
155 self._accept = True
156 if rule.startswith("-"):
157 self._accept = False
158 rule = rule[1:]
159 elif rule.startswith("+"):
160 rule = rule[1:]
161 self.agent_name = "" if rule == "any" else rule
163 def is_match_all(self) -> bool:
164 """Return `True` if this rule is a catch-all rule.
166 Returns
167 -------
168 is_match_all : `bool`
169 `True` if rule name is `-any`, `+any`, or `any`.
170 """
171 return not self.agent_name
173 def accept(self, agent_name: str) -> bool | None:
174 """Return filtering decision for specified agent name.
176 Parameters
177 ----------
178 agent_name : `str`
179 Name of the client agent that produces monitoring record.
181 Returns
182 -------
183 decision : `bool` or `None`
184 `True` if the agent is accepted, `False` if agent is rejected.
185 `None` is returned if this rule does not match agent name and
186 decision should be made by the next rule.
187 """
188 if not self.agent_name or agent_name == self.agent_name:
189 return self._accept
190 return None
193class MonService(metaclass=Singleton):
194 """Class implementing monitoring service functionality.
196 Notes
197 -----
198 This is a singleton class which serves all client agents in an application.
199 It accepts records from agents, filters it based on a set of configured
200 rules and forwards them to one or more configured handlers. By default
201 there are no handlers defined which means that all records are discarded.
202 Default set of filtering rules is empty which accepts all agent names.
204 To produce a useful output from this service one has to add at least one
205 handler using `add_handler` method (e.g. `LoggingMonHandler` instance).
206 The `set_filters` methods can be used to specify the set of filtering
207 rules.
208 """
210 _handlers: list[MonHandler] = []
211 """List of active handlers."""
213 _context_tags: _TagsType | None = None
214 """Current tag context, these tags are added to each new record."""
216 _filters: list[MonFilter] = []
217 """Sequence of filters for agent names."""
219 _initialized: bool = False
220 """False before initialization."""
222 def set_filters(self, rules: Iterable[str]) -> None:
223 """Define a sequence of rules for filtering of the agent names.
225 Parameters
226 ----------
227 rules : `~collections.abc.Iterable` [`str`]
228 Ordered collection of rules. Each string specifies filtering rule
229 for a single name, or catch-all rule. The rule consist of the
230 agent name prefixed by minus or optional plus sign. Catch-all rule
231 uses name "any". If the rule starts with minus sign then matching
232 agent will be rejected. Otherwise matching agent is accepted.
234 Notes
235 -----
236 The catch-all rule (`-any`, `+any`, or `any`) can be specified in any
237 location in the sequence but it is always applied last. E.g.
238 `["-any", "+agent1"]` behaves the same as `["+agent1", "-any"]`.
239 If the set of rues does not include catch-all rule, filtering behaves
240 as if it is added implicitly as `+any`.
242 Filtering code evaluates each rule in order. First rule that matches
243 the agent name wins. Agent names are matched literally, wildcards are
244 not supported and there are no parent/child relations between agent
245 names (e.g `lsst.dax.apdb` and `lsst.dax.apdb.sql` are treated as
246 independent names).
247 """
248 match_all: MonFilter | None = None
249 self._filters = []
250 for rule in rules:
251 mon_filter = MonFilter(rule)
252 if mon_filter.is_match_all():
253 match_all = mon_filter
254 else:
255 self._filters.append(mon_filter)
256 if match_all:
257 self._filters.append(match_all)
259 def _add_record(
260 self,
261 *,
262 agent_name: str,
263 record_name: str,
264 values: Mapping[str, Any],
265 tags: Mapping[str, str | int] | None = None,
266 timestamp: float | None = None,
267 ) -> None:
268 """Add one monitoring record, this method is for use by agents only."""
269 if not self._initialized:
270 try:
271 self._default_init()
272 self._initialized = True
273 except Exception as exc:
274 # Complain but continue.
275 message = f"Error in configuration of monitoring service: {exc}"
276 # Stack level does not really matter.
277 warnings.warn(message, stacklevel=3)
278 if self._handlers:
279 accept: bool | None = None
280 # Check every filter, accept if none makes any decision.
281 for filter in self._filters:
282 accept = filter.accept(agent_name)
283 if accept is False:
284 return
285 if accept is True:
286 break
287 if timestamp is None:
288 timestamp = time.time()
289 if tags is None:
290 tags = self._context_tags or {}
291 else:
292 if self._context_tags:
293 all_tags = dict(self._context_tags)
294 all_tags.update(tags)
295 tags = all_tags
296 for handler in self._handlers:
297 handler.handle(record_name, timestamp, tags, values, agent_name)
299 def _default_init(self) -> None:
300 """Perform default initialization of the service."""
301 if env := os.environ.get(_CONFIG_ENV):
302 # Configuration is specified as colon-separated list of key:value
303 # pairs or simple values. Simple values are treated as filters
304 # (see set_filters for syntax). key-values pairs pairs specify
305 # handlers, for now the only supported handler is logging, it
306 # is specified as "logging:<logger-name>[:<level>]".
307 filters = []
308 handlers: list[MonHandler] = []
309 for item in env.split(","):
310 pieces = item.split(":")
311 if len(pieces) in (2, 3) and pieces[0] == "logging":
312 logger_name = pieces[1]
313 if len(pieces) == 3:
314 level_name = pieces[2]
315 level = logging.getLevelNamesMapping().get(level_name.upper())
316 if level is None:
317 raise ValueError(
318 f"Unknown logging level name {level_name!r} in {_CONFIG_ENV}={env!r}"
319 )
320 else:
321 level = logging.INFO
322 handlers.append(LoggingMonHandler(logger_name, level))
323 elif len(pieces) == 1:
324 filters.extend(pieces)
325 else:
326 raise ValueError(f"Unexpected format of item {item!r} in {_CONFIG_ENV}={env!r}")
327 for handler in handlers:
328 self.add_handler(handler)
329 self.set_filters(filters)
331 @property
332 def handlers(self) -> Iterable[MonHandler]:
333 """Set of handlers defined currently."""
334 return self._handlers
336 def add_handler(self, handler: MonHandler) -> None:
337 """Add one monitoring handler.
339 Parameters
340 ----------
341 handler : `MonHandler`
342 Handler instance.
343 """
344 # Manually adding handler means default initialization should be
345 # skipped.
346 self._initialized = True
347 if handler not in self._handlers:
348 self._handlers.append(handler)
350 def remove_handler(self, handler: MonHandler) -> None:
351 """Remove a monitoring handler.
353 Parameters
354 ----------
355 handler : `MonHandler`
356 Handler instance.
357 """
358 if handler in self._handlers:
359 self._handlers.remove(handler)
361 def _add_context_tags(self, tags: _TagsType) -> _TagsType | None:
362 """Extend the tag context with new tags, overriding any tags that may
363 already exist in a current context.
364 """
365 old_tags = self._context_tags
366 if not self._context_tags:
367 self._context_tags = tags
368 else:
369 all_tags = dict(self._context_tags)
370 all_tags.update(tags)
371 self._context_tags = all_tags
372 return old_tags
374 @contextlib.contextmanager
375 def context_tags(self, tags: _TagsType) -> Iterator[None]:
376 """Context manager that adds a set of tags to all records created
377 inside the context.
379 Typically clients will be using `MonAgent.context_tags`, which forwards
380 to this method.
381 """
382 old_context = self._add_context_tags(tags)
383 try:
384 yield
385 finally:
386 # Restore old context.
387 self._context_tags = old_context
390class LoggingMonHandler(MonHandler):
391 """Implementation of the monitoring handler which dumps records formatted
392 as JSON objects to `logging`.
394 Parameters
395 ----------
396 logger_name : `str`
397 Name of the `logging` logger to use for output.
398 log_level : `int`, optional
399 Logging level to use for output, default is `INFO`
401 Notes
402 -----
403 The attributes of the formatted JSON object correspond to the parameters
404 of `handle` method, except for `agent_name` which is mapped to `source`.
405 The `tags` and `values` become JSON sub-objects with corresponding keys.
406 """
408 def __init__(self, logger_name: str, log_level: int = logging.INFO):
409 self._logger = logging.getLogger(logger_name)
410 self._level = log_level
412 def handle(
413 self, name: str, timestamp: float, tags: _TagsType, values: Mapping[str, Any], agent_name: str
414 ) -> None:
415 # Docstring is inherited from base class.
416 record = {
417 "name": name,
418 "timestamp": timestamp,
419 "tags": tags,
420 "values": values,
421 "source": agent_name,
422 }
423 msg = json.dumps(record)
424 self._logger.log(self._level, msg)