Skip to content

Commit

Permalink
Merge pull request #183 from FYP-Auto-Scale-JasmineGraph/train-test
Browse files Browse the repository at this point in the history
Fix Errors in Federation Training Commands and Add Testcases.
  • Loading branch information
miyurud authored Sep 4, 2023
2 parents 2a6a664 + 5d4792e commit a107a5b
Show file tree
Hide file tree
Showing 14 changed files with 8,320 additions and 53 deletions.
8 changes: 8 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ ENV JASMINEGRAPH_HOME="/home/ubuntu/software/jasminegraph"

WORKDIR /home/ubuntu/software/jasminegraph

RUN apt-get update
RUN apt-get install -y python3.8
RUN apt-get install -y python3-pip
RUN apt-get install -y python3.8-distutils
RUN python3.8 -m pip install stellargraph
RUN python3.8 -m pip install chardet

COPY ./GraphSAGE ./GraphSAGE
RUN pip install -r ./GraphSAGE/requirements
RUN pip install pandas

COPY ./build.sh ./build.sh
COPY ./run-docker.sh ./run-docker.sh
Expand Down
6 changes: 3 additions & 3 deletions conf/jasminegraph-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ org.jasminegraph.server.modelDir=/var/tmp/jasminegraph-localstore/models/
#--------------------------------------------------------------------------------

