Skip to content

Commit

Permalink
thresholds and sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Christos Mantas committed Sep 18, 2014
1 parent 63c50be commit b6b9402
Show file tree
Hide file tree
Showing 19 changed files with 3,991 additions and 9,934 deletions.
3 changes: 2 additions & 1 deletion ClientsCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def update_hostfiles(servers):
log.info("updating hostfiles")
# generate ycsb-specific hosts file text
host_text = ""
del servers["cassandra_seednode"]

if "cassandra_seednode" in servers.keys(): del servers["cassandra_seednode"]

#generate the "hosts" text for YCSB
for key, value in servers.iteritems(): host_text += value+"\n"
Expand Down
46 changes: 27 additions & 19 deletions Coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,18 @@ def implement_decision():
try:
if action == "ADD":
decision_module.pending_action = action
my_logger.info("Will add %d nodes" % count)
my_logger.debug("Will add %d nodes" % count)
Servers.add_nodes(count)
# artificially delay the decision in order to discard transient measurements
my_logger.info("Sleeping! (artificial delay)")
sleep(env_vars['extra_decision_delay_per_node']*count)
elif action == "REMOVE":
decision_module.pending_action = action
my_logger.info("Will remove %d nodes" % count)
my_logger.debug("Will remove %d nodes" % count)
Servers.remove_nodes(count)
#not supposed to be here for pass decision
else:
return

#update the hosts files in clients
Clients.update_hostfiles(Servers.get_hosts())
Expand Down Expand Up @@ -120,7 +122,7 @@ def run(timeout=None):

# DONE
#join the running_process
running_process.join()
if not running_process is None: running_process.join()
my_logger.info(" run is finished")


Expand All @@ -129,7 +131,7 @@ def train():
Runs a training phase in order to collect a training set of metrics for the given cluster
"""
#change the gain function for training purposes
env_vars['gain'] = 'num_nodes'
env_vars['gain'] = '0'

# load the training vars into the regular environment vars
t_vars = env_vars["training_vars"]
Expand All @@ -145,13 +147,18 @@ def train():
try:remove(env_vars["measurements_file"])
except: pass

# Sanity-Check the nodecount
if Servers.node_count() != t_vars['min_cluster_size']:
my_logger.error("TRAINING: Start training with the Minimum cluster size, %d (now:%d)" %(t_vars['min_cluster_size'], Servers.node_count()))
exit()
# # Sanity-Check the nodecount
# if Servers.node_count() != t_vars['min_cluster_size']:
# my_logger.error("TRAINING: Start training with the Minimum cluster size, %d (now:%d)" %(t_vars['min_cluster_size'], Servers.node_count()))
# exit()

Clients.kill_nodes()
Servers.kill_nodes()
Servers.bootstrap_cluster(t_vars['min_cluster_size'])
svr_hosts = Servers.get_hosts(private=env_vars["private_network"])

Clients.run({'type': 'load', 'servers': Servers.get_hosts(), 'records': t_vars['records']})

# get the workload parameters
svr_hosts = Servers.get_hosts(private=True)
#create the parameters dictionary for the training phase
params = {'type': 'sinusoid', 'servers': svr_hosts, 'target': t_vars['target_load'],
'offset': t_vars['offset_load'], 'period': t_vars['period']}
Expand All @@ -164,30 +171,31 @@ def train():

# run 1 period of workload for each of the states between min and max cluster size
for i in range(env_vars['max_cluster_size'] - t_vars['min_cluster_size'] + 1):
print "iteration "+str(i)
my_logger.info("iteration "+str(i))

#run the workload with the specified params to the clients
Clients.run(params)

#refresh once
all_metrics = monitor_clients.refreshMetrics()
#This should only decide to add a node after a period is passed
global decision
decision = decision_module.take_decision(all_metrics)

#run for 1 period
timeout = time() + env_vars['period']
while time() <= timeout:
#fetch metrics and takes decisions
sleep(metrics_interval)
# refresh the metrics
all_metrics = monitor_clients.refreshMetrics()
client_metrics = monitor_clients.refreshMetrics()
server_metrics = monitor_servers.refreshMetrics()

#only refresh metrics
decision_module.take_decision(client_metrics, server_metrics)

#This should only decide to add a node after a period is passed
decision = decision_module.take_decision(all_metrics)
#manually add a node
decision = {"action": "ADD", 'count':1}
# synchronously implement that decision
implement_decision()

# synchronously implement that decision
implement_decision()


#stop the clients after one period has passed
Expand Down
6 changes: 3 additions & 3 deletions Experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def simulate():
draw_exp("files/measurements/simulation/measurements.txt")


def clean():
def clean_start():
success = False
while not success:
try:
Expand All @@ -201,7 +201,7 @@ def clean():
ClientsCluster.run(args)
success = True
except:
log.error("Failed to cllean, restarting")
log.error("Failed to clean, restarting")
sleep(120)


Expand Down Expand Up @@ -254,7 +254,7 @@ def run_batch_experiments(exp_list):
success = False
while not success and tries > 0:
if (not ('clean' in exp)) or bool(exp['clean']):
clean()
clean_start()
else:
#make sure the cluster is at its min size
CassandraCluster.set_cluster_size(env_vars["min_cluster_size"])
Expand Down
29 changes: 17 additions & 12 deletions Monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,23 @@ def refreshMetrics(self):
#self.my_logger.debug("Refreshing metrics from %s:%s" % (self.ganglia_host, self.ganglia_port))
for res in socket.getaddrinfo(self.ganglia_host, self.ganglia_port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.soc = socket.socket(af, socktype, proto)
except socket.error as msg:
s = None
continue
try:
self.soc.connect(sa)
except socket.error as msg:
self.soc.close()
self.soc = None
continue
break
self.soc = None
attempts = 0
while self.soc is None and attempts < 3:
try:
self.soc = socket.socket(af, socktype, proto)
except socket.error as msg:
s = None
sleep(1)
continue
try:
self.soc.connect(sa)
except socket.error as msg:
self.soc.close()
self.soc = None
sleep(1)
continue
break
if self.soc is None:
print 'could not open socket %s:%s' % (str(self.ganglia_host), str(self.ganglia_port))
sys.exit(1)
Expand Down
Loading

0 comments on commit b6b9402

Please sign in to comment.