forked from weka/export
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlokilogs.py
251 lines (207 loc) · 9.09 KB
/
lokilogs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#! /usr/bin/env python3
#
# lokilogs - send weka cluster events to a Loki server
#
# Author: Vince Fleming, [email protected]
#
# Example of using the Grafana Loki push API to send log messages from a Python script
import argparse
import datetime
import json
# import syslog
import time
import socket
import sys
from logging import getLogger, INFO
import dateutil
import dateutil.parser
import requests
import urllib3
from wekalib.wekatime import lokitime_to_wekatime, wekatime_to_datetime, lokitime_to_datetime, datetime_to_lokitime, datetime_to_wekatime
log = getLogger(__name__)
class LokiServer(object):
    """Client that pushes Weka cluster events to a Grafana Loki server.

    Each event is sent as its own entry via Loki's HTTP push API
    (``/loki/api/v1/push``).
    """

    # Seconds to wait for Loki to answer a push request. Without a timeout,
    # requests.post() can block indefinitely on an unresponsive server and
    # stall the whole exporter.
    timeout = 30.0

    def __init__(self, lokihost, lokiport, map_registry, timeout=None):
        """Store connection details and check that the Loki host name resolves.

        :param lokihost: hostname or IP address of the Loki server
        :param lokiport: TCP port of the Loki server
        :param map_registry: registry whose ``lookup('node-host')`` returns the
            map from formatted node id to hostname used to label events
        :param timeout: optional per-request timeout in seconds; when omitted
            the class attribute ``timeout`` is used
        :raises socket.gaierror: if ``lokihost`` cannot be resolved
        """
        self.host = lokihost
        self.port = lokiport
        self.registry = map_registry
        if timeout is not None:
            self.timeout = timeout

        # save some trouble, and make sure names are resolvable
        try:
            socket.gethostbyname(lokihost)
        except socket.gaierror:
            log.critical(f"Loki Server name '{lokihost}' is not resolvable - is it in /etc/hosts or DNS?")
            raise
        except Exception as exc:
            log.critical(exc)
            raise

    # push msg log into grafana-loki
    def loki_logevent(self, timestamp, event, **labels):
        """Push a single log line to Loki.

        :param timestamp: entry timestamp, passed through unchanged
            (presumably Loki's nanosecond-epoch string - confirm with callers)
        :param event: the log line text
        :param labels: must contain a ``labels`` keyword holding the dict of
            stream labels, e.g. ``loki_logevent(ts, msg, labels={...})``
        :returns: True if Loki answered 204, False for any other status
        :raises Exception: any exception from ``requests.post`` (connection
            failure, timeout, ...) is logged and re-raised
        """
        url = f'http://{self.host}:{self.port}/loki/api/v1/push'
        headers = {'Content-type': 'application/json'}
        log.debug(f"{labels}")

        payload = {
            'streams': [
                {
                    'stream': labels["labels"],
                    'values': [[timestamp, event]],
                }
            ]
        }
        payload_str = json.dumps(payload)

        # this is where we actually send it
        try:
            answer = requests.post(url, data=payload_str, headers=headers, timeout=self.timeout)
        except requests.exceptions.ConnectionError:
            log.critical("Unable to send Events to Loki: unable to establish connection: FATAL")
            raise
        except Exception:
            log.critical("Unable to send Events to Loki")
            raise

        log.debug(f"status code: {answer.status_code}")
        if answer.status_code == 400:
            # I've only seen code 400 for duplicate entries; but I could be wrong. ;)
            log.error(f"Error posting event; possible duplicate entry: {answer.text}")
            return False
        if answer.status_code != 204:  # 204 is ok
            log.error("loki_logevent(): bad http status code: " + str(answer.status_code) + " " + answer.text)
            return False
        return True
    # end loki_logevent

    # format the events and send them up to Loki
    def send_events(self, event_dict, cluster):
        """Format Weka events and push them to Loki, oldest first.

        :param event_dict: mapping of timestamp -> Weka event dict. Each event
            must have ``category``, ``type``, ``severity``, ``description`` and
            ``timestamp`` keys, and may carry a ``params`` dict containing
            ``hostname`` and/or ``nodeId``.
        :param cluster: cluster object. ``name`` is used for the labels and the
            message text. ``last_event_timestamp`` is advanced as events are
            accepted. ``last_get_events_time`` is read at the end of the batch.

        Note: ``event['severity']`` is rewritten in place (MAJOR/MINOR ->
        ERROR, CRITICAL -> FATAL).
        """
        num_successful = 0
        node_host_map = self.registry.lookup('node-host')

        if not event_dict:
            log.debug("No events to send")
            return

        # must be sorted by timestamp or Loki will reject them
        for timestamp, event in sorted(event_dict.items()):  # oldest first
            labels = {
                "source": "weka",
                "cluster": cluster.name,
                "category": event["category"],
                "event_type": event["type"],
                "severity": event["severity"],
            }

            if 'params' in event:
                params = event['params']
                log.debug(f'{event["description"]}:::::{params}')
                if 'hostname' in params:
                    labels['hostname'] = params['hostname']
                if 'nodeId' in params:
                    labels['nodeid'] = str(params['nodeId'])
                    if 'hostname' not in labels:
                        # The node-host map is keyed by "NodeId<N>"; numeric
                        # ids are wrapped into that form, strings used as-is.
                        if isinstance(params['nodeId'], str):
                            formatted_nodeid = params['nodeId']
                        else:
                            formatted_nodeid = f'NodeId<{params["nodeId"]}>'
                        try:
                            hostname = node_host_map[formatted_nodeid]
                        except Exception:
                            log.error(f"NodeId {formatted_nodeid} not in node-host map!")
                            hostname = 'error'
                        log.debug("setting hostname*************************************************************")
                        labels['hostname'] = hostname

            # map weka event severities to Loki event severities
            # NOTE(review): labels["severity"] was captured above from the
            # unmapped value, so this mapping does not reach Loki; it only
            # rewrites event['severity'] in the caller's dict. Kept as-is
            # because dashboards may filter on the Weka severities - confirm.
            orig_sev = event['severity']
            if event['severity'] == 'MAJOR' or event['severity'] == 'MINOR':
                event['severity'] = 'ERROR'
            elif event['severity'] == 'CRITICAL':
                event['severity'] = 'FATAL'

            description = f"cluster:{cluster.name} :{orig_sev}: {event['type']}: {event['description']}"
            log.debug(f"sending event: timestamp={timestamp}, labels={labels}, desc={description}")
            log.log(INFO, f"WekaEvent: {description}")  # send to syslog

            try:
                if self.loki_logevent(timestamp, description, labels=labels):
                    # only update time if upload successful, so we don't drop events (they should retry upload next time)
                    cluster.last_event_timestamp = event['timestamp']
                    num_successful += 1
            except Exception as exc:
                # Skip this event and keep going; KeyboardInterrupt/SystemExit
                # are no longer swallowed.
                log.error(f"Failed to send event {timestamp}: {exc}")

        log.info(f"Total events={len(event_dict)}; successfully sent {num_successful} events")
        # NOTE(review): this overrides the per-event checkpoint above whenever
        # at least one event was accepted, so events that failed in this batch
        # are presumably not re-fetched next time - confirm this is intended.
        if num_successful != 0:
            cluster.last_event_timestamp = cluster.last_get_events_time
    # end send_events
