Coverage for python / lsst / dax / apdb / monitor.py: 26%

139 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 10:35 +0000

1# This file is part of dax_apdb. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <http://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["LoggingMonHandler", "MonAgent", "MonService"] 

25 

26import contextlib 

27import json 

28import logging 

29import os 

30import time 

31import warnings 

32from abc import ABC, abstractmethod 

33from collections.abc import Iterable, Iterator, Mapping 

34from typing import TYPE_CHECKING, Any 

35 

36from lsst.utils.classes import Singleton 

37 

38if TYPE_CHECKING: 

39 from contextlib import AbstractContextManager 

40 

41_TagsType = Mapping[str, str | int] 

42 

43 

44_CONFIG_ENV = "DAX_APDB_MONITOR_CONFIG" 

45"""Name of the envvar specifying service configuration.""" 

46 

47 

class MonHandler(ABC):
    """Interface for handlers of the monitoring records.

    Handlers are responsible for delivering monitoring records to their final
    destination, for example log file or time-series database. Concrete
    handlers must implement `handle`.
    """

    @abstractmethod
    def handle(
        self, name: str, timestamp: float, tags: _TagsType, values: Mapping[str, Any], agent_name: str
    ) -> None:
        """Handle one monitoring record.

        Parameters
        ----------
        name : `str`
            Record name, arbitrary string.
        timestamp : `float`
            Time in seconds since UNIX epoch when record originated.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
            Tags associated with the record, may be empty.
        values : `~collections.abc.Mapping` [`str`, `Any`]
            Values associated with the record, usually never empty.
        agent_name : `str`
            Name of a client agent that produced this record.
        """
        raise NotImplementedError()

75 

76 

class MonAgent:
    """Client-side interface for adding monitoring records to the monitoring
    service.

    Parameters
    ----------
    name : `str`, optional
        Client agent name, this is used for filtering of the records by the
        service and is also passed to monitoring handler as ``agent_name``.

    Notes
    -----
    All agents forward their records to the `MonService` instance, which is
    a singleton shared by the whole application.
    """

    def __init__(self, name: str = ""):
        self._name = name
        self._service = MonService()

    def add_record(
        self,
        name: str,
        *,
        values: Mapping[str, Any],
        tags: _TagsType | None = None,
        timestamp: float | None = None,
    ) -> None:
        """Send one record to monitoring service.

        Parameters
        ----------
        name : `str`
            Record name, arbitrary string.
        values : `~collections.abc.Mapping` [`str`, `Any`]
            Values associated with the record, usually never empty.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`], optional
            Tags associated with the record, may be empty. They are merged
            with the tags of the active `context_tags` context; tags given
            here take precedence.
        timestamp : `float`, optional
            Time in seconds since UNIX epoch when record originated. If not
            given, the service uses the current time.
        """
        self._service._add_record(
            agent_name=self._name,
            record_name=name,
            tags=tags,
            values=values,
            timestamp=timestamp,
        )

    def context_tags(self, tags: _TagsType) -> AbstractContextManager[None]:
        """Context manager that adds a set of tags to all records created
        inside the context.

        Parameters
        ----------
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`]
            Tags associated with the records.

        Returns
        -------
        context : `contextlib.AbstractContextManager` [`None`]
            Context manager provided by the monitoring service.

        Notes
        -----
        All calls to `add_record` that happen inside the corresponding context
        will add tags specified in this call. Tags specified in `add_record`
        will override matching tag names that are passed to this method. On
        exit from context a previous tag context is restored (which may be
        empty).
        """
        return self._service.context_tags(tags)

139 

140 

class MonFilter:
    """Filter for the names associated with client agents.

    Parameters
    ----------
    rule : `str`
        A single filtering rule: an agent name optionally prefixed with
        ``+`` (accept, which is also the default without a prefix) or ``-``
        (reject). The special name ``any`` makes this a catch-all rule that
        matches every agent.
    """

    def __init__(self, rule: str):
        prefix = rule[:1]
        # Only a leading minus makes the rule a rejecting one.
        self._accept = prefix != "-"
        name = rule[1:] if prefix in ("+", "-") else rule
        # Catch-all rules are represented internally by an empty agent name.
        self.agent_name = "" if name == "any" else name

    def is_match_all(self) -> bool:
        """Check whether this rule is a catch-all rule.

        Returns
        -------
        is_match_all : `bool`
            `True` if rule name is ``-any``, ``+any``, or ``any`` (a rule
            whose agent name is empty also behaves as catch-all).
        """
        return not self.agent_name

    def accept(self, agent_name: str) -> bool | None:
        """Make filtering decision for an agent name.

        Parameters
        ----------
        agent_name : `str`
            Name of the client agent that produces monitoring record.

        Returns
        -------
        decision : `bool` or `None`
            `True` to accept the agent, `False` to reject it, or `None` when
            this rule does not apply to the agent and the decision is left to
            the next rule.
        """
        applies = self.is_match_all() or self.agent_name == agent_name
        return self._accept if applies else None

