Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishad-M-I-M committed Aug 27, 2023
1 parent 2a07562 commit 9c3c12b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/centralstore/incremental/DataPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ DataPublisher::DataPublisher(int worker_port, std::string worker_address) {
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
data_publisher_logger.error("Socket creation error!");
}
data_publisher_logger.info("Socket created");
if (Utils::connect_wrapper(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
data_publisher_logger.error("Connection Failed!");
}
data_publisher_logger.info("Connected to worker");
}

DataPublisher::~DataPublisher() { close(sock); }
Expand Down
13 changes: 10 additions & 3 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,15 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
KafkaConnector *kstream;
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);

frontend_logger.log("Creating data publishers", "info");

for (int i = 0; i < workerList.size(); i++) {

Utils::worker currentWorker = workerList.at(i);
string workerHost = currentWorker.hostname;
string workerID = currentWorker.workerID;
int workerPort = atoi(string(currentWorker.port).c_str());
frontend_logger.log("Worker Host : " + workerHost + "; Worker ID : " + workerID + "; Worker port : " + string(currentWorker.port), "info");
DataPublisher *workerClient = new DataPublisher(workerPort, workerHost);
workerClients.push_back(workerClient);
}
Expand All @@ -142,10 +145,13 @@ void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface
}

bzero(data, FRONTEND_DATA_LENGTH + 1);
frontend_logger.info("Reading ..");
read(connFd, data, FRONTEND_DATA_LENGTH);
frontend_logger.info("Read done");

string line(data);
if (line.compare("\r\n") == 0) {
frontend_logger.info("Empty line");
continue;
}
frontend_logger.log("Command received: " + line, "info");
Expand Down Expand Up @@ -1704,19 +1710,20 @@ int JasmineGraphFrontEnd::run() {
int noThread = 0;

while (true) {
frontend_logger.log("Frontend Listening", "info");
frontend_logger.log("Frontend Listening hello ... ", "info");

//this is where client connects. svr will hang in this mode until client conn
connFd = accept(listenFd, (struct sockaddr *) &clntAdd, &len);

if (connFd < 0) {
frontend_logger.log("Cannot accept connection", "error");
frontend_logger.log("Cannot accept connection hello ...", "error");
return 0;
} else {
frontend_logger.log("Connection successful", "info");
frontend_logger.log("Connection successful hello ...", "info");
}

frontend_logger.log("Master IP" + masterIP, "info");
frontend_logger.log("Client IP" + std::string(inet_ntoa(clntAdd.sin_addr)), "info");

struct frontendservicesessionargs *frontendservicesessionargs1 = (struct frontendservicesessionargs *) malloc(
sizeof(struct frontendservicesessionargs) * 1);;
Expand Down
4 changes: 4 additions & 0 deletions src/util/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,16 @@ std::string Utils::checkFlag(std::string flagPath){
}

int Utils::connect_wrapper(int sock, const sockaddr *addr, socklen_t slen) {
util_logger.info(string(inet_ntoa(((struct sockaddr_in *)addr)->sin_addr)));
int retry = 0;
do {
if (retry) sleep(retry * 2);
util_logger.info("retry " + std::to_string(retry));
if (connect(sock, addr, slen) == 0) {
util_logger.info("Connected to worker 12345");
return 0;
}
util_logger.info("Connection Failed! 12345");
} while (retry++ < 4);
return -1;
}

0 comments on commit 9c3c12b

Please sign in to comment.