forked from cpfair/tapiriik
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstats_cron.py
159 lines (140 loc) · 6.91 KB
/
stats_cron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from tapiriik.database import db, close_connections
from tapiriik.settings import RABBITMQ_USER_QUEUE_STATS_URL
from datetime import datetime, timedelta
import requests
# Total distance synced: sum "Distance" over every sync_stats record.
distanceSyncedAggr = list(db.sync_stats.aggregate([
    {"$group": {"_id": None, "total": {"$sum": "$Distance"}}},
]))
# An empty aggregation result is reported as 0.
distanceSynced = distanceSyncedAggr[0]["total"] if distanceSyncedAggr else 0

# Same sum restricted to records stamped within the last 24hr, for rate calculation.
lastDayDistanceSyncedAggr = list(db.sync_stats.aggregate([
    {"$match": {"Timestamp": {"$gt": datetime.utcnow() - timedelta(hours=24)}}},
    {"$group": {"_id": None, "total": {"$sum": "$Distance"}}},
]))
lastDayDistanceSynced = lastDayDistanceSyncedAggr[0]["total"] if lastDayDistanceSyncedAggr else 0

# Similarly, restricted to the last 1hr.
lastHourDistanceSyncedAggr = list(db.sync_stats.aggregate([
    {"$match": {"Timestamp": {"$gt": datetime.utcnow() - timedelta(hours=1)}}},
    {"$group": {"_id": None, "total": {"$sum": "$Distance"}}},
]))
lastHourDistanceSynced = lastHourDistanceSyncedAggr[0]["total"] if lastHourDistanceSyncedAggr else 0
# How long users are taking to get pushed into rabbitMQ.
# Once called "queueHead" as, a very long time ago, this _was_ user queuing.
# Sample: up to 10 users with the oldest QueuedAt that is not in the future,
# that have no SynchronizationWorker and no SynchronizationHostRestriction field.
enqueueHead = list(
    db.users.find(
        {
            "QueuedAt": {"$lte": datetime.utcnow()},
            "SynchronizationWorker": None,
            "SynchronizationHostRestriction": {"$exists": False},
        },
        {"QueuedAt": 1},
    ).sort("QueuedAt").limit(10)
)
enqueueTime = timedelta(0)
if enqueueHead:
    for pendingEnqueueUser in enqueueHead:
        enqueueTime = enqueueTime + (datetime.utcnow() - pendingEnqueueUser["QueuedAt"])
    # Mean time-since-queued across the sampled users.
    enqueueTime = enqueueTime / len(enqueueHead)
# Query rabbitMQ to get main queue throughput and length.
# The timeout keeps an unresponsive management API from hanging this cron job forever.
rmq_user_queue_stats = requests.get(RABBITMQ_USER_QUEUE_STATS_URL, timeout=30).json()
rmq_user_queue_length = rmq_user_queue_stats["messages_ready_details"]["avg"]
# NOTE(review): the management API is understood to omit "message_stats" (and the
# per-event "*_details" objects) for a queue with no recorded activity - confirm.
# A missing value is treated as an ack rate of 0 rather than raising KeyError.
rmq_user_queue_rate = (
    rmq_user_queue_stats.get("message_stats", {})
    .get("ack_details", {})
    .get("avg_rate", 0)
)
# Estimated wait = backlog / throughput. With no throughput there is nothing to
# divide by; 0 is stored as a "no estimate" placeholder so the field stays numeric
# and the remaining stats still get written instead of the script crashing.
if rmq_user_queue_rate:
    rmq_user_queue_wait_time = rmq_user_queue_length / rmq_user_queue_rate
else:
    rmq_user_queue_wait_time = 0
# sync time utilization
# Clean up records older than one hour first, so the figures below only cover
# what remains in the collection.
db.sync_worker_stats.remove({"Timestamp": {"$lt": datetime.utcnow() - timedelta(hours=1)}})
timeUsedAgg = list(db.sync_worker_stats.aggregate([
    {"$group": {"_id": None, "total": {"$sum": "$TimeTaken"}}},
]))
totalSyncOps = db.sync_worker_stats.count()
# Defaults for an empty aggregation result; overridden below otherwise.
timeUsed = 0
avgSyncTime = 0
if timeUsedAgg:
    timeUsed = timeUsedAgg[0]["total"]
    avgSyncTime = timeUsed / totalSyncOps
# error/pending/locked stats
# Each pipeline groups everything it matched into a single row; an empty result
# is reported as 0.

# Users whose SynchronizationWorker is set (not None).
lockedSyncRecords = list(db.users.aggregate([
    {"$match": {"SynchronizationWorker": {"$ne": None}}},
    {"$group": {"_id": None, "count": {"$sum": 1}}},
]))
lockedSyncRecords = lockedSyncRecords[0]["count"] if lockedSyncRecords else 0

# Users whose NextSynchronization is already in the past.
pendingSynchronizations = list(db.users.aggregate([
    {"$match": {"NextSynchronization": {"$lt": datetime.utcnow()}}},
    {"$group": {"_id": None, "count": {"$sum": 1}}},
]))
pendingSynchronizations = pendingSynchronizations[0]["count"] if pendingSynchronizations else 0