191 

192 

class MonService(metaclass=Singleton):
    """Class implementing monitoring service functionality.

    Notes
    -----
    This is a singleton class which serves all client agents in an application.
    It accepts records from agents, filters them based on a set of configured
    rules and forwards them to one or more configured handlers. By default
    there are no handlers defined which means that all records are discarded.
    Default set of filtering rules is empty which accepts all agent names.

    To produce a useful output from this service one has to add at least one
    handler using `add_handler` method (e.g. `LoggingMonHandler` instance).
    The `set_filters` methods can be used to specify the set of filtering
    rules.

    If no handler was added explicitly before the first record arrives, the
    service attempts, once, to configure itself from the
    ``DAX_APDB_MONITOR_CONFIG`` environment variable.
    """

    _handlers: list[MonHandler]
    """List of active handlers."""

    _context_tags: _TagsType | None = None
    """Current tag context, these tags are added to each new record."""

    _filters: list[MonFilter]
    """Sequence of filters for agent names."""

    _initialized: bool = False
    """False before initialization."""

    def __init__(self) -> None:
        # Per-instance containers; class-level mutable defaults would be
        # shared state that add_handler mutates in place.
        self._handlers = []
        self._filters = []

    def set_filters(self, rules: Iterable[str]) -> None:
        """Define a sequence of rules for filtering of the agent names.

        Parameters
        ----------
        rules : `~collections.abc.Iterable` [`str`]
            Ordered collection of rules. Each string specifies filtering rule
            for a single name, or catch-all rule. The rule consists of the
            agent name prefixed by minus or optional plus sign. Catch-all rule
            uses name "any". If the rule starts with minus sign then matching
            agent will be rejected. Otherwise matching agent is accepted.

        Notes
        -----
        The catch-all rule (`-any`, `+any`, or `any`) can be specified in any
        location in the sequence but it is always applied last. E.g.
        `["-any", "+agent1"]` behaves the same as `["+agent1", "-any"]`.
        If more than one catch-all rule is given, the last one is used.
        If the set of rules does not include catch-all rule, filtering behaves
        as if it is added implicitly as `+any`.

        Filtering code evaluates each rule in order. First rule that matches
        the agent name wins. Agent names are matched literally, wildcards are
        not supported and there are no parent/child relations between agent
        names (e.g `lsst.dax.apdb` and `lsst.dax.apdb.sql` are treated as
        independent names).
        """
        match_all: MonFilter | None = None
        filters: list[MonFilter] = []
        for rule in rules:
            mon_filter = MonFilter(rule)
            if mon_filter.is_match_all():
                match_all = mon_filter
            else:
                filters.append(mon_filter)
        if match_all is not None:
            # Catch-all rule is always evaluated after all specific rules.
            filters.append(match_all)
        self._filters = filters

    def _add_record(
        self,
        *,
        agent_name: str,
        record_name: str,
        values: Mapping[str, Any],
        tags: _TagsType | None = None,
        timestamp: float | None = None,
    ) -> None:
        """Add one monitoring record, this method is for use by agents only.

        Parameters
        ----------
        agent_name : `str`
            Name of the client agent, used for filtering.
        record_name : `str`
            Record name.
        values : `~collections.abc.Mapping` [`str`, `Any`]
            Values associated with the record.
        tags : `~collections.abc.Mapping` [`str`, `str` or `int`], optional
            Record tags, merged over the current context tags.
        timestamp : `float`, optional
            Record time in seconds since UNIX epoch, current time if not
            given.
        """
        if not self._initialized:
            # Mark as initialized before attempting default configuration so
            # that a broken configuration is parsed and reported only once,
            # not on every subsequent record.
            self._initialized = True
            try:
                self._default_init()
            except Exception as exc:
                # Complain but continue.
                message = f"Error in configuration of monitoring service: {exc}"
                # Stack level does not really matter.
                warnings.warn(message, stacklevel=3)
        if not self._handlers:
            return
        # Check every filter in order, first decision wins; accept if none
        # makes any decision.
        for mon_filter in self._filters:
            decision = mon_filter.accept(agent_name)
            if decision is False:
                return
            if decision is True:
                break
        if timestamp is None:
            timestamp = time.time()
        if tags is None:
            tags = self._context_tags or {}
        elif self._context_tags:
            # Tags passed with the record override context tags.
            tags = {**self._context_tags, **tags}
        for handler in self._handlers:
            handler.handle(record_name, timestamp, tags, values, agent_name)

    def _default_init(self) -> None:
        """Perform default initialization of the service from the
        ``DAX_APDB_MONITOR_CONFIG`` environment variable.

        Raises
        ------
        ValueError
            Raised if the configuration string cannot be parsed.
        """
        env = os.environ.get(_CONFIG_ENV)
        if not env:
            return
        # Configuration is specified as a comma-separated list of items.
        # Items with colons are handler specifications; for now the only
        # supported handler is logging, specified as
        # "logging:<logger-name>[:<level>]". Items without colons are
        # filter rules (see set_filters for syntax).
        filters: list[str] = []
        handlers: list[MonHandler] = []
        for raw_item in env.split(","):
            item = raw_item.strip()
            if not item:
                # Skip empty items (e.g. from a trailing comma); an empty
                # rule would otherwise act as an "accept everything" rule
                # and override an explicit "-any".
                continue
            pieces = item.split(":")
            if len(pieces) in (2, 3) and pieces[0] == "logging":
                logger_name = pieces[1]
                level = logging.INFO
                if len(pieces) == 3:
                    level_name = pieces[2]
                    mapped_level = logging.getLevelNamesMapping().get(level_name.upper())
                    if mapped_level is None:
                        raise ValueError(
                            f"Unknown logging level name {level_name!r} in {_CONFIG_ENV}={env!r}"
                        )
                    level = mapped_level
                handlers.append(LoggingMonHandler(logger_name, level))
            elif len(pieces) == 1:
                filters.append(item)
            else:
                raise ValueError(f"Unexpected format of item {item!r} in {_CONFIG_ENV}={env!r}")
        for handler in handlers:
            self.add_handler(handler)
        self.set_filters(filters)

    @property
    def handlers(self) -> Iterable[MonHandler]:
        """Set of handlers defined currently."""
        return self._handlers

    def add_handler(self, handler: MonHandler) -> None:
        """Add one monitoring handler.

        Parameters
        ----------
        handler : `MonHandler`
            Handler instance. Adding the same handler twice has no effect.
        """
        # Manually adding handler means default initialization should be
        # skipped.
        self._initialized = True
        if handler not in self._handlers:
            self._handlers.append(handler)

    def remove_handler(self, handler: MonHandler) -> None:
        """Remove a monitoring handler.

        Parameters
        ----------
        handler : `MonHandler`
            Handler instance. Removing an unknown handler has no effect.
        """
        if handler in self._handlers:
            self._handlers.remove(handler)

    def _add_context_tags(self, tags: _TagsType) -> _TagsType | None:
        """Extend the tag context with new tags, overriding any tags that may
        already exist in a current context.

        Returns
        -------
        old_tags : `~collections.abc.Mapping` or `None`
            Previous tag context, to be restored later.
        """
        old_tags = self._context_tags
        if not self._context_tags:
            self._context_tags = tags
        else:
            all_tags = dict(self._context_tags)
            all_tags.update(tags)
            self._context_tags = all_tags
        return old_tags

    @contextlib.contextmanager
    def context_tags(self, tags: _TagsType) -> Iterator[None]:
        """Context manager that adds a set of tags to all records created
        inside the context.

        Typically clients will be using `MonAgent.context_tags`, which forwards
        to this method.
        """
        old_context = self._add_context_tags(tags)
        try:
            yield
        finally:
            # Restore old context.
            self._context_tags = old_context

