Coverage for python / lsst / dax / apdb / scripts / metrics.py: 13%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:58 +0000

1# This file is part of dax_apdb 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This program is free software: you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation, either version 3 of the License, or 

12# (at your option) any later version. 

13# 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18# 

19# You should have received a copy of the GNU General Public License 

20# along with this program. If not, see <https://www.gnu.org/licenses/>. 

21 

22from __future__ import annotations 

23 

24__all__ = ["metrics_log_to_influx"] 

25 

import ast
import io
import json
import re
import sys
from collections.abc import Iterable
from datetime import datetime
from typing import Any, TextIO

import yaml

35 

# Metrics line in pipeline logs; captures the MDC dict and the JSON metric
# payload emitted by lsst.dax.apdb.monitor.
_LOG_LINE_RE_PIPELINE = re.compile(
    r"""
    ^
    INFO .* lsst[.]dax[.]apdb[.]monitor
    \ \(\w+:(?P<MDC>\{[^}]*\})\)
    \([^)]*\)[ ]-[ ](?P<metric>.*)
    $
    """,
    re.VERBOSE,
)

# Metrics line in replication service logs (lsst.dax.ppdb.monitor); no MDC.
_LOG_LINE_RE_REPLICATION = re.compile(
    r"""
    ^
    .*[ ]INFO[ ]lsst[.]dax[.]ppdb[.]monitor
    [ ]-[ ](?P<metric>.*)
    $
    """,
    re.VERBOSE,
)

# Metrics line in ap_proto logs ("apdb_metrics" logger); no MDC.
_LOG_LINE_RE_AP_PROTO = re.compile(
    r"""
    ^
    .*[ ]\[INFO\][ ]\[.*?\][ ]apdb_metrics:
    [ ](?P<metric>.*)
    $
    """,
    re.VERBOSE,
)

# Whole line is JSON object.
_LOG_LINE_RE_JSON_LINE = re.compile("^(?P<metric>.*)$")

# Error or warning message from cassandra logger.
_LOG_LINE_CASSANDRA_RE = re.compile(
    r"""
    ^
    (?P<level>ERROR|WARNING)[ ](?P<datetime>\d{4}-\d{2}-\d{2}[ T]\S+)[ ]cassandra[.]cluster
    \ \((\w+:(?P<MDC>\{[^}]*\}))?\)
    \([^)]*\)[ ]-[ ](?P<message>.*)
    $
    """,
    re.VERBOSE,
)

# AP pipeline messages used to recover row counts when metrics row counts
# are broken (see ``fix_row_count`` option).
_AP_PIPE_DIAOBJECTS_RE = re.compile(r"Calculating summary stats for (?P<count>\d+) DiaObjects")
_AP_PIPE_DIASOURCES_RE = re.compile(
    r"(?P<count1>\d+) updated and \d+ unassociated diaObjects. Creating (?P<count2>\d+) new diaObjects"
)
_AP_PIPE_DIAFORCED_RE = re.compile(r"Updating (?P<count>\d+) diaForcedSources in the APDB")

# Known cassandra.cluster messages and the metric name each is reported as.
# NOTE(review): constant name is misspelled ("CASSNDRA") but kept as-is to
# match its use elsewhere in this module.
_CASSNDRA_MESSAGES_RE = (
    (re.compile(r"^Error preparing query for host (?P<host>\S+):$"), "error_prepare_query"),
    (re.compile(r"^Control connection failed to connect"), "error_control_connect"),
    (
        re.compile(r"^Unexpected failure handling node (?P<host>\S+) being marked up:$"),
        "error_failure_marking_up",
    ),
    (re.compile(r"^Failed to submit task to executor$"), "error_submit_task"),
    (re.compile(r"^Failed to create connection pool for new host (?P<host>\S+):$"), "warn_create_pool"),
    (re.compile(r"^Error attempting to reconnect to (?P<host>\S+),"), "warn_reconnect"),
    (re.compile(r"^Host (?P<host>\S+) has been marked down"), "warn_host_down"),
)


