# This file is part of dax_apdb
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["metrics_log_to_influx"]

import io
import json
import re
import sys
from collections.abc import Iterable
from datetime import datetime
from typing import Any, TextIO

import yaml

_LOG_LINE_RE_PIPELINE = re.compile(
    r"""
    ^
    INFO .* lsst[.]dax[.]apdb[.]monitor
    \ \(\w+:(?P<MDC>\{[^}]*\})\)
    \([^)]*\)[ ]-[ ](?P<metric>.*)
    $
    """,
    re.VERBOSE,
)
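
# Illustrative example (all values made up) of a line this pattern is meant
# to match; the "metric" group must hold a JSON object with "name",
# "timestamp", "tags" and "values" keys:
#
#   INFO 2026-05-01 lsst.dax.apdb.monitor (ctx:{'visit': 1})(monitor.py:42) - {"name": "select_time", "timestamp": 1700000000.0, "tags": {}, "values": {"time": 0.5}}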

_LOG_LINE_RE_REPLICATION = re.compile(
    r"""
    ^
    .*[ ]INFO[ ]lsst[.]dax[.]ppdb[.]monitor
    [ ]-[ ](?P<metric>.*)
    $
    """,
    re.VERBOSE,
)
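
# Illustrative matching line (made-up values):
#
#   2026-05-01T08:19:00 INFO lsst.dax.ppdb.monitor - {"name": "replicate_time", "timestamp": 1700000000.0, "tags": {}, "values": {"time": 1.5}}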

_LOG_LINE_RE_AP_PROTO = re.compile(
    r"""
    ^
    .*[ ]\[INFO\][ ]\[.*?\][ ]apdb_metrics:
    [ ](?P<metric>.*)
    $
    """,
    re.VERBOSE,
)
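
# Illustrative matching line (made-up values):
#
#   2026-05-01 08:19:00 [INFO] [worker-1] apdb_metrics: {"name": "store_time", "timestamp": 1700000000.0, "tags": {}, "values": {"time": 0.2}}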

# Whole line is JSON object.
_LOG_LINE_RE_JSON_LINE = re.compile("^(?P<metric>.*)$")

# Error or warning message from cassandra logger.
_LOG_LINE_CASSANDRA_RE = re.compile(
    r"""
    ^
    (?P<level>ERROR|WARNING)[ ](?P<datetime>\d{4}-\d{2}-\d{2}[ T]\S+)[ ]cassandra[.]cluster
    \ \((\w+:(?P<MDC>\{[^}]*\}))?\)
    \([^)]*\)[ ]-[ ](?P<message>.*)
    $
    """,
    re.VERBOSE,
)
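
# Illustrative matching line (made-up values):
#
#   WARNING 2026-05-01T08:19:05 cassandra.cluster (ctx:{'visit': 1})(cluster.py:123) - Host 10.0.0.1 has been marked down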

_AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
_AP_PIPE_DIASOURCES_RE = re.compile(
    r"(?P<count1>\d+) updated and \d+ unassociated diaObjects. Creating (?P<count2>\d+) new diaObjects"
)
_AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")

_CASSANDRA_MESSAGES_RE = (
    (re.compile(r"^Error preparing query for host (?P<host>\S+):$"), "error_prepare_query"),
    (re.compile(r"^Control connection failed to connect"), "error_control_connect"),
    (
        re.compile(r"^Unexpected failure handling node (?P<host>\S+) being marked up:$"),
        "error_failure_marking_up",
    ),
    (re.compile(r"^Failed to submit task to executor$"), "error_submit_task"),
    (re.compile(r"^Failed to create connection pool for new host (?P<host>\S+):$"), "warn_create_pool"),
    (re.compile(r"^Error attempting to reconnect to (?P<host>\S+),"), "warn_reconnect"),
    (re.compile(r"^Host (?P<host>\S+) has been marked down"), "warn_host_down"),
)
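
# For example, the Cassandra message "Host 10.0.0.1 has been marked down"
# (host value made up) maps to the metric name "warn_host_down" and adds
# the tag host=10.0.0.1 to the emitted record.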

# Some metrics are not useful for replication.
_SKIP_METRICS_REPLICATION = {
    "read_metadata_config",
    "version_check",
}

_SKIP_METRICS_AP_PROTO = {
    "read_metadata_config",
    "version_check",
    "insert_build_time",
}


def metrics_log_to_influx(
    file: Iterable[str],
    context_keys: str,
    extra_tags: str,
    fix_row_count: bool,
    mode: str,
    prefix: str,
    no_header: bool,
    header_database: str,
) -> None:
    """Extract metrics from log file and dump as InfluxDB data.

    Parameters
    ----------
    file : `~collections.abc.Iterable` [`str`]
        Names of the files to parse for metrics.
    context_keys : `str`
        Names of keys to extract from message context, comma-separated.
    extra_tags : `str`
        Additional tags to add to each record, comma-separated key=value
        pairs. A key prefixed with a minus sign drops that tag from each
        record instead.
    fix_row_count : `bool`
        If True then extract record counts from pipeline messages instead of
        metrics. A workaround for broken metrics.
    mode : `str`
        Source of the log, one of "ap_proto", "pipeline", "replication",
        "json_line".
    prefix : `str`
        Prefix to add to each metric name.
    no_header : `bool`
        If True then do not print DML header.
    header_database : `str`
        Name of the database for DML header.
    """
    context_names = [name for name in context_keys.split(",") if name]
    tags: dict[str, Any] = {}
    drop_tags: set[str] = set()
    for tag_val in extra_tags.split(","):
        if tag_val:
            tag, _, val = tag_val.partition("=")
            if tag.startswith("-"):
                drop_tags.add(tag.strip("-"))
            else:
                tags[tag] = val

    if not no_header:
        print(
            f"""\
# DML

# CONTEXT-DATABASE: {header_database}
"""
        )

    if not file:
        file = ["-"]
    for file_name in file:
        if file_name == "-":
            _metrics_log_to_influx(sys.stdin, context_names, tags, drop_tags, fix_row_count, mode, prefix)
        else:
            with open(file_name) as file_obj:
                _metrics_log_to_influx(file_obj, context_names, tags, drop_tags, fix_row_count, mode, prefix)
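
# Illustrative call (argument values are hypothetical; in practice this
# function is presumably driven by dax_apdb command-line tooling):
#
#     metrics_log_to_influx(
#         file=["apdb.log"],
#         context_keys="visit,detector",
#         extra_tags="instrument=LSSTCam,-pipeline",
#         fix_row_count=False,
#         mode="pipeline",
#         prefix="apdb_",
#         no_header=False,
#         header_database="apdb_metrics",
#     )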


def _metrics_log_to_influx(
    file: TextIO,
    context_keys: Iterable[str],
    extra_tags: dict[str, Any],
    drop_tags: set[str],
    fix_row_count: bool,
    mode: str,
    prefix: str,
) -> None:
    """Parse metrics from a single file."""
    objects_count = -1
    sources_count = -1
    forced_sources_count = -1

    match mode:
        case "pipeline":
            line_re = _LOG_LINE_RE_PIPELINE
        case "replication":
            line_re = _LOG_LINE_RE_REPLICATION
        case "ap_proto":
            line_re = _LOG_LINE_RE_AP_PROTO
        case "json_line":
            line_re = _LOG_LINE_RE_JSON_LINE
        case _:
            raise ValueError(f"Unexpected mode: {mode}")

    for line in file:
        line = line.strip()
        if fix_row_count and mode == "pipeline":
            # Counts come from separate AP messages.
            if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
                objects_count = int(match.group("count"))
            elif match := _AP_PIPE_DIASOURCES_RE.search(line):
                sources_count = int(match.group("count1")) + int(match.group("count2"))
            elif match := _AP_PIPE_DIAFORCED_RE.search(line):
                forced_sources_count = int(match.group("count"))

        if match := line_re.match(line):
            metric_str = match.group("metric")
            try:
                metric: dict[str, Any] = json.loads(metric_str)
            except json.JSONDecodeError:
                # Ignore parsing errors; lines sometimes get scrambled.
                continue
            tags = dict(extra_tags)

            name: str = metric["name"]
            if mode == "replication":
                if name in _SKIP_METRICS_REPLICATION:
                    continue
            elif mode == "ap_proto":
                if name in _SKIP_METRICS_AP_PROTO:
                    continue

            timestamp: float = metric["timestamp"]
            for tag, tag_val in metric["tags"].items():
                tags[tag] = tag_val
            values: dict[str, Any] = metric["values"]

            if fix_row_count and name == "insert_time":
                if tags["table"].startswith("DiaObject"):
                    values["row_count"] = objects_count
                elif tags["table"].startswith("DiaSource"):
                    values["row_count"] = sources_count
                elif tags["table"].startswith("DiaForcedSource"):
                    values["row_count"] = forced_sources_count

            if mode == "pipeline" and context_keys:
                tags.update(_extract_mdc(match, context_keys))

            for tag in drop_tags:
                tags.pop(tag, None)

            _print_metrics(prefix + name, tags, values, timestamp)

        elif match := _LOG_LINE_CASSANDRA_RE.match(line):
            tags = dict(extra_tags)
            tags["level"] = match.group("level").lower()
            dt = datetime.fromisoformat(match.group("datetime"))
            timestamp = dt.timestamp()
            tags.update(_extract_mdc(match, context_keys))
            values = {"count": 1}

            message = match.group("message")
            for message_re, name in _CASSANDRA_MESSAGES_RE:
                if (message_match := message_re.search(message)) is not None:
                    tags.update(message_match.groupdict())
                    _print_metrics(prefix + name, tags, values, timestamp)
                    break


def _print_metrics(name: str, tags: dict[str, Any], values: dict[str, Any], timestamp: float) -> None:
    """Print a single metric record in InfluxDB line protocol format."""
    tags_str = ",".join([name] + [f"{key}={val}" for key, val in tags.items()])
    values_str = ",".join(f"{key}={val}" for key, val in values.items())
    print(f"{tags_str} {values_str} {int(timestamp * 1e9)}")
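
# Example (illustrative values): the call
#   _print_metrics("apdb_insert_time", {"table": "DiaObject"}, {"row_count": 10}, 1700000000.0)
# prints the line-protocol record
#   apdb_insert_time,table=DiaObject row_count=10 1700000000000000000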


def _extract_mdc(match: re.Match, context_keys: Iterable[str]) -> dict[str, Any]:
    """Extract selected context keys from the MDC group of a matched log line."""
    tags: dict[str, Any] = {}
    mdc_str = match.group("MDC")
    if mdc_str:
        mdc_str = mdc_str.replace("'", '"')
        mdc: dict[str, Any] = yaml.safe_load(io.StringIO(mdc_str))
        for tag in context_keys:
            if (tag_val := mdc.get(tag)) is not None:
                tags[tag] = tag_val
    return tags
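
# Illustrative example: for an MDC section like (ctx:{'visit': 123, 'detector': 45})
# and context_keys=["visit"], this returns {"visit": 123}.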