Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Errors in Federation Training Commands and Add Testcases. #183

Merged
merged 15 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ ENV JASMINEGRAPH_HOME="/home/ubuntu/software/jasminegraph"

WORKDIR /home/ubuntu/software/jasminegraph

RUN apt-get update
RUN apt-get install -y python3.8
RUN apt-get install -y python3-pip
RUN apt-get install -y python3.8-distutils
RUN python3.8 -m pip install stellargraph
RUN python3.8 -m pip install chardet

COPY ./GraphSAGE ./GraphSAGE
RUN pip install -r ./GraphSAGE/requirements
RUN pip install pandas

COPY ./build.sh ./build.sh
COPY ./run-docker.sh ./run-docker.sh
Expand Down
6 changes: 3 additions & 3 deletions conf/jasminegraph-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ org.jasminegraph.server.modelDir=/var/tmp/jasminegraph-localstore/models/
#--------------------------------------------------------------------------------

org.jasminegraph.federated.enabled=true
org.jasminegraph.fl.location=/home/ubuntu/software/jasminegraph/src-python/
org.jasminegraph.fl.dataDir=/home/ubuntu/software/jasminegraph/src-python/data/
org.jasminegraph.fl.weights=/home/ubuntu/software/jasminegraph/src-python/weights/
org.jasminegraph.fl.location=/home/ubuntu/software/jasminegraph/src_python/
org.jasminegraph.fl.dataDir=/home/ubuntu/software/jasminegraph/src_python/data/
org.jasminegraph.fl.weights=/home/ubuntu/software/jasminegraph/src_python/weights/
org.jasminegraph.fl_clients=2
org.jasminegraph.fl.epochs=4
org.jasminegraph.fl.rounds=4
Expand Down
20 changes: 6 additions & 14 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,8 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
JasmineGraphServer *jasmineServer = new JasmineGraphServer();
jasmineServer->initiateFiles(graphID, trainData);
jasmineServer->initiateMerge(graphID, trainData, sqlite);
write(connFd, DONE.c_str(), FRONTEND_COMMAND_LENGTH);
write(connFd, "\r\n", 2);

} else if (line.compare(TRAIN) == 0) {
string message = "Available main flags:\r\n";
Expand All @@ -1158,20 +1160,6 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
string flags =
Conts::FLAGS::GRAPH_ID + " " + Conts::FLAGS::LEARNING_RATE + " " + Conts::FLAGS::BATCH_SIZE + " " +
Conts::FLAGS::VALIDATE_ITER + " " + Conts::FLAGS::EPOCHS;
result_wr = write(connFd, flags.c_str(), flags.size());
if (result_wr < 0) {
frontend_logger.log("Error writing to socket", "error");
}
result_wr = write(connFd, "\r\n", 2);
if (result_wr < 0) {
frontend_logger.log("Error writing to socket", "error");
}
message = "Send --<flag1> <value1> --<flag2> <value2> ..\r\n";
result_wr = write(connFd, message.c_str(), message.size());
if (result_wr < 0) {
frontend_logger.log("Error writing to socket", "error");
}

write(connFd, flags.c_str(), flags.size());
write(connFd, "\r\n", 2);
message = "Send --<flag1> <value1> --<flag2> <value2> ..\r\n";
Expand Down Expand Up @@ -1223,9 +1211,11 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
if (federatedEnabled == "true") {
JasmineGraphServer *jasmineServer = new JasmineGraphServer();
if (utils.getJasmineGraphProperty("org.jasminegraph.fl.org.training") == "true") {
frontend_logger.log("Initiate org communication", "info");
jasmineServer->initiateOrgCommunication(graphID, trainData, sqlite);

} else {
frontend_logger.log("Initiate communication", "info");
jasmineServer->initiateCommunication(graphID, trainData, sqlite);

}
Expand All @@ -1236,6 +1226,8 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
jasminGraphTrainingInitiator->initiateTrainingLocally(graphID, trainData);

}
write(connFd, DONE.c_str(), FRONTEND_COMMAND_LENGTH);
write(connFd, "\r\n", 2);

} else if (line.compare(IN_DEGREE) == 0) {
frontend_logger.log("Calculating In Degree Distribution", "info");
Expand Down
47 changes: 34 additions & 13 deletions src/server/JasmineGraphInstanceService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ void *instanceservicesession(void *dummyPt) {
line = (data);
line = utils.trim_copy(line, " \f\n\r\t\v");
string server_hostname = line;
write(connFd, JasmineGraphInstanceProtocol::HOST_OK.c_str(), JasmineGraphInstanceProtocol::HOST_OK.size());
instance_logger.log("Received hostname : " + line, "info");

instance_logger.log("Sending : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
write(connFd, JasmineGraphInstanceProtocol::HOST_OK.c_str(), JasmineGraphInstanceProtocol::HOST_OK.size());
std::cout << "ServerName : " << server_hostname << std::endl;
} else if (line.compare(JasmineGraphInstanceProtocol::CLOSE) == 0) {
write(connFd, JasmineGraphInstanceProtocol::CLOSE_ACK.c_str(),
Expand Down Expand Up @@ -1688,7 +1690,7 @@ void *instanceservicesession(void *dummyPt) {
string trainData(data);

std::vector<std::string> trainargs = Utils::split(trainData, ' ');

instance_logger.info("Received options : " + trainData);
string graphID;
string partitionID = trainargs[trainargs.size() - 1];

Expand Down Expand Up @@ -1760,7 +1762,7 @@ void *instanceservicesession(void *dummyPt) {
string trainData(data);

std::vector<std::string> trainargs = Utils::split(trainData, ' ');

instance_logger.info("Received options : " + trainData);
string graphID;
string partitionID = trainargs[trainargs.size() - 1];

Expand All @@ -1782,6 +1784,7 @@ void *instanceservicesession(void *dummyPt) {
bzero(data, INSTANCE_DATA_LENGTH);
read(connFd, data, INSTANCE_DATA_LENGTH);
string trainData(data);
instance_logger.log("Train Data : " + trainData, "info");

std::vector<std::string> trainargs = Utils::split(trainData, ' ');

Expand Down Expand Up @@ -3167,13 +3170,14 @@ void JasmineGraphInstanceService::trainPartition(string trainData) {
std::transform(trainargs.begin(), trainargs.end(), std::back_inserter(vc), converter);

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.graphsage") + " && ";
std::string command = path + "python3.11 -m unsupervised_train ";
std::string command = path + "python3.11 -m unsupervised_train > /home/ubuntu/software/jasminegraph/logs/unsupervised_train" + partitionID + "-" + Utils::getCurrentTimestamp() + ".txt" ;

int argc = trainargs.size();
for (int i = 0; i < argc - 2; ++i) {
command += trainargs[i + 2];
command += " ";
}
instance_logger.log("Executing : " + command, "info");
system(command.c_str());
}

Expand Down Expand Up @@ -4480,11 +4484,13 @@ void JasmineGraphInstanceService::initServer(string trainData){
std::transform(trainargs.begin(), trainargs.end(), std::back_inserter(vc), converter);

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 fl_server.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
std::string command = path + "python3.8 fl_server.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")+ " "+ graphID + " 0 "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl_clients")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > server_logs.txt";
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > "
+ "/home/ubuntu/software/jasminegraph/logs/server_logs-" + Utils::getCurrentTimestamp() + ".txt";
instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4507,7 +4513,8 @@ void JasmineGraphInstanceService::initOrgServer(string trainData){
std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 org_server.py " + graphID+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl_clients")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs")
+" localhost 5050 > org_server_logs.txt";
+" localhost 5050 > /home/ubuntu/software/jasminegraph/logs/org_server_logs-" + Utils::getCurrentTimestamp() + ".txt";
instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4532,7 +4539,9 @@ void JasmineGraphInstanceService::initAgg(string trainData){
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")+ " " + "4" + " 0 "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.num.orgs")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > agg_logs.txt";
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs") +" localhost 5000 > "
+ "/home/ubuntu/software/jasminegraph/logs/agg_logs-" + Utils::getCurrentTimestamp() + ".txt";
instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4554,13 +4563,14 @@ void JasmineGraphInstanceService::initClient(string trainData){
std::transform(trainargs.begin(), trainargs.end(), std::back_inserter(vc), converter);

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 fl_client.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
std::string command = path + "python3.8 fl_client.py "+ utils.getJasmineGraphProperty("org.jasminegraph.fl.weights") + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")
+ " " + utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir")+ " "+ graphID + " " + partitionID + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.epochs")
+ " localhost " + utils.getJasmineGraphProperty("org.jasminegraph.fl.org.port")
+ " > client_logs_" + partitionID +".txt";
+ " > /home/ubuntu/software/jasminegraph/logs/client_logs_" + partitionID + "-" + Utils::getCurrentTimestamp() + ".txt";

instance_logger.log("Executing : " + command, "info");
popen(command.c_str(), "r");
}

Expand All @@ -4570,12 +4580,23 @@ void JasmineGraphInstanceService::mergeFiles(string trainData){
std::vector<std::string> trainargs = Utils::split(trainData, ' ');
string graphID = trainargs[1];
string partitionID = trainargs[2];
FILE* fp;
int exit_status;

std::string path = "cd " + utils.getJasmineGraphProperty("org.jasminegraph.fl.location") + " && ";
std::string command = path + "python3.11 merge.py "+ utils.getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder")+ " "
+ utils.getJasmineGraphProperty("org.jasminegraph.server.instance.trainedmodelfolder") + " "
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir") + " " + graphID + " " + partitionID + " > merge_logs"
+ partitionID +".txt";
+ utils.getJasmineGraphProperty("org.jasminegraph.fl.dataDir") + " " + graphID + " " + partitionID
+ " > /home/ubuntu/software/jasminegraph/logs/merge_logs" + partitionID + "-" + Utils::getCurrentTimestamp() + ".txt";

popen(command.c_str(), "r");
instance_logger.log("Executing : " + command, "info");
fp = popen(command.c_str(), "r");
if (fp == NULL) {
instance_logger.log("Merge Command Execution Failed for Graph ID - Patition ID: " + graphID + " - " + partitionID, "error");
}

exit_status = pclose(fp);
if (exit_status == -1) {
instance_logger.log("Merge Command Execution Failed for Graph ID - Patition ID: " + graphID + " - " + partitionID + "; Error : " + strerror(errno) , "error");
}
}
20 changes: 18 additions & 2 deletions src/server/JasmineGraphServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3108,12 +3108,12 @@ void JasmineGraphServer::initiateCommunication(std::string graphID, std::string

if (i==0) {

workerThreads[threadID] = std::thread(initiateServer,"localhost", serverPort,
workerThreads[threadID] = std::thread(initiateServer,workerInstance.hostname, serverPort,
serverDataPort,trainingArgs,fl_clients, to_string(i));
threadID++;
}

workerThreads[threadID] = std::thread(initiateClient,"localhost", serverPort, serverDataPort,trainingArgs +
workerThreads[threadID] = std::thread(initiateClient,workerInstance.hostname, serverPort, serverDataPort,trainingArgs +
" " + to_string(i), fl_clients, to_string(i));
threadID++;

Expand Down Expand Up @@ -3402,6 +3402,14 @@ bool JasmineGraphServer::initiateServer(std::string host, int port, int dataPort
string server_host = utils.getJasmineGraphProperty("org.jasminegraph.server.host");
write(sockfd, server_host.c_str(), server_host.size());
server_logger.log("Sent fed : " + server_host, "info");
bzero(data, 301);
read(sockfd, data, 300);
response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");

if (response.compare(JasmineGraphInstanceProtocol::HOST_OK) == 0) {
server_logger.log("Received : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
}

write(sockfd, JasmineGraphInstanceProtocol::INITIATE_SERVER.c_str(),
JasmineGraphInstanceProtocol::INITIATE_SERVER.size());
Expand Down Expand Up @@ -3477,6 +3485,14 @@ bool JasmineGraphServer::initiateClient(std::string host, int port, int dataPort
string server_host = utils.getJasmineGraphProperty("org.jasminegraph.server.host");
write(sockfd, server_host.c_str(), server_host.size());
server_logger.log("Sent fed : " + server_host, "info");
bzero(data, 301);
read(sockfd, data, 300);
response = (data);
response = utils.trim_copy(response, " \f\n\r\t\v");

if (response.compare(JasmineGraphInstanceProtocol::HOST_OK) == 0) {
server_logger.log("Received : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
}

write(sockfd, JasmineGraphInstanceProtocol::INITIATE_CLIENT.c_str(),
JasmineGraphInstanceProtocol::INITIATE_CLIENT.size());
Expand Down
14 changes: 14 additions & 0 deletions src/util/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ limitations under the License.
#include <sys/stat.h>
#include <pwd.h>
#include <unistd.h>
#include <chrono>
#include <ctime>
#include <iomanip>
#include "Utils.h"
//#include "../frontend/JasmineGraphFrontEnd.h"
#include "Conts.h"
Expand Down Expand Up @@ -611,3 +614,14 @@ int Utils::connect_wrapper(int sock, const sockaddr *addr, socklen_t slen) {
} while (retry++ < 4);
return -1;
}

std::string Utils::getCurrentTimestamp() {
auto now = chrono::system_clock::now();
time_t time = chrono::system_clock::to_time_t(now);
tm tm_time;
localtime_r(&time, &tm_time);
stringstream timestamp;
timestamp << put_time(&tm_time, "%y%m%d_%H%M%S"); // Format can be customized

return timestamp.str();
}
1 change: 1 addition & 0 deletions src/util/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Utils {
std::string checkFlag(std::string flagPath);

static int connect_wrapper(int sock, const sockaddr *addr, socklen_t slen);
static std::string getCurrentTimestamp();
};

#endif // JASMINEGRAPH_UTILS_H
2 changes: 1 addition & 1 deletion src_python/fl_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def send_model(self):

weights_path = self.weights_path + 'weights_' + self.graph_id + '_' + self.partition_id + ".npy"

weights = np.array(self.MODEL.get_weights())
weights = self.MODEL.get_weights()

data = {"CLIENT_ID":self.partition_id,"WEIGHTS":weights,"NUM_EXAMPLES":self.graph_params[0]}

Expand Down
2 changes: 1 addition & 1 deletion src_python/fl_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def send_model(self, client_socket):
if self.ROUNDS == self.training_cycles:
self.stop_flag = True

weights = np.array(self.GLOBAL_WEIGHTS)
weights = self.GLOBAL_WEIGHTS

data = {"STOP_FLAG":self.stop_flag,"WEIGHTS":weights}

Expand Down
20 changes: 15 additions & 5 deletions src_python/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
]
)

logging.info("start executing merge.py")

arg_names = [
'path_datafolder',
'path_modelstore',
Expand All @@ -38,25 +40,29 @@
logging.info("Folder path \"" + folder_path + "\" exists")
pass
else:
logging.info("Data folder created")
logging.info("Data folder created at " + folder_path)
os.makedirs(folder_path)

args = dict(zip(arg_names, sys.argv[1:]))

path_attributes_localstore = args['path_datafolder'] + args['graph_id'] + '_attributes_' + args['partition_id']
path_attributes_localstore = args['path_datafolder'] + "/" + args['graph_id'] + '_attributes_' + args['partition_id']
logging.info("Reading localstore node attributes from " + path_attributes_localstore)
node_attributes_localstore = pd.read_csv(path_attributes_localstore , sep='\s+', lineterminator='\n',header=None)
node_attributes_localstore.set_index(0,inplace=True)

path_edges_localstore = args['path_modelstore'] + args['graph_id'] + '_' + args['partition_id']
path_edges_localstore = args['path_modelstore'] + "/" + args['graph_id'] + '_' + args['partition_id']
logging.info("Reading localstore edges from : " + path_edges_localstore)
edges_localstore = pd.read_csv(path_edges_localstore, sep='\s+', lineterminator='\n', header=None)
edges_localstore.columns = ["source","target"]


path_attributes_centralstore = args['path_datafolder'] + args['graph_id'] + '_centralstore_attributes_' + args['partition_id']
path_attributes_centralstore = args['path_datafolder'] + "/" + args['graph_id'] + '_centralstore_attributes_' + args['partition_id']
logging.info("Reading centralstore node attributes from : " + path_edges_localstore)
node_attributes_centralstore = pd.read_csv(path_attributes_centralstore , sep='\s+', lineterminator='\n',header=None)
node_attributes_centralstore.set_index(0,inplace=True)

path_edges_centralstore = args['path_modelstore'] + args['graph_id'] + '_centralstore_' + args['partition_id']
path_edges_centralstore = args['path_modelstore'] + "/" + args['graph_id'] + '_centralstore_' + args['partition_id']
logging.info("Reading centralstore edges from : " + path_edges_localstore)
edges_centralstore = pd.read_csv(path_edges_centralstore, sep='\s+', lineterminator='\n', header=None)
edges_centralstore.columns = ["source","target"]

Expand All @@ -74,6 +80,10 @@
path_nodes = args['path_data'] + args['graph_id'] + '_nodes_' + args['partition_id'] + ".csv"
path_edges = args['path_data'] + args['graph_id'] + '_edges_' + args['partition_id'] + ".csv"

logging.info("Writing nodes to : " + path_nodes)
nodes.to_csv(path_nodes)

logging.info("Writing edges to : " + path_edges)
edges.to_csv(path_edges,index=False)

logging.info("complete executing merge.py")
Loading
Loading