# Some metrics are not useful for replication
_SKIP_METRICS_REPLICATION = {
    "read_metadata_config",
    "version_check",
}

# Metrics skipped in "ap_proto" mode.
_SKIP_METRICS_AP_PROTO = {
    "read_metadata_config",
    "version_check",
    "insert_build_time",
}

114 

def metrics_log_to_influx(
    file: Iterable[str],
    context_keys: str,
    extra_tags: str,
    fix_row_count: bool,
    mode: str,
    prefix: str,
    no_header: bool,
    header_database: str,
) -> None:
    """Extract metrics from log file and dump as InfluxDB data.

    Parameters
    ----------
    file : `~collections.abc.Iterable` [`str`]
        Names of the files to parse for metrics. An empty collection or the
        name "-" means standard input.
    context_keys : `str`
        Names of keys to extract from message context, comma-separated.
    extra_tags : `str`
        Additional tags to add to each record, comma-separated key=value
        pairs. A key starting with "-" instead removes that tag from the
        output.
    fix_row_count : `bool`
        If True then extract records counts from pipeline messages instead of
        metrics. A workaround for broken metrics.
    mode : `str`
        Source of the log, one of "ap_proto", "pipeline", "replication",
        "json_line".
    prefix : `str`
        Prefix to add to each metric name.
    no_header : `bool`
        If True then do not print DML header.
    header_database : `str`
        Name of the database for DML header.
    """
    context_names = [name for name in context_keys.split(",") if name]
    tags: dict[str, Any] = {}
    drop_tags: set[str] = set()
    for tag_val in extra_tags.split(","):
        if tag_val:
            tag, _, val = tag_val.partition("=")
            if tag.startswith("-"):
                # Only the leading dash is the drop marker; keep any dashes
                # that are part of the tag name itself.
                drop_tags.add(tag.removeprefix("-"))
            else:
                tags[tag] = val

    if not no_header:
        print(
            f"""\
# DML

# CONTEXT-DATABASE: {header_database}
"""
        )

    if not file:
        file = ["-"]
    for file_name in file:
        if file_name == "-":
            _metrics_log_to_influx(sys.stdin, context_names, tags, drop_tags, fix_row_count, mode, prefix)
        else:
            with open(file_name) as file_obj:
                _metrics_log_to_influx(file_obj, context_names, tags, drop_tags, fix_row_count, mode, prefix)

176 

177 

