-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse-logs
executable file
·82 lines (70 loc) · 2.62 KB
/
parse-logs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python3
"""
Parse and display JSON logs as output by c2cwsgiutils applications. Just pipe the logs to this script. For example:

    oc logs -f prod-2-5-c2cgeoportal-geoportal-5cf5859df8-gxfb2 | ./parse-logs
"""
import datetime
import json
import re
import sys
# Level name -> ANSI SGR parameter(s). Values are interpolated verbatim into an
# "ESC[<value>m" sequence by colorize(), so a value may be a single int code or
# a ";"-joined string combining several codes.
COLORS = {
    "DEBUG": 30,
    "INFO": 32,
    "WARNING": 31,
    "WARN": 31,
    "ERROR": "41;37",
    "CRITICAL": "41;37",
}

# Matches a line that starts with a "<n>/<n>/<n> <n>:<n>:<n> AM|PM" prefix and
# captures everything from the first "{" after it (group 1).
TIMESTAMP_RE = re.compile(
    r"\d+/\d+/\d+ "  # date part
    r"\d+:\d+:\d+ "  # time part
    r"[AP]M *"  # meridiem marker and optional padding
    r"(\{.*)"  # the payload, starting at its opening brace
)
def colorize(color: int, txt: str) -> str:
    """
    Wrap *txt* in an ANSI escape sequence selecting *color*, followed by a reset.

    *color* is interpolated as-is between ``ESC[`` and ``m``, so a compound
    parameter string such as ``"41;37"`` is rendered just like a plain int code.
    """
    start = f"\033[{color}m"
    reset = "\033[0m"
    return f"{start}{txt}{reset}"
def color_level(level: str) -> str:
    """
    Return the one-letter abbreviation of a log level, colorized when the level is known.

    Levels listed in ``COLORS`` are rendered as their first character wrapped in
    the matching ANSI color. Any other level is returned as its plain first
    character, and an empty level is shown as ``"?"``, so that an unexpected
    value in a single record cannot raise and stop the whole stream.
    """
    name = str(level)
    # Slicing (unlike indexing) is safe on an empty string.
    initial = name[:1] or "?"
    color = COLORS.get(name)
    if color is None:
        return initial
    return colorize(color, initial)
def _format_record(parsed: dict) -> list:
    """
    Build the display lines for one decoded log record.

    Two record layouts are recognised:

    * ``timestamp`` / ``level_name`` / ``source_facility`` / ``msg``
      (``timestamp`` is passed to ``datetime.fromtimestamp``; an optional
      ``line`` key is appended to the source facility);
    * ``timegenerated`` / ``level_name`` / ``logger_name`` / ``msg``
      (``timegenerated`` is parsed as ``%Y-%m-%dT%H:%M:%S`` after dropping
      everything from the first ``"."``; if that fails the raw value is shown).

    Returns a list with the formatted header line, followed by the record's
    ``full_message`` when present. Returns an empty list when the record matches
    neither layout. Exceptions caused by malformed field values propagate to the
    caller.
    """
    if all(key in parsed for key in ("timestamp", "level_name", "source_facility", "msg")):
        cur_time = datetime.datetime.fromtimestamp(parsed["timestamp"]).strftime("%H:%M:%S.%f")
        origin = f"{parsed['source_facility']}:{parsed.get('line', -1)}"
    elif all(key in parsed for key in ("timegenerated", "level_name", "logger_name", "msg")):
        try:
            # NOTE: the fractional part is cut off before parsing, so the
            # "%f" field of the output is always zero for this layout.
            cur_time = datetime.datetime.strptime(
                parsed["timegenerated"].split(".")[0], "%Y-%m-%dT%H:%M:%S"
            ).strftime("%H:%M:%S.%f")
        except ValueError:
            cur_time = parsed["timegenerated"]
        origin = parsed["logger_name"]
    else:
        return []

    lines = [f"{cur_time} {color_level(parsed['level_name'])} {colorize(35, origin)} {parsed['msg']}"]
    if "full_message" in parsed:
        lines.append(parsed["full_message"])
    return lines


def print_line(line: str) -> None:
    """
    Pretty-print one line of log output to stdout.

    Only the text after the last carriage return is considered. If that text is
    a JSON object (optionally preceded by a ``<date> <time> AM|PM`` prefix) in
    one of the layouts known to ``_format_record``, it is printed as
    ``<time> <level letter> <origin> <msg>``, followed by ``full_message`` when
    present. A JSON object in an unknown layout is echoed together with its
    keys. Everything else, including records whose fields cannot be formatted,
    is echoed unchanged without its trailing newline, so a single odd line
    never interrupts the stream.
    """
    line = line.split("\r")[-1]
    raw = line.rstrip("\n")

    if not line.startswith("{"):
        # The payload may be preceded by a timestamp prefix; the pattern can
        # only match lines that do NOT start with "{", so it has to be tried
        # here rather than after the brace check.
        match = TIMESTAMP_RE.match(line)
        if match is None:
            print(raw)
            return
        line = match.group(1)

    try:
        # The text starts with "{", so a successful decode always yields a dict.
        parsed = json.loads(line)
    except json.decoder.JSONDecodeError:
        print(raw)
        return

    try:
        rendered = _format_record(parsed)
    except (AttributeError, IndexError, KeyError, OSError, OverflowError, TypeError, ValueError):
        # A field has an unexpected type or value (e.g. a non-numeric
        # timestamp or an unknown level): show the record as received.
        print(raw)
        return

    if rendered:
        for out in rendered:
            print(out)
    else:
        print(raw)
        print(parsed.keys())
def main() -> None:
    """
    Read log lines from stdin and pretty-print each one until EOF.

    A keyboard interrupt ends the loop quietly. If the consumer of stdout goes
    away (``BrokenPipeError``), the process exits with status 1.
    """
    import os

    try:
        for line in sys.stdin:
            print_line(line)
        # Flush inside the try block so that a broken pipe affecting only the
        # buffered tail of the output is still handled below.
        sys.stdout.flush()
    except KeyboardInterrupt:
        # Ctrl-C: stop reading and fall through to a normal exit.
        pass
    except BrokenPipeError:
        # Python flushes standard streams on exit; redirect the remaining
        # output to devnull so that flush does not raise a second
        # BrokenPipeError and print an "Exception ignored" traceback.
        try:
            devnull = os.open(os.devnull, os.O_WRONLY)
            os.dup2(devnull, sys.stdout.fileno())
        except (OSError, ValueError):
            # stdout has no usable file descriptor; nothing more can be done.
            pass
        sys.exit(1)


if __name__ == "__main__":
    main()