Skip to content

Commit

Permalink
reworked for full IPv4. Fixed: ssh, add_nodes, load_data
Browse files Browse the repository at this point in the history
  • Loading branch information
Christos Mantas committed Apr 24, 2014
1 parent eb8119d commit 856b6f8
Show file tree
Hide file tree
Showing 58 changed files with 51,851 additions and 3,954 deletions.
58 changes: 41 additions & 17 deletions CassandraCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from os.path import isfile
from lib.persistance_module import get_script_text, env_vars
from lib.tiramola_logging import get_logger
from threading import Thread

orchestrator = None # the VM to which the others report to

Expand All @@ -26,17 +27,24 @@
log = get_logger('CLUSTER', 'DEBUG', logfile='files/logs/Coordinator.log')


def create_cluster(worker_count=0):
def create_cluster(worker_count=0, used=None):
"""
Creates a Cassandra Cluster with a single Seed Node and 'worker_count' other nodes
:param worker_count: the number of the nodes to create-apart from the seednode
"""
global nodes, stash, seeds
#create the seed node
seeds.append(Node(cluster_name, node_type="seed", number=0, create=True, IPv4=True))
#create the rest of the nodes
for i in range(worker_count):
nodes.append(Node(cluster_name, node_type="node", number="%02d"%(len(nodes)+1), create=True))
nodes.append(Node(cluster_name, node_type="node", number="%02d"%(len(nodes)+1), create=True, IPv4=True))
#wait until everybody is ready

#if 'used' is specified, only use this number of nodes
if not used is None:
stash = nodes[used:]
nodes = nodes[:used-1]
#save the cluster to file
save_cluster()
wait_everybody()
find_orchestrator()
Expand Down Expand Up @@ -158,30 +166,46 @@ def inject_hosts_files():
orchestrator.run_command("service ganglia-monitor restart; service gmetad restart", silent=True)


def add_one_node():
    """
    Helper function for add_nodes: brings a single node into the cluster.

    Uses the first node of the stash if there is one, otherwise creates a new node.
    The node is appended to 'nodes'; then we wait until it is ready, inject the
    hostnames returned by get_hosts(private=True) and bootstrap it.
    """
    # add_nodes runs this function in several threads at once, so pop first and
    # handle the empty case: with a "check length, then pop" two threads can both
    # see the last stashed node and the second pop would raise
    try:
        new_guy = stash.pop(0)
    except IndexError:
        # NOTE(review): the number is derived from len(nodes), so concurrent callers
        # may compute the same one -- confirm whether this needs a lock
        new_guy = Node(cluster_name, 'node', str(len(nodes)+1), create=True)
    else:
        log.info("Using %s from my stash" % new_guy.name)
    nodes.append(new_guy)
    new_guy.wait_ready()
    new_guy.inject_hostnames(get_hosts(private=True), delete=cluster_name)
    new_guy.bootstrap()
    log.info("Node %s is live " % new_guy.name)


def add_nodes(count=1):
    """
    Adds 'count' nodes to the cassandra cluster, each one in its own thread.
    Saves the cluster and refreshes the hosts in all nodes when they are all done
    :param count: the number of nodes to add
    :return:
    """
    log.info('Adding %d nodes' % count)
    #start the threads that add the nodes (one add_one_node call per thread)
    threads = []
    for _ in range(count):
        t = Thread(target=add_one_node, args=())
        threads.append(t)
        t.start()
        #TODO join with timeout if serial
        #FIXME parallel adding of nodes (ssh problem)
    #wait for all the threads to finish
    log.debug("Waiting for all the threads to finish adding")
    for t in threads:
        t.join()
    #save the current cluster state
    save_cluster()
    #inform all
    inject_hosts_files()
    log.info("Finished adding %d nodes" % count)


def remove_nodes(count=1):
Expand Down
9 changes: 8 additions & 1 deletion ClientsCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from os.path import isfile
from lib.persistance_module import get_script_text, env_vars
from lib.tiramola_logging import get_logger
from threading import Thread