388 

389 

class LoggingMonHandler(MonHandler):
    """Implementation of the monitoring handler which dumps records formatted
    as JSON objects to `logging`.

    Parameters
    ----------
    logger_name : `str`
        Name of the `logging` logger to use for output.
    log_level : `int`, optional
        Logging level to use for output, default is `logging.INFO`.

    Notes
    -----
    The attributes of the formatted JSON object correspond to the parameters
    of `handle` method, except for ``agent_name`` which is mapped to
    ``source``. The ``tags`` and ``values`` become JSON sub-objects with
    corresponding keys; their contents must be JSON-serializable.
    """

    def __init__(self, logger_name: str, log_level: int = logging.INFO):
        self._logger = logging.getLogger(logger_name)
        self._level = log_level

    def handle(
        self, name: str, timestamp: float, tags: _TagsType, values: Mapping[str, Any], agent_name: str
    ) -> None:
        # Docstring is inherited from base class.
        # Skip JSON serialization if the logger would drop the record anyway.
        if not self._logger.isEnabledFor(self._level):
            return
        record = {
            "name": name,
            "timestamp": timestamp,
            # json cannot serialize arbitrary Mapping implementations (e.g.
            # MappingProxyType), convert to plain dicts first.
            "tags": dict(tags),
            "values": dict(values),
            "source": agent_name,
        }
        self._logger.log(self._level, json.dumps(record))