-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmavlink_influxdb.py
executable file
·113 lines (96 loc) · 3.88 KB
/
mavlink_influxdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
import argparse
import datetime
import logging
import math
import os.path
from typing import Any, Dict, List, TextIO
import influxdb # type: ignore
from pymavlink.DFReader import DFMessage, DFReader_binary # type: ignore
_logger = logging.getLogger('mavlink_influxdb')
def main() -> None:
    """Parse a binary dataflash log and upload its messages to InfluxDB.

    Command-line driven: reads the log file named on the command line,
    converts every log message into an InfluxDB point (measurement =
    message type, fields = message fields, time = log timestamp in ns)
    and writes the points to the configured database in large batches.
    """
    parser = argparse.ArgumentParser(
        description="Upload dataflash logs to InfluxDB.")
    parser.add_argument('filename', help="Log filename")
    parser.add_argument('--hostname', required=True,
                        help="InfluxDB server hostname")
    parser.add_argument('--port', type=int, default=8086,
                        help="InfluxDB server port")
    parser.add_argument('--certificate', help="InfluxDB client certificate")
    parser.add_argument('--username',
                        help="InfluxDB username", default='mavlink')
    parser.add_argument('--password-file', type=argparse.FileType('r'),
                        help="File containing InfluxDB password")
    parser.add_argument('--database', default='mavlink',
                        help="InfluxDB database name")
    parser.add_argument('--vehicle',
                        help="Vehicle name (stored in 'vehicle' tag)")
    args = parser.parse_args()

    log = DFReader_binary(args.filename, False)

    # Password comes from --password-file when given; otherwise the
    # default matches the default --username.
    password: str
    if args.password_file:
        password_file: TextIO
        with args.password_file as password_file:
            password = password_file.read().rstrip('\r\n')
    else:
        password = 'mavlink'

    client = influxdb.InfluxDBClient(
        host=args.hostname,
        port=args.port,
        database=args.database,
        username=args.username,
        password=password,
        # TLS (and server-certificate verification) is enabled exactly
        # when a client certificate was supplied.
        ssl=bool(args.certificate),
        verify_ssl=bool(args.certificate),
        cert=args.certificate)

    # Tags applied to every point of this upload.
    common_tags: Dict[str, str] = {
        'filename': os.path.basename(args.filename)
    }
    if args.vehicle:
        common_tags['vehicle'] = args.vehicle

    json_points: List[Dict[str, Any]] = []
    # Iterate through logfile, process data and import into InfluxDB
    while True:
        entry: DFMessage = log.recv_msg()
        if entry is None:
            _logger.debug("No more log entries, break from processing loop")
            break
        msg_type = entry.fmt.name
        # Log timestamps are float seconds; InfluxDB expects integer ns
        # (matching time_precision='n' below).
        timestamp_ns = int(float(entry._timestamp) * 1000000000)
        fields = {}
        for field_name in entry.get_fieldnames():
            field = getattr(entry, field_name)
            # Skip NaNs
            if isinstance(field, float) and math.isnan(field):
                continue
            # Skip fields that can't be decoded as UTF-8, as the Python client
            # and perhaps InfluxDB itself can't handle it.
            if isinstance(field, bytes):
                try:
                    field.decode('utf-8')
                except UnicodeDecodeError:
                    _logger.debug("skipping non UTF-8 field: %s.%s=%s",
                                  msg_type, field_name, field)
                    continue
            fields[field_name] = field
        tags = {}
        # The instance field may have been skipped above (NaN or
        # undecodable bytes); indexing `fields` unconditionally would
        # raise KeyError and abort the whole upload, so guard on
        # membership as well.
        instance_field = entry.fmt.instance_field
        if instance_field is not None and instance_field in fields:
            tags['instance'] = fields[instance_field]
        json_body: Dict[str, Any] = {
            'measurement': msg_type,
            'time': timestamp_ns,
            'tags': tags,
            'fields': fields
        }
        json_points.append(json_body)
        # Batch writes to influxdb, much faster
        if len(json_points) > 20000:
            client.write_points(json_points, time_precision='n',
                                database=args.database, tags=common_tags)
            json_points = []  # Clear out json_points after bulk write
    # Flush remaining points (covers logs with <= 20000 messages too)
    if len(json_points) > 0:
        client.write_points(json_points, time_precision='n',
                            database=args.database, tags=common_tags)
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()