-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmaster.py
150 lines (132 loc) · 5.33 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from SimpleXMLRPCServer import SimpleXMLRPCServer
import xmlrpclib
import argparse
import config
import sys
import os
import time
import errno
import socket
import pickle
# joinSeq[i] records whether server i is currently considered joined:
# joinServer sets the entry to True once the server answers hello(),
# killServer resets it to False. The list is also passed to the watchdog's
# joinServer RPC and to each server's status RPC.
joinSeq = [False] * config.SERVER_COUNT
# Reference copy of the key/value store: printStore overwrites it with the
# store returned by server 0 (DEBUG mode only) and compares the stores of
# the other servers against it.
kv_store = {}
def joinServer(dogs, clients, servers, arg):
    """Ask a watchdog to join its server, then block until the server answers.

    Args:
        dogs: watchdog RPC proxies, indexed by server id.
        clients: client RPC proxies (unused by this command).
        servers: server RPC proxies, indexed by server id.
        arg: tokenized command line; arg[1] is the server id.

    Raises:
        socket.error: if polling the server fails with anything other than
            ECONNREFUSED.
    """
    server_id = int(arg[1])
    dogs[server_id].joinServer(server_id, joinSeq)

    waited = 0
    while True:
        time.sleep(config.CHECK_INTERVAL)
        waited += config.CHECK_INTERVAL
        try:
            # hello() returning 0 is treated as "server is up".
            if servers[server_id].hello() == 0:
                break
        except socket.error as err:
            # A refused connection means the server is not listening yet,
            # so keep polling; anything else is a real failure.
            if err.errno != errno.ECONNREFUSED:
                # Bare raise keeps the original traceback (raise err would
                # replace it under Python 2).
                raise

    if config.DEBUG:
        print("Server[{}] joined in {} seconds.".format(server_id, waited))
    # Mark the server as joined only after it has responded.
    joinSeq[server_id] = True
def killServer(dogs, clients, servers, arg):
    """Tell a server's watchdog to kill it and mark the server as not joined.

    Args:
        dogs: watchdog RPC proxies, indexed by server id.
        clients: client RPC proxies (unused by this command).
        servers: server RPC proxies (unused by this command).
        arg: tokenized command line; arg[1] is the server id.
    """
    victim = int(arg[1])
    dogs[victim].killServer()
    joinSeq[victim] = False
def joinClient(dogs, clients, servers, arg):
    """Forward a joinClient request to the selected client.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies; the client is chosen as
            int(arg[1]) modulo config.CLIENT_COUNT.
        servers: server RPC proxies (unused by this command).
        arg: tokenized command line; arg[1] is the client id and arg[2] is
            passed to the client's joinClient RPC (presumably the id of the
            server to connect to -- confirm against the client code).
    """
    client = clients[int(arg[1]) % config.CLIENT_COUNT]
    client.joinClient(int(arg[2]))
def breakConnection(dogs, clients, servers, arg):
    """Break the connection between two nodes named by arg[1] and arg[2].

    Ids below config.SERVER_COUNT are treated as servers. For a
    server-server pair the request goes to the first server and the call
    blocks until the second server no longer reports a connection to the
    first. Otherwise the request goes to the client (the larger id) with the
    smaller id as its argument.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies, indexed by id modulo config.CLIENT_COUNT.
        servers: server RPC proxies, indexed by server id.
        arg: tokenized command line; arg[1] and arg[2] are the two node ids.
    """
    first, second = int(arg[1]), int(arg[2])
    both_servers = first < config.SERVER_COUNT and second < config.SERVER_COUNT

    if not both_servers:
        # Client <-> server link: the client side executes the break.
        low, high = min(first, second), max(first, second)
        clients[high % config.CLIENT_COUNT].breakConnection(low)
        return

    # Server <-> server link: the first server executes the break.
    servers[first].breakConnection(second)
    elapsed = 0
    while servers[second].isConnectedTo(first):
        time.sleep(config.CHECK_INTERVAL)
        elapsed += config.CHECK_INTERVAL
    if config.DEBUG:
        print("Connection betwen Server[{}] Server[{}] break in {} seconds.".format(first, second, elapsed))
def createConnection(dogs, clients, servers, arg):
    """Create a connection between two nodes named by arg[1] and arg[2].

    Ids below config.SERVER_COUNT are treated as servers. A server-server
    pair is handled by the first server's createConnection RPC. Any other
    pair is handled by the client (the larger id), using its joinClient RPC
    with the smaller id as argument.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies, indexed by id modulo config.CLIENT_COUNT.
        servers: server RPC proxies, indexed by server id.
        arg: tokenized command line; arg[1] and arg[2] are the two node ids.
    """
    first, second = int(arg[1]), int(arg[2])
    both_servers = first < config.SERVER_COUNT and second < config.SERVER_COUNT

    if both_servers:
        servers[first].createConnection(second)
        return

    # NOTE: the client-side link is (re)established through joinClient
    # rather than a dedicated createConnection RPC on the client.
    low, high = min(first, second), max(first, second)
    clients[high % config.CLIENT_COUNT].joinClient(low)
