#!/usr/bin/env python3
# http://knowyourmeme.com/memes/deal-with-it.
import configparser
import hashlib
import json
import multiprocessing
import random
import re
import requests
import signal
import sys
import time
import traceback
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
import redis
import checks
from log import log
import outputs

###########
# CONFIGS #
###########

config = configparser.ConfigParser()
config.read('config')

redis_retry = int(config['redis']['retry'])
redis_host = config['redis']['host']
redis_port = int(config['redis']['port'])
redis_conn = redis.StrictRedis(host=redis_host, port=redis_port, db=0)

start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

msgQueue = multiprocessing.Queue(multiprocessing.cpu_count() * 6)
statsQueue = multiprocessing.Queue()
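
# A sketch of the expected 'config' file, covering only the keys this module
# reads (the outputs.* handlers may read additional sections); values shown
# are illustrative, not shipped defaults:
#
#   [redis]
#   host = localhost
#   port = 6379
#   retry = 10
#
#   [api]
#   listen = localhost
#   port = 8080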

#######################
# WORKERS / PROCESSES #
#######################

class Matcher(multiprocessing.Process):
    """Worker process that pops message batches from msgQueue and runs each message through checks.py"""

    def __init__(self, worker_id, queue):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.worker_id = worker_id
        self.queue = queue
        self.running = True

    def run(self):
        log.info("Matcher Worker-%d Started" % self.worker_id)
        # Let the parent process handle SIGINT and coordinate shutdown.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        bl_rules = {}
        while self.running:
            # Look for a blacklist rules update.
            if not self.queue.empty():
                bl_rules = self.queue.get(False)
                log.info("Worker-%d - Blacklist Rules Updated: %s" %
                         (self.worker_id, json.dumps(bl_rules)))
            # Handle message batches.
            # Abundance of try/except (for now) since arbitrary code is injected as config.
            try:
                batch = msgQueue.get(True, 3)
                for m in batch:
                    try:
                        msg = json.loads(m.decode('utf-8'))
                        # Skip the message if any blacklist rule matches;
                        # the for/else runs checks only when no rule matched.
                        for k in bl_rules:
                            if k in msg and msg[k] in bl_rules[k]:
                                break
                        else:
                            try:
                                checks.run(msg)
                            except Exception:
                                log.error("Exception occurred processing message:\n%s" %
                                          (traceback.format_exc()))
                    except Exception:
                        continue
            except Exception:
                continue

    def stop(self):
        self.running = False
        log.info("Matcher Worker-%d Stopping" % self.worker_id)

class Tasks(multiprocessing.Process):
    """Dedicated process that hosts general task threads"""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.running = True
        self.tasks = []

    def run(self):
        # Start 'Blacklister()' sync thread.
        blacklister = Blacklister(blacklist_queues)
        blacklister.start()
        # Start REST 'Api()' and 'Statser()' threads.
        statser = Statser()
        statser.start()
        api = Api()
        api.start()
        # Start 3 alerting threads.
        for i in range(3):
            alerts = Alerter()
            alerts.start()
        while self.running:
            try:
                time.sleep(0.25)
            except KeyboardInterrupt:
                continue

    def stop(self):
        self.running = False

###################
# TASKS / THREADS #
###################

class RedisReader(Thread):
    """Polls the Redis queue for messages and writes batches to msgQueue for worker consumption"""

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True
        self.running = True

    def run(self):
        log.info("Redis Reader Task Started")
        while self.running:
            # Pipeline the read and trim to reduce net latency; redis-py
            # pipelines are transactional (MULTI/EXEC) by default, so the
            # batch is popped atomically.
            pipe = redis_conn.pipeline()
            pipe.lrange('messages', 0, 99)
            pipe.ltrim('messages', 100, -1)
            try:
                batch = pipe.execute()[0]
                if batch:
                    msgQueue.put(batch)
                    statsQueue.put(len(batch))
                else:
                    # Sleep if the Redis list is empty to avoid burning cycles.
                    time.sleep(3)
            except Exception:
                log.warn("Failed to poll Redis")
                self.try_redis_connection()

    def stop(self):
        self.running = False
        log.info("Redis Reader Task Stopping")

    def try_redis_connection(self):
        """Retry the Redis PING command until Redis is reachable"""
        while True:
            try:
                redis_conn.ping()
                log.info("Connected to Redis at %s:%d" % (redis_host, redis_port))
                break
            except Exception:
                log.warn("Redis unreachable, retrying in %ds" % redis_retry)
                time.sleep(redis_retry)
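
# Producers feed the reader by pushing JSON strings onto the 'messages' list,
# e.g. (hypothetical message shape):
#   RPUSH messages '{"hostname": "web01", "severity": "critical"}'
# RPUSH is assumed here so that consumption from the head of the list is FIFO.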

class Blacklister(Thread):
    """Syncs blacklist rules with Redis and propagates them to workers"""

    def __init__(self, queues):
        Thread.__init__(self)
        self.daemon = True
        self.queues = queues

    def run(self):
        blacklist = {}
        while True:
            blacklist_update = self.fetch_blacklist()
            if blacklist != blacklist_update:
                blacklist = blacklist_update
                for i in self.queues:
                    i.put(blacklist)
            time.sleep(5)

    @classmethod
    def fetch_blacklist(cls):
        blacklist_update = {}
        # What rule keys exist in Redis?
        try:
            blacklist_keys = redis_conn.smembers('blacklist')
        except Exception:
            log.warn("Redis unreachable, retrying in %ds" % redis_retry)
            time.sleep(redis_retry)
            # Return the empty update; the next poll cycle will retry.
            return blacklist_update
        # Get each rule KV and build the map.
        for i in blacklist_keys:
            k = i.decode('utf-8')
            get = redis_conn.get(k)
            if get is None:
                # If the rule key's value is None, it likely expired - remove it
                # from the blacklist set.
                redis_conn.srem('blacklist', k)
            else:
                kv = get.decode('utf-8').split(':')
                # Create a key in the blacklist map for the rule KV pair found
                # in Redis, or append to the existing entry.
                blacklist_update.setdefault(kv[0], []).append(kv[1])
        return blacklist_update
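
# Rule storage layout in Redis, as written by the API handlers below:
#   - 'blacklist' is a set of rule IDs (SHA1 of the [field, value] pair).
#   - Each rule ID is a key holding "field:value" with a TTL, so rules
#     expire on their own and are pruned from the set on the next fetch.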

class Alerter(Thread):
    """Receives event triggers from outputs.alertsQueue, handles writing alerts"""

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True

    def run(self):
        while True:
            if not outputs.alertsQueue.empty():
                alertMeta = outputs.alertsQueue.get()
                if alertMeta[0] == "outConsole":
                    outputs.outConsoleHandler(alertMeta)
                elif alertMeta[0] == "outHc":
                    outputs.outHcHandler(alertMeta, config)
                elif alertMeta[0] == "outPd":
                    outputs.outPdHandler(alertMeta, config)
            else:
                time.sleep(1)
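
# alertMeta is expected to be a sequence whose first element names the output
# handler ("outConsole", "outHc", "outPd"); the remaining elements are
# handler-specific (see outputs.py).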

class Statser(Thread):
    """Outputs periodic Occam stats info"""

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True

    def run(self):
        count_current = count_previous = 0
        while True:
            # Handle rate stats over a ~5s window.
            stop = time.time() + 5
            while time.time() < stop:
                if not statsQueue.empty():
                    count_current += statsQueue.get()
                else:
                    time.sleep(0.25)
            if count_current > count_previous:
                # Divide by the measured duration rather than assuming 5s,
                # since thread scheduling makes the window length imprecise.
                duration = time.time() - stop + 5
                rate = count_current / duration
                log.info("Last %.1fs: polled %.2f messages/sec." % (duration, rate))
            count_previous = count_current = 0
            # Handle alerts stats.
            alerts_queue_length = outputs.alertsQueue.qsize()
            if alerts_queue_length > 0:
                log.info("Outbound alerts queue length: %d" % alerts_queue_length)

# REST API: this really needs to be significantly better.
class Api(Thread):
    """Dumps Occam service and blacklist info, accepts blacklist requests"""

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True

    def run(self):
        server = HTTPServer((config['api']['listen'], int(config['api']['port'])), ApiCalls)
        log.info("API - Listening at %s:%s" % (config['api']['listen'], config['api']['port']))
        server.serve_forever()

class ApiCalls(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/':
            # Response message.
            status = {
                "Occam Start Time": start_time,
                "Outbound Alerts Queue Length": outputs.alertsQueue.qsize()
            }
            # Build outage meta.
            blacklist = Blacklister.fetch_blacklist()
            if not bool(blacklist):
                blacklist = "None"
            status['Current Outages Scheduled'] = blacklist
            self.send_response(200)
            self.end_headers()
            self.wfile.write(bytes("\n" + json.dumps(status, indent=2, sort_keys=True) + "\n", "utf-8"))
        else:
            self.send_response(404)
            self.end_headers()
            self.wfile.write(bytes("Request Invalid\n", "utf-8"))

    def do_POST(self):
        if self.path == '/':
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length).decode('utf-8')
            # Handle request.
            try:
                outage_meta = json.loads(post_data)['outage'].split(':')
                log.info("API - Outage Request: where '%s' == '%s' for %s hour(s)" %
                         (outage_meta[0], outage_meta[1], outage_meta[2]))
                # Generate outage key data.
                outage_id = hashlib.sha1(str(outage_meta[:2]).encode()).hexdigest()
                outage_expires = int(outage_meta[2]) * 3600
                outage_kv = str(outage_meta[0] + ':' + outage_meta[1])
                # Update outage in Redis.
                redis_conn.setex(outage_id, outage_expires, outage_kv)
                redis_conn.sadd('blacklist', outage_id)
                # Send response.
                self.send_response(200)
                self.end_headers()
                self.wfile.write(bytes("Request Received - POST: " + post_data + "\n", "utf-8"))
            except Exception:
                self.send_response(400)
                self.end_headers()
                self.wfile.write(bytes("Request Error: " + post_data + "\n", "utf-8"))

    def do_DELETE(self):
        if self.path == '/':
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length).decode('utf-8')
            # Handle request.
            try:
                outage_meta = json.loads(post_data)['outage'].split(':')
                log.info("API - Delete Outage Request: where '%s' == '%s'" %
                         (outage_meta[0], outage_meta[1]))
                # Generate outage key data.
                outage_id = hashlib.sha1(str(outage_meta[:2]).encode()).hexdigest()
                # Update outage in Redis; the stale set member is pruned by
                # Blacklister.fetch_blacklist() on its next pass.
                redis_conn.delete(outage_id)
                # Send response.
                self.send_response(200)
                self.end_headers()
                self.wfile.write(bytes("Request Received - DELETE: " + post_data + "\n", "utf-8"))
            except Exception:
                self.send_response(400)
                self.end_headers()
                self.wfile.write(bytes("Request Error: " + post_data + "\n", "utf-8"))

###########
# SERVICE #
###########

if __name__ == "__main__":
    # Start 1 Matcher worker on a single hw thread, else the greater of 2 and (hw threads - 1).
    n = 1 if multiprocessing.cpu_count() == 1 else max(multiprocessing.cpu_count() - 1, 2)
    # Initialize Matcher workers and queues.
    # Append each queue to the list that's fed to the Blacklister thread for rule propagation.
    blacklist_queues = []
    workers = []
    for i in range(n):
        queue_i = multiprocessing.Queue()
        blacklist_queues.append(queue_i)
        worker = Matcher(i, queue_i)
        workers.append(worker)
        worker.start()
        time.sleep(0.5)
    # Start the general task threads handler process.
    tasks = Tasks()
    tasks.start()
    # Init the Redis reader thread.
    redis_reader = RedisReader()
    # Sit-n-spin.
    try:
        # Rather than adding communication with the worker processes to confirm
        # the initial blacklist sync occurred, we lazily wait a moment.
        log.info("Waiting for Blacklist Rules sync")
        time.sleep(1)
        # Then start the main Redis reader task.
        redis_reader.start()
        redis_reader.join()
    except KeyboardInterrupt:
        redis_reader.stop()
        tasks.stop()
        time.sleep(1)
        while True:
            if not msgQueue.empty():
                log.info("Waiting for In-Flight Messages")
                time.sleep(3)
            else:
                for i in workers:
                    i.stop()
                sys.exit(0)