# Not used anymore... but might come in handy
# get the time of the last event that Loki has for this cluster so we know where we left off
def last_lokievent_time(lokihost, port, cluster):
    """Return the newest Loki timestamp stored for ``cluster``, as a string.

    Queries Loki's ``query_range`` endpoint for streams labelled with the
    cluster's name and returns the largest entry timestamp found (never less
    than 0). Returns ``"0"`` in the following cases:

    * Loki cannot be reached.
    * Loki answers with a non-200 status.
    * The body is not JSON of the form ``{"data": {"result": [...]}}``.

    :param lokihost: hostname of the Loki server
    :param port: TCP port of the Loki server
    :param cluster: object whose ``name`` attribute is the ``cluster`` label
    :returns: newest timestamp as a decimal string, or ``"0"``
    """
    http_pool = urllib3.PoolManager()
    log.debug("getting last event from Loki")
    url = f'http://{lokihost}:{port}/loki/api/v1/query_range'

    # LogQL stream selector, e.g. {cluster="mycluster"}
    fields = {'query': f'{{cluster="{cluster.name}"}}'}

    try:
        latest = http_pool.request('GET', url, fields=fields)
    except Exception as exc:
        log.debug(f"{exc} caught")
        return "0"
    if latest.status != 200:
        return "0"
    log.debug(f"{latest.status} {latest.data}")

    # Treat a malformed body like the other failure paths instead of raising.
    try:
        results = json.loads(latest.data)["data"]["result"]
    except (ValueError, KeyError, TypeError) as exc:
        log.debug(f"unexpected response from Loki: {exc}")
        return "0"

    newest = 0
    for result in results:
        for value in result["values"]:
            # value is a [timestamp, line] pair; keep the largest timestamp
            if int(value[0]) > newest:
                newest = int(value[0])
                log.debug(f"timeval={lokitime_to_wekatime(value[0])}")

    first_result = str(newest)
    log.debug(f"first_result={first_result}, {lokitime_to_wekatime(first_result)}")
    return first_result
# end last_lokievent_time
if __name__ == '__main__':
    # Stand-alone entry point: poll the Weka cluster once a minute and
    # forward new events to Loki.
    # NOTE(review): gather_weka_events() and reformat_events() are not defined
    # in this module, and send_events() exists only as the method
    # LokiServer.send_events(event_dict, cluster). As written, this entry point
    # stops with a NameError - confirm whether it is still meant to be runnable.
    parser = argparse.ArgumentParser(description="Loki Log Exporter for Weka clusters")
    parser.add_argument("-c", "--configfile", dest='configfile', default="./weka-metrics-exporter.yml",
                        help="override ./weka-metrics-exporter.yml as config file")
    parser.add_argument("-p", "--port", dest='port', default="3100", help="TCP port number to listen on")
    parser.add_argument("-H", "--HOST", dest='wekahost', default="localhost",
                        help="Specify the Weka host(s) (hostname/ip) to collect stats from. May be a comma-separated list")
    parser.add_argument("-L", "--LOKIHOST", dest='lokihost', default="localhost",
                        help="Specify the hostname of the Loki server")
    # default=0: with action="count" argparse leaves the value as None when -v
    # is not given, and `None > 0` raises TypeError in Python 3.
    parser.add_argument("-v", "--verbose", dest='verbose', action="count", default=0,
                        help="Enable verbose output")
    args = parser.parse_args()

    # Globals
    target_host = args.wekahost  # make sure we can give a list in case one or more are not reachable
    target_port = args.port
    loki_host = args.lokihost
    verbose = args.verbose

    # initially, make sure we seed the list with all past events
    all_events = gather_weka_events(target_host)
    sortable_events = reformat_events(all_events)
    send_events(loki_host, sortable_events)

    while True:
        if verbose:
            print("sleeping")
        time.sleep(60)  # check for events once per minute
        if verbose:
            print("gathering events")
        all_events = gather_weka_events(target_host, "10m")  # maybe make this less? 1m?
        sortable_events = reformat_events(all_events)
        if verbose:
            print("sending events")
        send_events(loki_host, sortable_events)