# Users with at least one nonblocking sync error.
usersWithErrors = list(db.users.aggregate([
    {"$match": {"NonblockingSyncErrorCount": {"$gt": 0}}},
    {"$group": {"_id": None, "count": {"$sum": 1}}},
]))
usersWithErrors = usersWithErrors[0]["count"] if usersWithErrors else 0

# Sum of NonblockingSyncErrorCount over all users.
totalErrors = list(db.users.aggregate([
    {"$group": {"_id": None, "total": {"$sum": "$NonblockingSyncErrorCount"}}},
]))
totalErrors = totalErrors[0]["total"] if totalErrors else 0
# Record a timestamped snapshot of the sync status figures computed above.
syncStatusSnapshot = {
    "Timestamp": datetime.utcnow(),
    "Locked": lockedSyncRecords,
    "Pending": pendingSynchronizations,
    "ErrorUsers": usersWithErrors,
    "TotalErrors": totalErrors,
    "SyncTimeUsed": timeUsed,
    "SyncEnqueueTime": enqueueTime.total_seconds(),
    "SyncQueueHeadTime": rmq_user_queue_wait_time,
}
db.sync_status_stats.insert(syncStatusSnapshot)

# Overwrite the summary fields on the stats document; upsert=True is passed so
# the update also works when no document matches the empty filter.
statsSummary = {
    "TotalDistanceSynced": distanceSynced,
    "LastDayDistanceSynced": lastDayDistanceSynced,
    "LastHourDistanceSynced": lastHourDistanceSynced,
    "TotalSyncTimeUsed": timeUsed,
    "AverageSyncDuration": avgSyncTime,
    "LastHourSynchronizationCount": totalSyncOps,
    "EnqueueTime": enqueueTime.total_seconds(),
    "QueueHeadTime": rmq_user_queue_wait_time,
    "Updated": datetime.utcnow(),
}
db.stats.update({}, {"$set": statsSummary}, upsert=True)
def aggregateCommonErrors():
    """Aggregate sync errors from db.connections into "common_sync_errors".

    Runs a MongoDB map-reduce over the connections collection. Each entry in a
    connection's SyncErrors is keyed by (Service, first 60 characters of the
    exception message); the reduced value carries the occurrence count, the
    number of occurrences that had a Timestamp, the summed age in seconds, the
    ids of the affected connections and one exemplar message. The finalize
    step adds recency_avg (summed age / timestamped count).

    Returns nothing; the output is left in the "common_sync_errors" collection.
    """
    from bson.code import Code
    # The exception message is expected to appear right before "LOCALS:".
    # Errors whose Message is missing or does not contain that pattern are
    # skipped: String.match() returns null for them, and indexing null would
    # throw inside the map function and abort the whole map-reduce.
    map_operation = Code(
        "function(){"
            "if (!this.SyncErrors) return;"
            "var errorMatch = new RegExp(/\\n([^\\n]+)\\n\\nLOCALS:/);"
            "var id = this._id;"
            "var svc = this.Service;"
            "var now = new Date();"
            "this.SyncErrors.forEach(function(error){"
                "var matched = error.Message ? error.Message.match(errorMatch) : null;"
                "if (!matched) return;"
                "var message = matched[1];"
                "var key = {service: svc, stem: message.substring(0, 60)};"
                "var recency_score = error.Timestamp ? (now - error.Timestamp)/1000 : 0;"
                "emit(key, {count:1, ts_count: error.Timestamp ? 1 : 0, recency: recency_score, connections:[id], exemplar:message});"
            "});"
        "}"
    )
    # Sums the counters, keeps the last exemplar seen and flattens the
    # per-value connection id lists into a single list.
    reduce_operation = Code(
        "function(key, item){"
            "var reduced = {count:0, ts_count:0, connections:[], recency: 0};"
            "var connection_collections = [];"
            "item.forEach(function(error){"
                "reduced.count+=error.count;"
                "reduced.ts_count+=error.ts_count;"
                "reduced.recency+=error.recency;"
                "reduced.exemplar = error.exemplar;"
                "connection_collections.push(error.connections);"
            "});"
            "reduced.connections = reduced.connections.concat.apply(reduced.connections, connection_collections);"
            "return reduced;"
        "}")
    # NOTE: when no error in a group had a Timestamp, both recency and ts_count
    # are 0, so recency_avg is 0/0 (NaN in JavaScript).
    finalize_operation = Code(
        "function(key, res){"
            "res.recency_avg = res.recency / res.ts_count;"
            "return res;"
        "}"
    )
    db.connections.map_reduce(map_operation, reduce_operation, "common_sync_errors", finalize=finalize_operation)
    # We don't need to do anything with the result right now, just leave it there to appear in the dashboard
# Rebuild the "common_sync_errors" map-reduce output (see aggregateCommonErrors above).
aggregateCommonErrors()
# NOTE(review): presumably releases the connections held by tapiriik.database - confirm.
close_connections()