org.jasminegraph.federated.enabled=true
org.jasminegraph.fl.location=/home/ubuntu/software/jasminegraph/src-python/
org.jasminegraph.fl.dataDir=/home/ubuntu/software/jasminegraph/src-python/data/
org.jasminegraph.fl.weights=/home/ubuntu/software/jasminegraph/src-python/weights/
org.jasminegraph.fl.location=/home/ubuntu/software/jasminegraph/src_python/
org.jasminegraph.fl.dataDir=/home/ubuntu/software/jasminegraph/src_python/data/
org.jasminegraph.fl.weights=/home/ubuntu/software/jasminegraph/src_python/weights/
org.jasminegraph.fl_clients=2
org.jasminegraph.fl.epochs=4
org.jasminegraph.fl.rounds=4
Expand Down
20 changes: 6 additions & 14 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,8 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
JasmineGraphServer *jasmineServer = new JasmineGraphServer();
jasmineServer->initiateFiles(graphID, trainData);
jasmineServer->initiateMerge(graphID, trainData, sqlite);
write(connFd, DONE.c_str(), FRONTEND_COMMAND_LENGTH);
write(connFd, "\r\n", 2);

} else if (line.compare(TRAIN) == 0) {
string message = "Available main flags:\r\n";
Expand All @@ -1158,20 +1160,6 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
string flags =
Conts::FLAGS::GRAPH_ID + " " + Conts::FLAGS::LEARNING_RATE + " " + Conts::FLAGS::BATCH_SIZE + " " +
Conts::FLAGS::VALIDATE_ITER + " " + Conts::FLAGS::EPOCHS;
result_wr = write(connFd, flags.c_str(), flags.size());
if (result_wr < 0) {
frontend_logger.log("Error writing to socket", "error");
}
result_wr = write(connFd, "\r\n", 2);
if (result_wr < 0) {
frontend_logger.log("Error writing to socket", "error");
}
message = "Send --<flag1> <value1> --<flag2> <value2> ..\r\n";
result_wr = write(connFd, message.c_str(), message.size());
if (result_wr < 0) {
frontend_logger.log("Error writing to socket", "error");
}

write(connFd, flags.c_str(), flags.size());
write(connFd, "\r\n", 2);
message = "Send --<flag1> <value1> --<flag2> <value2> ..\r\n";
Expand Down Expand Up @@ -1223,9 +1211,11 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
if (federatedEnabled == "true") {
JasmineGraphServer *jasmineServer = new JasmineGraphServer();
if (utils.getJasmineGraphProperty("org.jasminegraph.fl.org.training") == "true") {
frontend_logger.log("Initiate org communication", "info");
jasmineServer->initiateOrgCommunication(graphID, trainData, sqlite);

} else {
frontend_logger.log("Initiate communication", "info");
jasmineServer->initiateCommunication(graphID, trainData, sqlite);

}
Expand All @@ -1236,6 +1226,8 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
jasminGraphTrainingInitiator->initiateTrainingLocally(graphID, trainData);

}
write(connFd, DONE.c_str(), FRONTEND_COMMAND_LENGTH);
write(connFd, "\r\n", 2);

} else if (line.compare(IN_DEGREE) == 0) {
frontend_logger.log("Calculating In Degree Distribution", "info");
Expand Down
47 changes: 34 additions & 13 deletions src/server/JasmineGraphInstanceService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ void *instanceservicesession(void *dummyPt) {
line = (data);
line = utils.trim_copy(line, " \f\n\r\t\v");
string server_hostname = line;
write(connFd, JasmineGraphInstanceProtocol::HOST_OK.c_str(), JasmineGraphInstanceProtocol::HOST_OK.size());
instance_logger.log("Received hostname : " + line, "info");

instance_logger.log("Sending : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
write(connFd, JasmineGraphInstanceProtocol::HOST_OK.c_str(), JasmineGraphInstanceProtocol::HOST_OK.size());
std::cout << "ServerName : " << server_hostname << std::endl;
} else if (line.compare(JasmineGraphInstanceProtocol::CLOSE) == 0) {
write(connFd, JasmineGraphInstanceProtocol::CLOSE_ACK.c_str(),
Expand Down Expand Up @@ -1688,7 +1690,7 @@ void *instanceservicesession(void *dummyPt) {
string trainData(data);

std::vector<std::string> trainargs = Utils::split(trainData, ' ');

instance_logger.info("Received options : " + trainData);
string graphID;
string partitionID = trainargs[trainargs.size() - 1];

Expand Down Expand Up @@ -1760,7 +1762,7 @@ void *instanceservicesession(void *dummyPt) {
string trainData(data);

std::vector<std::string> trainargs = Utils::split(trainData, ' ');

instance_logger.info("Received options : " + trainData);
string graphID;
string partitionID = trainargs[trainargs.size() - 1];

Expand All @@ -1782,6 +1784,7 @@ void *instanceservicesession(void *dummyPt) {
bzero(data, INSTANCE_DATA_LENGTH);
read(connFd, data, INSTANCE_DATA_LENGTH);
string trainData(data);
instance_logger.log("Train Data : " + trainData, "info");

std::vector<std::string> trainargs = Utils::split(trainData, ' ');

Expand Down Expand Up @@ -3167,13 +3170,14 @@ void JasmineGraphInstanceService::trainPartition(string trainData) {
std::transform(trainargs.begin(), trainargs.end(), std::back_inserter(vc), converter);

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.graphsage") + " && ";
std::string command = path + "python3.11 -m unsupervised_train ";
std::string command = path + "python3.11 -m unsupervised_train > /home/ubuntu/software/jasminegraph/logs/unsupervised_train" + partitionID + "-" + Utils::getCurrentTimestamp() + ".txt" ;

int argc = trainargs.size();
for (int i = 0; i < argc - 2; ++i) {
command += trainargs[i + 2];
command += " ";
}
instance_logger.log("Executing : " + command, "info");
system(command.c_str());
}

Expand Down Expand Up @@ -4480,11 +4484,13 @@ void JasmineGraphInstanceService::initServer(string trainData){
std::transform(trainargs.begin(), trainargs.end(), std::back_inserter(vc), converter);

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 fl_server.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
std::string command = path + "python3.8 fl_server.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")+ " "+ graphID + " 0 "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl_clients")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > server_logs.txt";
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > "
+ "/home/ubuntu/software/jasminegraph/logs/server_logs-" + Utils::getCurrentTimestamp() + ".txt";
instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4507,7 +4513,8 @@ void JasmineGraphInstanceService::initOrgServer(string trainData){
std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 org_server.py " + graphID+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl_clients")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs")
+" localhost 5050 > org_server_logs.txt";
+" localhost 5050 > /home/ubuntu/software/jasminegraph/logs/org_server_logs-" + Utils::getCurrentTimestamp() + ".txt";
instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4532,7 +4539,9 @@ void JasmineGraphInstanceService::initAgg(string trainData){
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")+ " " + "4" + " 0 "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.num.orgs")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > agg_logs.txt";
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > "
+ "/home/ubuntu/software/jasminegraph/logs/agg_logs-" + Utils::getCurrentTimestamp() + ".txt";
instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4554,13 +4563,14 @@ void JasmineGraphInstanceService::initClient(string trainData){
std::transform(trainargs.begin(), trainargs.end(), std::back_inserter(vc), converter);

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 fl_client.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
std::string command = path + "python3.8 fl_client.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")+ " "+ graphID + " " + partitionID + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs")
+ " localhost " + utils.getJasmineGraphProperty("org.jasminegraph.fl.org.port")
+ " > client_logs_" + partitionID +".txt";
+ " > /home/ubuntu/software/jasminegraph/logs/client_logs_" + partitionID + "-" + Utils::getCurrentTimestamp() + ".txt";

instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4570,12 +4580,23 @@ void JasmineGraphInstanceService::mergeFiles(string trainData){
std::vector<std::string> trainargs = Utils::split(trainData, ' ');
string graphID = trainargs[1];
string partitionID = trainargs[2];
FILE* fp;
int exit_status;

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 merge.py "+ utils.getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder")+ " "
+ utils.getJasmineGraphProperty("org.jasminegraph.server.instance.trainedmodelfolder") + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir") + " " + graphID + " " + partitionID + " > merge_logs"
+ partitionID +".txt";
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir") + " " + graphID + " " + partitionID
+ " > /home/ubuntu/software/jasminegraph/logs/merge_logs" + partitionID + "-" + Utils::getCurrentTimestamp() + ".txt";

popen(command.c_str(), "r");
instance_logger.log("Executing : " + command, "info");
fp = popen(command.c_str(), "r");
if (fp == NULL) {
instance_logger.log("Merge Command Execution Failed for Graph ID - Patition ID: " + graphID + " - " + partitionID, "error");
}

exit_status = pclose(fp);
if (exit_status == -1) {
instance_logger.log("Merge Command Execution Failed for Graph ID - Patition ID: " + graphID + " - " + partitionID + "; Error : " + strerror(errno) , "error");
}
}
20 changes: 18 additions & 2 deletions src/server/JasmineGraphServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3108,12 +3108,12 @@ void JasmineGraphServer::initiateCommunication(std::string graphID, std::string

if (i==0) {

workerThreads[threadID] = std::thread(initiateServer,"localhost", serverPort,
workerThreads[threadID] = std::thread(initiateServer,workerInstance.hostname, serverPort,
serverDataPort,trainingArgs,fl_clients, to_string(i));
threadID++;
}

workerThreads[threadID] = std::thread(initiateClient,"localhost", serverPort, serverDataPort,trainingArgs +
workerThreads[threadID] = std::thread(initiateClient,workerInstance.hostname, serverPort, serverDataPort,trainingArgs +
" " + to_string(i), fl_clients, to_string(i));
threadID++;

Expand Down Expand Up @@ -3402,6 +3402,14 @@ bool JasmineGraphServer::initiateServer(std::string host, int port, int dataPort
string server_host = utils.getJasmineGraphProperty("org.jasminegraph.server.host");
write(sockfd, server_host.c_str(), server_host.size());
server_logger.log("Sent fed : " + server_host, "info");
bzero(data, 301);
read(sockfd, data, 300);
response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");

if (response.compare(JasmineGraphInstanceProtocol::HOST_OK) == 0) {
server_logger.log("Received : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
}

write(sockfd, JasmineGraphInstanceProtocol::INITIATE_SERVER.c_str(),
JasmineGraphInstanceProtocol::INITIATE_SERVER.size());
Expand Down Expand Up @@ -3477,6 +3485,14 @@ bool JasmineGraphServer::initiateClient(std::string host, int port, int dataPort
string server_host = utils.getJasmineGraphProperty("org.jasminegraph.server.host");
write(sockfd, server_host.c_str(), server_host.size());
server_logger.log("Sent fed : " + server_host, "info");
bzero(data, 301);
read(sockfd, data, 300);
response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");

if (response.compare(JasmineGraphInstanceProtocol::HOST_OK) == 0) {
server_logger.log("Received : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
}

write(sockfd, JasmineGraphInstanceProtocol::INITIATE_CLIENT.c_str(),
JasmineGraphInstanceProtocol::INITIATE_CLIENT.size());
Expand Down
14 changes: 14 additions & 0 deletions src/util/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ limitations under the License.
#include <sys/stat.h>
#include <pwd.h>
#include <unistd.h>
#include <chrono>
#include <ctime>
#include <iomanip>
#include "Utils.h"
//#include "../frontend/JasmineGraphFrontEnd.h"
#include "Conts.h"
Expand Down Expand Up @@ -611,3 +614,14 @@ int Utils::connect_wrapper(int sock, const sockaddr *addr, socklen_t slen) {
} while (retry++ < 4);
return -1;
}

/**
 * Builds a compact timestamp string for the current local time.
 *
 * The wall-clock time is read from the system clock, broken down into
 * local-time fields with localtime_r, and rendered with the pattern
 * "%y%m%d_%H%M%S" (two-digit year, month, day, underscore, hour, minute,
 * second), e.g. "230904_153012".
 *
 * @return the formatted timestamp.
 */
std::string Utils::getCurrentTimestamp() {
    const std::time_t epochSeconds = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    // localtime_r writes into a caller-supplied struct instead of shared static storage.
    std::tm localParts;
    localtime_r(&epochSeconds, &localParts);

    std::ostringstream formatted;
    formatted << std::put_time(&localParts, "%y%m%d_%H%M%S");
    return formatted.str();
}
1 change: 1 addition & 0 deletions src/util/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Utils {
std::string checkFlag(std::string flagPath);

static int connect_wrapper(int sock, const sockaddr *addr, socklen_t slen);
static std::string getCurrentTimestamp();
};

#endif // JASMINEGRAPH_UTILS_H
2 changes: 1 addition & 1 deletion src_python/fl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def send_model(self):

weights_path = self.weights_path + 'weights_' + self.graph_id + '_' + self.partition_id + ".npy"

weights = np.array(self.MODEL.get_weights())
weights = self.MODEL.get_weights()

data = {"CLIENT_ID":self.partition_id,"WEIGHTS":weights,"NUM_EXAMPLES":self.graph_params[0]}

Expand Down
2 changes: 1 addition & 1 deletion src_python/fl_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def send_model(self, client_socket):
if self.ROUNDS == self.training_cycles:
self.stop_flag = True

weights = np.array(self.GLOBAL_WEIGHTS)
weights = self.GLOBAL_WEIGHTS

data = {"STOP_FLAG":self.stop_flag,"WEIGHTS":weights}

Expand Down
20 changes: 15 additions & 5 deletions src_python/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
]
)

logging.info("start executing merge.py")

arg_names = [
'path_datafolder',
'path_modelstore',
Expand All @@ -38,25 +40,29 @@
logging.info("Folder path \"" + folder_path + "\" exists")
pass
else:
logging.info("Data folder created")
logging.info("Data folder created at " + folder_path)
os.makedirs(folder_path)

args = dict(zip(arg_names, sys.argv[1:]))

path_attributes_localstore = args['path_datafolder'] + args['graph_id'] + '_attributes_' + args['partition_id']
path_attributes_localstore = args['path_datafolder'] + "/" + args['graph_id'] + '_attributes_' + args['partition_id']
logging.info("Reading localstore node attributes from " + path_attributes_localstore)
node_attributes_localstore = pd.read_csv(path_attributes_localstore , sep='\s+', lineterminator='\n',header=None)
node_attributes_localstore.set_index(0,inplace=True)

path_edges_localstore = args['path_modelstore'] + args['graph_id'] + '_' + args['partition_id']
path_edges_localstore = args['path_modelstore'] + "/" + args['graph_id'] + '_' + args['partition_id']
logging.info("Reading localstore edges from : " + path_edges_localstore)
edges_localstore = pd.read_csv(path_edges_localstore, sep='\s+', lineterminator='\n', header=None)
edges_localstore.columns = ["source","target"]


path_attributes_centralstore = args['path_datafolder'] + args['graph_id'] + '_centralstore_attributes_' + args['partition_id']
path_attributes_centralstore = args['path_datafolder'] + "/" + args['graph_id'] + '_centralstore_attributes_' + args['partition_id']
logging.info("Reading centralstore node attributes from : " + path_edges_localstore)
node_attributes_centralstore = pd.read_csv(path_attributes_centralstore , sep='\s+', lineterminator='\n',header=None)
node_attributes_centralstore.set_index(0,inplace=True)

path_edges_centralstore = args['path_modelstore'] + args['graph_id'] + '_centralstore_' + args['partition_id']
path_edges_centralstore = args['path_modelstore'] + "/" + args['graph_id'] + '_centralstore_' + args['partition_id']
logging.info("Reading centralstore edges from : " + path_edges_localstore)
edges_centralstore = pd.read_csv(path_edges_centralstore, sep='\s+', lineterminator='\n', header=None)
edges_centralstore.columns = ["source","target"]

Expand All @@ -74,6 +80,10 @@
path_nodes = args['path_data'] + args['graph_id'] + '_nodes_' + args['partition_id'] + ".csv"
path_edges = args['path_data'] + args['graph_id'] + '_edges_' + args['partition_id'] + ".csv"

logging.info("Writing nodes to : " + path_nodes)
nodes.to_csv(path_nodes)

logging.info("Writing edges to : " + path_edges)
edges.to_csv(path_edges,index=False)

logging.info("complete executing merge.py")
Loading

0 comments on commit a107a5b

Please sign in to comment.