def _metrics_log_to_influx(
    file: TextIO,
    context_keys: Iterable[str],
    extra_tags: dict[str, Any],
    drop_tags: set[str],
    fix_row_count: bool,
    mode: str,
    prefix: str,
) -> None:
    """Parse metrics from a single file.

    Parameters
    ----------
    file : `TextIO`
        Open stream with log lines to scan.
    context_keys : `~collections.abc.Iterable` [`str`]
        Names of keys to extract from message context (MDC).
    extra_tags : `dict` [`str`, `~typing.Any`]
        Additional tags added to every record.
    drop_tags : `set` [`str`]
        Tag names removed from every record before printing.
    fix_row_count : `bool`
        If True then recover row counts from AP pipeline messages instead of
        metric values; only has effect in "pipeline" mode.
    mode : `str`
        Source of the log, one of "ap_proto", "pipeline", "replication",
        "json_line".
    prefix : `str`
        Prefix added to each metric name.
    """
    # Most recently seen counts from AP pipeline messages; -1 means a count
    # has not been seen yet.
    objects_count = -1
    sources_count = -1
    forced_sources_count = -1

    # Select the line format matching the log source.
    match mode:
        case "pipeline":
            line_re = _LOG_LINE_RE_PIPELINE
        case "replication":
            line_re = _LOG_LINE_RE_REPLICATION
        case "ap_proto":
            line_re = _LOG_LINE_RE_AP_PROTO
        case "json_line":
            line_re = _LOG_LINE_RE_JSON_LINE
        case _:
            raise ValueError(f"Unexpected mode: {mode}")

    for line in file:
        line = line.strip()
        if fix_row_count and mode == "pipeline":
            # Counts come from separate AP messages.
            if match := _AP_PIPE_DIAOBJECTS_RE.search(line):
                objects_count = int(match.group("count"))
            elif match := _AP_PIPE_DIASOURCES_RE.search(line):
                sources_count = int(match.group("count1")) + int(match.group("count2"))
            elif match := _AP_PIPE_DIAFORCED_RE.search(line):
                forced_sources_count = int(match.group("count"))

        if match := line_re.match(line):
            # Metric payload is a JSON object with "name", "timestamp",
            # "tags", and "values" keys.
            metric_str = match.group("metric")
            try:
                metric: dict[str, Any] = json.loads(metric_str)
            except json.JSONDecodeError:
                # Ignore parsing errors, sometimes it happens that lines are
                # scrambled.
                continue
            tags = dict(extra_tags)

            name: str = metric["name"]
            # Skip metrics that are not interesting for this log source.
            if mode == "replication":
                if name in _SKIP_METRICS_REPLICATION:
                    continue
            elif mode == "ap_proto":
                if name in _SKIP_METRICS_AP_PROTO:
                    continue

            timestamp: float = metric["timestamp"]
            for tag, tag_val in metric["tags"].items():
                tags[tag] = tag_val
            values: dict[str, Any] = metric["values"]

            # Substitute row counts recovered from AP messages for the
            # (broken) counts reported by insert_time metrics.
            if fix_row_count and name == "insert_time":
                if tags["table"].startswith("DiaObject"):
                    values["row_count"] = objects_count
                elif tags["table"].startswith("DiaSource"):
                    values["row_count"] = sources_count
                elif tags["table"].startswith("DiaForcedSource"):
                    values["row_count"] = forced_sources_count

            # Only pipeline logs carry an MDC group to pull context from.
            if mode == "pipeline" and context_keys:
                tags.update(_extract_mdc(match, context_keys))

            for tag in drop_tags:
                tags.pop(tag, None)

            _print_metrics(prefix + name, tags, values, timestamp)

        elif match := _LOG_LINE_CASSANDRA_RE.match(line):
            # Turn known cassandra.cluster errors/warnings into count=1
            # metrics, tagged with the message level and any captured host.
            tags = dict(extra_tags)
            tags["level"] = match.group("level").lower()
            dt = datetime.fromisoformat(match.group("datetime"))
            timestamp = dt.timestamp()
            tags.update(_extract_mdc(match, context_keys))
            values = {"count": 1}

            message = match.group("message")
            for message_re, name in _CASSNDRA_MESSAGES_RE:
                if (message_match := message_re.search(message)) is not None:
                    tags.update(message_match.groupdict())
                    _print_metrics(prefix + name, tags, values, timestamp)
                    break

268 

269 

270def _print_metrics(name: str, tags: dict[str, Any], values: dict[str, Any], timestamp: float) -> None: 

271 tags_str = ",".join([name] + [f"{key}={val}" for key, val in tags.items()]) 

272 values_str = ",".join(f"{key}={val}" for key, val in values.items()) 

273 print(f"{tags_str} {values_str} {int(timestamp * 1e9)}") 

274 

275 

276def _extract_mdc(match: re.Match, context_keys: Iterable[str]) -> dict[str, Any]: 

277 tags: dict[str, Any] = {} 

278 mdc_str = match.group("MDC") 

279 if mdc_str: 

280 mdc_str = mdc_str.replace("'", '"') 

281 mdc: dict[str, Any] = yaml.safe_load(io.StringIO(mdc_str)) 

282 for tag in context_keys: 

283 if (tag_val := mdc.get(tag)) is not None: 

284 tags[tag] = tag_val 

285 return tags