-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnifi_json_handler.py
65 lines (59 loc) · 3.44 KB
/
nifi_json_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import json
import sys
import datetime
jdata = sys.stdin.read()
if jdata is None or not jdata:
print ''
sys.exit(0)
data = json.loads(jdata)
#key = 'processGroups'
key = sys.argv[1]
# if get process group itself, then get it directly and print a json result
if key == 'rootGroup':
groupDetails = {}
groupDetails['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f")
groupDetails['id'] = data['component']['id']
groupDetails['name'] = data['component']['name']
groupDetails['runningCount'] = data['component']['runningCount']
groupDetails['stoppedCount'] = data['component']['stoppedCount']
groupDetails['invalidCount'] = data['component']['invalidCount']
groupDetails['activeRemotePortCount'] = data['component']['activeRemotePortCount']
groupDetails['inactiveRemotePortCount'] = data['component']['inactiveRemotePortCount']
groupDetails['inputPortCount'] = data['component']['inputPortCount']
groupDetails['outputPortCount'] = data['component']['outputPortCount']
groupDetails['queuedCount'] = data['status']['aggregateSnapshot']['queuedCount']
groupDetails['flowFilesReceived'] = data['status']['aggregateSnapshot']['flowFilesReceived']
json_groupDetails = json.dumps(groupDetails)
print json_groupDetails
sys.exit(0)
# if it's remote process group ports, then get it directly
if key == 'rpgPorts':
version = str(data['revision']['version'])
revision = '{"version":' + version
if "clientId" in data['revision']:
clientId = str(data['revision']['clientId'])
revision += ',"clientId":"' + clientId + '"'
revision += '}'
ports = data['component']['contents']['inputPorts']
for item in ports:
if item['connected'] == True:
print 'id:' + item['id'] + ';name:' + item['name'] + ';transmitting:' + str(item['transmitting']).lower() + ';revision:' + revision
sys.exit(0)
if key not in data or len(data[key]) == 0:
print ''
sys.exit(0)
for item in data[key]:
if key == 'processors':
version = str(item['revision']['version'])
revision = '{"version":' + version
if "clientId" in item['revision']:
clientId = str(item['revision']['clientId'])
revision += ',"clientId":"' + clientId + '"'
revision += '}'
print 'id:' + item['component']['id'] + ';name:' + item['component']['name'] + ';revision:' + revision + ';state:' + item['component']['state']
elif key == 'processGroups':
print 'id:' + item['component']['id'] + ';name:' + item['component']['name'] + ';stoppedCount:' + str(item['component']['stoppedCount']) + ';invalidCount:' + str(item['component']['invalidCount']) + ';disabledCount:' + str(item['component']['disabledCount']) + ';inactiveRemotePortCount:' + str(item['component']['inactiveRemotePortCount']) + ';queuedCount:' + item['status']['aggregateSnapshot']['queuedCount']
elif key == 'remoteProcessGroups':
print 'id:' + item['component']['id'] + ';targetUris:' + item['component']['targetUris'] + ';transmitting:' + str(item['component']['transmitting']).lower() + ';activeRemoteInputPortCount:' + str(item['component']['activeRemoteInputPortCount'])
elif key == 'connections':
print 'sourceId:' + item['status']['sourceId'] + ';sourceName:' + item['status']['sourceName'] + ';destinationId:' + item['status']['destinationId'] + ';destinationName:' + item['status']['destinationName'] + ';queuedCount:' + item['status']['aggregateSnapshot']['queuedCount'] + ';destinationGroupId:' + item['destinationGroupId'] + ';destinationType:' + item['destinationType']