Zabbix MySQL replication bridge local DB cache and refresh. (#175)
* Initial pass at Zabbix MySQL replication bridge for item metrics.

[#MON-3138]

* Zabbix MySQL replication: additional notes and begin item key lookup.

[#MON-3138]

* Zabbix replication: look up the itemid:key hostname map.

[#MON-3138]

* Clean up using privilege drop and move more tunables into the config.

[#MON-3138]

* Support only resume (live) replication events (for now).

[#MON-3144]

* [#MON-3137] TODO list for zabbix_bridge.

* Explicitly load collectors list, rather than yielding.

[#MON-3137]

* Use a local SQLite DB cache (instead of in-memory); reload periodically (900s).

[#MON-3140]

* Fixup Zabbix bridge SQLite cache.

[#MON-3140]

* Make the Zabbix bridge cache collector executable.

* Add internal metrics for log position, drift, and key miss.

[#MON-3138]

* Support an in-memory SQLite DB refreshed periodically from the filesystem DB (see the sketch after this list).

[#MON-3140]

* Remove TODO list.

* Add an index on the id column for performance.

[#MON-3140]

* Remove the sqlite3 import error handling, since sqlite3 is part of the Python standard library.
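A rough sketch of the caching approach described above, not part of the commit: the collector keeps lookups in an in-memory SQLite database, primes it by attaching the shared on-disk cache file, and rebuilds it from that file once key-lookup misses pile up. The file path and table layout follow the diff below; the helper names are hypothetical.

import sqlite3

DB_FILE = '/tmp/zabbix_bridge.db'  # default 'sqlitedb' path from the config below

def load_cache(db_file=DB_FILE):
    # Copy the shared on-disk cache into an in-memory database so per-event
    # lookups never touch the filesystem, and index the id column for speed.
    mem = sqlite3.connect(':memory:')
    cur = mem.cursor()
    cur.execute("ATTACH DATABASE '%s' AS dbfile" % (db_file,))
    cur.execute("CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache")
    cur.execute("CREATE UNIQUE INDEX uniq_zid ON zabbix_cache (id)")
    return mem, cur

def refresh_cache(cur):
    # Called once key-lookup misses exceed the 'dbrefresh' threshold: drop the
    # stale in-memory copy and re-copy whatever the file cache holds now.
    # CREATE TABLE ... AS SELECT does not carry indexes over, so recreate it.
    cur.execute("DROP TABLE zabbix_cache")
    cur.execute("CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache")
    cur.execute("CREATE UNIQUE INDEX uniq_zid ON zabbix_cache (id)")

In the committed collector (collectors/0/zabbix_bridge.py below) the same pattern is inlined in main(), with the miss count reported as an internal metric.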
christianchristensen authored and johann8384 committed Sep 14, 2016
1 parent 70fa5e2 commit ffb338f
Showing 4 changed files with 118 additions and 35 deletions.
63 changes: 32 additions & 31 deletions collectors/0/zabbix_bridge.py
@@ -15,8 +15,9 @@
# Dump all replication item/metric insert events from a zabbix mysql server
#

import re
import sqlite3
import sys
import time

try:
from pymysqlreplication import BinLogStreamReader
@@ -26,11 +27,6 @@
except ImportError:
BinLogStreamReader = None # This is handled gracefully in main()

try:
import pymysql
except ImportError:
pymysql = None # This is handled gracefully in main()

from collectors.etc import zabbix_bridge_conf
from collectors.lib import utils

@@ -40,9 +36,6 @@ def main():
if BinLogStreamReader is None:
utils.err("error: Python module `pymysqlreplication' is missing")
return 1
if pymysql is None:
utils.err("error: Python module `pymysql' is missing")
return 1
settings = zabbix_bridge_conf.get_settings()

# Set blocking to True if you want to block and wait for the next event at
@@ -53,7 +46,19 @@
resume_stream=True,
blocking=True)

hostmap = gethostmap(settings) # Prime initial hostmap
db_filename = settings['sqlitedb']
dbcache = sqlite3.connect(':memory:')
cachecur = dbcache.cursor()
cachecur.execute("ATTACH DATABASE '%s' as 'dbfile'" % (db_filename,))
cachecur.execute('CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache')
cachecur.execute('CREATE UNIQUE INDEX uniq_zid on zabbix_cache (id)')

# tcollector.zabbix_bridge namespace for internal Zabbix bridge metrics.
log_pos = 0
key_lookup_miss = 0
sample_last_ts = int(time.time())
last_key_lookup_miss = 0

for binlogevent in stream:
if binlogevent.schema == settings['mysql']['db']:
table = binlogevent.table
@@ -62,34 +67,30 @@
for row in binlogevent.rows:
r = row['values']
itemid = r['itemid']
try:
hm = hostmap[itemid]
print "zbx.%s %d %s host=%s proxy=%s" % (hm['key'], r['clock'], r['value'], hm['host'], hm['proxy'])
except KeyError:
cachecur.execute('SELECT id, key, host, proxy FROM zabbix_cache WHERE id=?', (itemid,))
row = cachecur.fetchone()
if (row is not None):
print "zbx.%s %d %s host=%s proxy=%s" % (row[1], r['clock'], r['value'], row[2], row[3])
if ((int(time.time()) - sample_last_ts) > settings['internal_metric_interval']): # Sample internal metrics at the configured interval
sample_last_ts = int(time.time())
print "tcollector.zabbix_bridge.log_pos %d %s" % (sample_last_ts, log_pos)
print "tcollector.zabbix_bridge.key_lookup_miss %d %s" % (sample_last_ts, key_lookup_miss)
print "tcollector.zabbix_bridge.timestamp_drift %d %s" % (sample_last_ts, (sample_last_ts - r['clock']))
if ((key_lookup_miss - last_key_lookup_miss) > settings['dbrefresh']):
print "tcollector.zabbix_bridge.key_lookup_miss_reload %d %s" % (sample_last_ts, (key_lookup_miss - last_key_lookup_miss))
cachecur.execute('DROP TABLE zabbix_cache')
cachecur.execute('CREATE TABLE zabbix_cache AS SELECT * FROM dbfile.zabbix_cache')
last_key_lookup_miss = key_lookup_miss
else:
# TODO: Consider https://wiki.python.org/moin/PythonDecoratorLibrary#Retry
hostmap = gethostmap(settings)
utils.err("error: Key lookup miss for %s" % (itemid))
key_lookup_miss += 1
sys.stdout.flush()
# if n seconds old, reload
# settings['gethostmap_interval']

dbcache.close()
stream.close()


def gethostmap(settings):
conn = pymysql.connect(**settings['mysql'])
cur = conn.cursor()
cur.execute("SELECT i.itemid, i.key_, h.host, h2.host AS proxy FROM items i JOIN hosts h ON i.hostid=h.hostid LEFT JOIN hosts h2 ON h2.hostid=h.proxy_hostid")
# Translation of item key_
# Note: http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
disallow = re.compile(settings['disallow'])
hostmap = {}
for row in cur:
hostmap[row[0]] = { 'key': re.sub(disallow, '_', row[1]), 'host': re.sub(disallow, '_', row[2]), 'proxy': row[3] }
cur.close()
conn.close()
return hostmap

if __name__ == "__main__":
sys.stdin.close()
sys.exit(main())
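The internal metrics printed above use the plain line format that tcollector forwards to OpenTSDB: metric name, Unix timestamp in seconds, value, then optional tag=value pairs, followed by a stdout flush so lines are not held in a buffer. A minimal, hypothetical helper (not part of the commit; the metric names and tag values below are made up) illustrating that format:

import sys
import time

def emit(metric, value, **tags):
    # One line per data point: "<metric> <epoch-seconds> <value> [tag=value ...]"
    line = "%s %d %s" % (metric, int(time.time()), value)
    if tags:
        line += " " + " ".join("%s=%s" % (k, v) for k, v in sorted(tags.items()))
    print(line)
    sys.stdout.flush()

emit("zbx.agent.ping", 1, host="web01", proxy="None")
emit("tcollector.zabbix_bridge.key_lookup_miss", 0)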
79 changes: 79 additions & 0 deletions collectors/900/zabbix_bridge_cache.py
@@ -0,0 +1,79 @@
#!/usr/bin/env python
#
# Copyright (C) 2014 The tcollector Authors.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version. This program is distributed in the hope that it
# will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
# General Public License for more details. You should have received a copy
# of the GNU Lesser General Public License along with this program. If not,
# see <http://www.gnu.org/licenses/>.
#
# Dump all replication item/metric insert events from a zabbix mysql server
# to a local sqlite cache (that can also be shared).
#

import os
import re
import sqlite3
import sys
import time
try:
    import pymysql
except ImportError:
    pymysql = None # This is handled gracefully in main()

from collectors.etc import zabbix_bridge_conf
from collectors.lib import utils


def main():
    utils.drop_privileges()
    if pymysql is None:
        utils.err("error: Python module `pymysql' is missing")
        return 1
    settings = zabbix_bridge_conf.get_settings()

    db_filename = settings['sqlitedb']
    db_is_new = not os.path.exists(db_filename)
    dbcache = sqlite3.connect(db_filename)

    if db_is_new:
        utils.err("Zabbix bridge SQLite DB file does not exist; creating: %s" % (db_filename))
        cachecur = dbcache.cursor()
        cachecur.execute('''CREATE TABLE zabbix_cache
                            (id integer, key text, host text, proxy text)''')
        dbcache.commit()
    else:
        utils.err("Zabbix bridge SQLite DB exists @ %s" % (db_filename))

    dbzbx = pymysql.connect(**settings['mysql'])
    zbxcur = dbzbx.cursor()
    zbxcur.execute("SELECT i.itemid, i.key_, h.host, h2.host AS proxy FROM items i JOIN hosts h ON i.hostid=h.hostid LEFT JOIN hosts h2 ON h2.hostid=h.proxy_hostid")
    # Translation of item key_
    # Note: http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
    disallow = re.compile(settings['disallow'])
    cachecur = dbcache.cursor()
    print('tcollector.zabbix_bridge.deleterows %d %s' %
          (int(time.time()), cachecur.execute('DELETE FROM zabbix_cache').rowcount))
    rowcount = 0
    for row in zbxcur:
        cachecur.execute('''INSERT INTO zabbix_cache(id, key, host, proxy) VALUES (?,?,?,?)''',
                         (row[0], re.sub(disallow, '_', row[1]), re.sub(disallow, '_', row[2]), row[3]))
        rowcount += 1

    print('tcollector.zabbix_bridge.rows %d %s' % (int(time.time()), rowcount))
    zbxcur.close()
    dbcache.commit()

    dbzbx.close()
    dbcache.close()


if __name__ == "__main__":
    sys.stdin.close()
    sys.exit(main())
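A note on scheduling: tcollector runs a collector at the interval given by its directory name, so collectors/900/zabbix_bridge_cache.py above is invoked every 900 seconds to rebuild the shared cache file, while collectors/0/zabbix_bridge.py runs continuously and reads from it. A quick, hypothetical spot check of the file it writes (the path is the 'sqlitedb' default from the config below):

import sqlite3

conn = sqlite3.connect('/tmp/zabbix_bridge.db')  # 'sqlitedb' default
cur = conn.cursor()
cur.execute('SELECT COUNT(*) FROM zabbix_cache')
print("cached item mappings: %d" % cur.fetchone()[0])
cur.execute('SELECT id, key, host, proxy FROM zabbix_cache LIMIT 3')
for row in cur.fetchall():
    print(row)
conn.close()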
8 changes: 5 additions & 3 deletions collectors/etc/zabbix_bridge_conf.py
@@ -12,7 +12,9 @@ def get_settings():
'passwd': '',
'db': 'zabbix'
},
'slaveid': 3, # Slave identifier, it should be unique.
'disallow': '[^a-zA-Z0-9\-_\.]', # Regex of characters to replace with _.
'gethostmap_interval': 300 # How often to reload itemid, hostmap from DB.
'slaveid': 3, # Slave identifier, it should be unique.
'disallow': '[^a-zA-Z0-9\-_\.]', # Regex of characters to replace with _.
'internal_metric_interval': 30, # Internal metric interval drift and error counts.
'dbrefresh': 10, # Number of key misses before DB reload from file occurs
'sqlitedb': '/tmp/zabbix_bridge.db' # SQLite DB to cache items from Zabbix DB.
}
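The 'disallow' pattern above exists because OpenTSDB only accepts a limited character set in metric names and tag values (see the writing docs URL in the collectors' comments); Zabbix item keys routinely contain brackets, commas and slashes, which both collectors replace with underscores. A small, hypothetical example of that sanitization:

import re

disallow = re.compile(r'[^a-zA-Z0-9\-_\.]')  # same pattern as the config above

print(re.sub(disallow, '_', 'vfs.fs.size[/var,pfree]'))
# -> vfs.fs.size__var_pfree_
print(re.sub(disallow, '_', 'web01.example.com'))
# -> web01.example.com (already clean)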
3 changes: 2 additions & 1 deletion tcollector.py
@@ -302,7 +302,8 @@ def run(self):
# while breaking out every once in a while to setup selects
# on new children.
while ALIVE:
for col in all_living_collectors():
alc = all_living_collectors()
for col in alc:
for line in col.collect():
self.process_line(col, line)