def stabilize(dogs, clients, servers, arg):
    """Block until every joined server reports a truthy status.

    Each pass polls the servers in order, skipping those not marked in
    joinSeq and stopping at the first one whose status(joinSeq) is falsy.
    Between unsuccessful passes it sleeps config.STABILIZE_INTERVAL seconds.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies (unused by this command).
        servers: server RPC proxies, indexed by server id.
        arg: tokenized command line (unused by this command).
    """
    waited = 0
    # all() short-circuits on the first unstable server, and status() is only
    # called for servers currently marked as joined.
    while not all(
        (not joinSeq[idx]) or proxy.status(joinSeq)
        for idx, proxy in enumerate(servers)
    ):
        time.sleep(config.STABILIZE_INTERVAL)
        waited += config.STABILIZE_INTERVAL

    if config.DEBUG:
        print("stabilized after {} second.".format(waited))
def printStore(dogs, clients, servers, arg):
    """Fetch a server's key/value store and print it as key:value lines.

    In DEBUG mode the store of server 0 is saved in the module-level
    kv_store as the reference; for any other server every reference key is
    checked against the fetched store and mismatches are reported. DEBUG
    mode also dumps the fetched store as a whole after the listing.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies (unused by this command).
        servers: server RPC proxies, indexed by server id.
        arg: tokenized command line; arg[1] is the server id.
    """
    global kv_store
    server_id = int(arg[1])
    remote_store = servers[server_id].printStore()

    if config.DEBUG:
        if server_id == 0:
            kv_store = remote_store
        else:
            for key in kv_store:
                if key not in remote_store or remote_store[key] != kv_store[key]:
                    print('On the server %s the key %s has a wrong value' % (arg[1], key))
                    # Only the first 20 characters of each value are shown.
                    print("Remote Value: {}".format(remote_store[key][0:20] if key in remote_store else None))
                    print("Ground Truth: {}".format(kv_store[key][0:20]))

    for k, v in remote_store.items():
        print("{}:{}".format(k, v))

    if config.DEBUG:
        # Reuse the snapshot fetched above instead of issuing a second RPC,
        # so the dump matches exactly what was compared and listed.
        print(remote_store)
def put(dogs, clients, servers, arg):
    """Send a put(key, value) request through the selected client.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies; the client is chosen as
            int(arg[1]) modulo config.CLIENT_COUNT.
        servers: server RPC proxies (unused by this command).
        arg: tokenized command line; arg[1] is the client id, arg[2] the key
            and arg[3] the value.
    """
    # NOTE: the command line is split on single spaces by the caller, so a
    # key or value containing ' ' does not arrive here intact.
    key, value = arg[2], arg[3]
    client = clients[int(arg[1]) % config.CLIENT_COUNT]
    client.put(key, value)
def get(dogs, clients, servers, arg):
    """Look up a key through the selected client and print the result.

    Args:
        dogs: watchdog RPC proxies (unused by this command).
        clients: client RPC proxies; the client is chosen as
            int(arg[1]) modulo config.CLIENT_COUNT.
        servers: server RPC proxies (unused by this command).
        arg: tokenized command line; arg[1] is the client id, arg[2] the key.
    """
    client = clients[int(arg[1]) % config.CLIENT_COUNT]
    print(client.get(arg[2]))
if __name__ == "__main__":
    # Driver script:
    # 1. Build RPC proxies for the watchdogs, clients and servers
    #    (NOTE: connect with watchdogs before servers).
    # 2. Read commands from stdin, one per line, and dispatch each to its
    #    handler until a blank line or a line starting with '#' is read.
    # 3. Report total time and throughput.

    # Remove the 'dict' and 'log' paths left in the working directory.
    os.system("rm -rf dict log")

    dogs, clients, servers = [], [], []
    for i in range(config.SERVER_COUNT):
        dogs.append(xmlrpclib.ServerProxy('http://' + str(config.WATCHDOG_IP_LIST[i]) + ':' + str(config.WATCHDOG_PORT[i])))
        clients.append(xmlrpclib.ServerProxy('http://' + str(config.CLIENT_IP_LIST[i]) + ':' + str(config.CLIENT_PORT[i])))
        # Servers are addressed through the watchdog IP list with their own ports.
        servers.append(xmlrpclib.ServerProxy('http://' + str(config.WATCHDOG_IP_LIST[i]) + ':' + str(config.SERVER_PORT[i])))

    # Command name (first token of a line) -> handler.
    command2func = {
        'joinServer': joinServer,
        'killServer': killServer,
        'joinClient': joinClient,
        'breakConnection': breakConnection,
        'createConnection': createConnection,
        'stabilize': stabilize,
        'printStore': printStore,
        'put': put,
        'get': get,
    }

    start = time.time()
    commandCount = 0
    while True:
        line = sys.stdin.readline().strip('\n')
        # An empty line (including EOF) or a '#' line ends the session.
        if len(line) == 0 or line.startswith("#"):
            break
        # Count only real command lines; counting before the check above
        # would include the terminating line and inflate the throughput.
        commandCount += 1
        if config.DEBUG:
            if config.DISPLAY_COMMAND:
                print("excecuting command [{}]: {}".format(commandCount, line[:30]))
            else:
                print("excecuting command [{}]".format(commandCount))
        arg = line.split(' ')
        func = command2func.get(arg[0], None)
        # Unknown commands are silently skipped.
        if func:
            func(dogs, clients, servers, arg)

    allTime = time.time() - start
    print("Time to execute all commands: {}".format(allTime))
    print('throughput is %f requests per second' % (commandCount / allTime))