Skip to content

Commit

Permalink
influx: move to InfluxDB2
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Villaro-Dixon <[email protected]>
  • Loading branch information
Frankkkkk committed Jun 1, 2022
1 parent 6e3f6b8 commit 5271386
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 24 deletions.
2 changes: 2 additions & 0 deletions mqtt2influxdb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import argparse
import logging
from time import sleep
import traceback
from urllib3 import disable_warnings
from .config import load_config, ConfigError
from .mqtt2influxdb import Mqtt2InfluxDB
Expand Down Expand Up @@ -58,6 +59,7 @@ def main():
if isinstance(e, ConfigError):
print('Config error:')
print(e)
traceback.print_exc()
sys.exit(1)


Expand Down
10 changes: 4 additions & 6 deletions mqtt2influxdb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,10 @@ def port_range(port):
},

'influxdb': {
'host': And(str, len),
'port': And(int, port_range),
Optional('username'): And(str, len),
Optional('password'): And(str, len),
'database': And(str, len),
Optional('ssl'): bool
'url': And(str, len),
'bucket': And(str, len),
Optional('org'): str,
Optional('token'): str,
},
Optional("base64decode"): {
'source': And(str, len, Use(str_or_jsonPath)),
Expand Down
31 changes: 14 additions & 17 deletions mqtt2influxdb/mqtt2influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
from datetime import datetime
import paho.mqtt.client
from paho.mqtt.client import topic_matches_sub
import influxdb
import influxdb_client
import jsonpath_ng
import requests
import base64
from requests.auth import HTTPBasicAuth
import http.client as http_client
import builtins
import py_expression_eval
import pycron
Expand All @@ -27,11 +26,12 @@ def __init__(self, config):
self._points = config['points']
self._config = config

self._influxdb = influxdb.InfluxDBClient(config['influxdb']['host'],
config['influxdb']['port'],
config['influxdb'].get('username', 'root'),
config['influxdb'].get('password', 'root'),
ssl=config['influxdb'].get('ssl', False))
self._influxdb = influxdb_client.InfluxDBClient(url=config['influxdb']['url'],
token=config['influxdb'].get('token', None),
org=config['influxdb'].get('org', 'influxdata'),
)
self._influxdb_write_api = self._influxdb.write_api()
self._influxdb_bucket = config['influxdb']['bucket']

self._mqtt = paho.mqtt.client.Client()

Expand All @@ -49,15 +49,6 @@ def __init__(self, config):
self._mqtt.on_message = self._on_mqtt_message

def run(self):
logging.debug('InfluxDB create database %s', self._config['influxdb']['database'])
self._influxdb.create_database(self._config['influxdb']['database'])
self._influxdb.switch_database(self._config['influxdb']['database'])

for point in self._points:
if 'database' in point:
logging.debug('InfluxDB create database %s', point['database'])
self._influxdb.create_database(point['database'])

logging.info('MQTT broker host: %s, port: %d, use tls: %s',
self._config['mqtt']['host'],
self._config['mqtt']['port'],
Expand Down Expand Up @@ -185,7 +176,13 @@ def _on_mqtt_message(self, client, userdata, message):

logging.debug('influxdb write %s', record)

self._influxdb.write_points([record], database=point.get('database', None))
point = influxdb_client.Point(record['measurement'])
for tag_name, tag_value in record['tags'].items():
point.tag(tag_name, tag_value)
for field_name, field_value in record['fields'].items():
point.field(field_name, field_value)

self._influxdb_write_api.write(bucket=self._influxdb_bucket, record=point)

if 'http' in self._config:
http_record = {}
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PyYAML>=5.1.1
paho-mqtt>=1.0
influxdb
influxdb-client[ciso]
schema>=0.6.7
jsonpath-ng>=1.4.3
pycron>=3.0.0
Expand Down

0 comments on commit 5271386

Please sign in to comment.