orchestrator = None # the VM to which the others report to

Expand Down Expand Up @@ -176,12 +177,18 @@ def run(params):
record_count = int(params['records'])
start = 0
step = record_count/len(all_nodes)
threads = []
for c in all_nodes:
load_command = "echo '%s' > /opt/hosts;" % host_text
load_command += get_script_text(cluster_name, "", "load") % (str(record_count), str(step), str(start))
log.info("running load phase on %s" % c.name)
c.run_command(load_command, silent=True)
t = Thread(target=c.run_command, args=(load_command,) )
threads.append(t)
t.start()
start += step
log.info("waiting for load phase to finish in clients")
for t in threads:
t.join()


def destroy_all():
Expand Down
2 changes: 0 additions & 2 deletions Coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ def implement_decision():
my_logger.info("Will remove %d nodes" % count)
Servers.remove_nodes(count)
elif action == "PASS":
my_logger.debug("doing nothing")
return
decision_module.pending_action = None
decision_module.currentState = Servers.node_count()



#check if cluster exists
if Servers.exists():
my_logger.info( "Cluster exists using it as is")
Expand Down
9 changes: 6 additions & 3 deletions VM.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def run_command(self, command, user='root', indent=0, prefix="\t$: ", silent=Fa
' so you cannot run commands on it')
return "ERROR"
self.log.debug("running SSH command:\n\n%s\n\n" % reindent(command, 5))
rv= run_ssh_command(self.get_public_addr(), user, command, indent, prefix)
rv = run_ssh_command(self.get_public_addr(), user, command, indent, prefix, logger=self.log)
if rv is not None:
self.log.debug("command returned:\n\n %s\n\n" % rv)
return rv
Expand Down Expand Up @@ -219,16 +219,18 @@ def wait_ready(self):
if not self.created:
self.log.debug("Not active yet. Sleeping")
while not self.created: sleep(3)
self.log.info( "Waiting for SSH daemon (%s)" % self.get_public_addr())
self.log.info("Waiting for SSH daemon (%s)" % self.get_public_addr())
#time to stop trying
end_time = datetime.now()+timedelta(seconds=ssh_giveup_timeout)
self.log.debug("end time:"+str(end_time))
timer = Timer()
timer.start()
#self.log.info(("VM: Trying ssh, attempt "),
while not success:
#if(attempts%5 == 0): self.log.info( ("%d" % attempts),
attempts += 1
if test_ssh(self.get_public_addr(), 'root'):
self.log.debug("ssh attempt:"+str(attempts))
if test_ssh(self.get_public_addr(), 'root', logger=self.log):
success = True
else:
if datetime.now() > end_time:
Expand Down Expand Up @@ -303,6 +305,7 @@ def stop(self):
assert self.started is True, " Timer had not been started"
start_time = self.start_time
self.start_time = 0
self.started = False
return float(end_time - start_time)/1000

@staticmethod
Expand Down
8 changes: 6 additions & 2 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ def run_stress():
def create_cluster():
    """
    Creates a Cassandra cluster from the command line arguments.

    Reads the required 'nodes' argument and the optional 'used' argument from 'args'.
    Both counts are decremented by one before being handed to
    CassandraCluster.create_cluster; when 'used' is not given, None is passed.
    A missing required argument is logged instead of raised.
    """
    try:
        nodes = int(args["nodes"])
        used = int(args['used']) if "used" in args else None
        if used is None:
            # 'used' is optional: formatting None with %d or computing None-1
            # would raise a TypeError that the handler below does not catch
            log.info("creating cluster with %d nodes" % nodes)
            workers_used = None
        else:
            log.info("creating cluster with %d nodes (%d used)" % (nodes, used))
            workers_used = used - 1
        import CassandraCluster
        CassandraCluster.create_cluster(nodes-1, workers_used)
    except KeyError as e:
        log.info("create_cluster requires argument %s" % e.args[0])

Expand Down
Loading

0 comments on commit 856b6f8

Please sign in to comment.