Skip to content

Commit

Permalink
Initialize storing estimated end times of operations
Browse files Browse the repository at this point in the history
  • Loading branch information
thevindu-w committed Aug 7, 2024
1 parent 7094f0a commit c007d63
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 139 deletions.
4 changes: 2 additions & 2 deletions ddl/metadb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ create table operation
(
idoperation INTEGER not null primary key,
operation VARCHAR
)
);

create table graph_operation_time
(
Expand All @@ -104,4 +104,4 @@ VALUES (1, 'LOADING'),
(4, 'NONOPERATIONAL');

INSERT INTO operation (idoperation, operation)
VALUES (1, 'trian');
VALUES (1, 'trian');
299 changes: 162 additions & 137 deletions src/frontend/core/executor/impl/TriangleCountExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.
#include <time.h>
#include <unistd.h>

#include <unordered_map>

#include "../../../../../globals.h"
#include "../../../../k8s/K8sWorkerController.h"
#include "../../../../scale/scaler.h"
Expand All @@ -32,6 +34,13 @@ static std::mutex aggregateWeightMutex;

static time_t last_exec_time = 0;

typedef struct _end_time_t {
time_t endTime;
int partitionCnt;
} end_time_t;

static unordered_map<int, end_time_t> endTimes;

static string isFileAccessibleToWorker(std::string graphId, std::string partitionId, std::string aggregatorHostName,
std::string aggregatorPort, std::string masterIP, std::string fileType,
std::string fileName);
Expand Down Expand Up @@ -578,6 +587,14 @@ void TriangleCountExecutor::execute() {
}
}

// if (estimated_time_known)
{
// TODO (thevindu-w): replace 1200 (seconds) with estimated runtime if it's known
end_time_t end_time_estimate = {.endTime = time(NULL) + 1200, .partitionCnt = partitionCount};

Check warning on line 593 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L593

Added line #L593 was not covered by tests
endTimes[uniqueId] = end_time_estimate;
}
// TODO (thevindu-w): if queued, then add reserved CPU core count

PerformanceUtil::init();

std::string query =
Expand Down Expand Up @@ -638,6 +655,8 @@ void TriangleCountExecutor::execute() {
}
schedulerMutex.unlock();

endTimes.erase(uniqueId);

workerResponded = true;

JobResponse jobResponse;
Expand Down Expand Up @@ -726,185 +745,191 @@ long TriangleCountExecutor::getTriangleCount(
triangleCount_logger.log("Sent : " + JasmineGraphInstanceProtocol::HANDSHAKE, "info");
string response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);

if (response.compare(JasmineGraphInstanceProtocol::HANDSHAKE_OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::HANDSHAKE_OK, "info");
result_wr = write(sockfd, masterIP.c_str(), masterIP.size());

if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
if (response.compare(JasmineGraphInstanceProtocol::HANDSHAKE_OK)) {
triangleCount_logger.error("There was an error in the upload process and the response is :: " + response);
auto end_time_it = endTimes.find(uniqueId);
if (end_time_it != endTimes.end()) {
end_time_it->second.partitionCnt--;

Check warning on line 752 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L752

Added line #L752 was not covered by tests
}
triangleCount_logger.log("Sent : " + masterIP, "info");
Utils::send_str_wrapper(sockfd, JasmineGraphInstanceProtocol::CLOSE);
close(sockfd);
return 0;

Check warning on line 756 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L756

Added line #L756 was not covered by tests
}
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::HANDSHAKE_OK, "info");
result_wr = write(sockfd, masterIP.c_str(), masterIP.size());

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
if (response.compare(JasmineGraphInstanceProtocol::HOST_OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
} else {
triangleCount_logger.log("Received : " + response, "error");
}
result_wr = write(sockfd, JasmineGraphInstanceProtocol::TRIANGLES.c_str(),
JasmineGraphInstanceProtocol::TRIANGLES.size());
if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
triangleCount_logger.log("Sent : " + masterIP, "info");

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
if (response.compare(JasmineGraphInstanceProtocol::HOST_OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::HOST_OK, "info");
} else {
triangleCount_logger.log("Received : " + response, "error");
}
result_wr =

Check warning on line 772 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L772

Added line #L772 was not covered by tests
write(sockfd, JasmineGraphInstanceProtocol::TRIANGLES.c_str(), JasmineGraphInstanceProtocol::TRIANGLES.size());

if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
triangleCount_logger.log("Sent : " + JasmineGraphInstanceProtocol::TRIANGLES, "info");

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
if (response.compare(JasmineGraphInstanceProtocol::OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::OK, "info");
result_wr = write(sockfd, std::to_string(graphId).c_str(), std::to_string(graphId).size());

if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
triangleCount_logger.log("Sent : " + JasmineGraphInstanceProtocol::TRIANGLES, "info");
triangleCount_logger.log("Sent : Graph ID " + std::to_string(graphId), "info");

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
if (response.compare(JasmineGraphInstanceProtocol::OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::OK, "info");
result_wr = write(sockfd, std::to_string(graphId).c_str(), std::to_string(graphId).size());
}

if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
triangleCount_logger.log("Sent : Graph ID " + std::to_string(graphId), "info");
if (response.compare(JasmineGraphInstanceProtocol::OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::OK, "info");
result_wr = write(sockfd, std::to_string(partitionId).c_str(), std::to_string(partitionId).size());

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}

if (response.compare(JasmineGraphInstanceProtocol::OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::OK, "info");
result_wr = write(sockfd, std::to_string(partitionId).c_str(), std::to_string(partitionId).size());
triangleCount_logger.log("Sent : Partition ID " + std::to_string(partitionId), "info");

if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
}

triangleCount_logger.log("Sent : Partition ID " + std::to_string(partitionId), "info");
if (response.compare(JasmineGraphInstanceProtocol::OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::OK, "info");
result_wr = write(sockfd, std::to_string(threadPriority).c_str(), std::to_string(threadPriority).size());

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
triangleCount_logger.log("Sent : Thread Priority " + std::to_string(threadPriority), "info");

if (response.compare(JasmineGraphInstanceProtocol::OK) == 0) {
triangleCount_logger.log("Received : " + JasmineGraphInstanceProtocol::OK, "info");
result_wr = write(sockfd, std::to_string(threadPriority).c_str(), std::to_string(threadPriority).size());

if (result_wr < 0) {
triangleCount_logger.log("Error writing to socket", "error");
}
triangleCount_logger.log("Sent : Thread Priority " + std::to_string(threadPriority), "info");
response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
triangleCount_logger.log("Got response : |" + response + "|", "info");
triangleCount = atol(response.c_str());

Check warning on line 817 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L817

Added line #L817 was not covered by tests
}

response = Utils::read_str_trim_wrapper(sockfd, data, INSTANCE_DATA_LENGTH);
triangleCount_logger.log("Got response : |" + response + "|", "info");
triangleCount = atol(response.c_str());
}

if (isCompositeAggregation) {
triangleCount_logger.log("###COMPOSITE### Started Composite aggregation ", "info");
for (int combinationIndex = 0; combinationIndex < fileCombinations.size(); ++combinationIndex) {
const std::vector<string> &fileList = fileCombinations.at(combinationIndex);
std::set<string> partitionIdSet;
std::set<string> partitionSet;
std::map<int, int> tempWeightMap;
std::set<string> transferRequireFiles;
std::string combinationKey = "";
std::string availableFiles = "";
std::string transferredFiles = "";
bool isAggregateValid = false;

for (auto listIterator = fileList.begin(); listIterator != fileList.end(); ++listIterator) {
std::string fileName = *listIterator;

size_t lastIndex = fileName.find_last_of(".");
string rawFileName = fileName.substr(0, lastIndex);

const std::vector<std::string> &fileNameParts = Utils::split(rawFileName, '_');

/*Partition numbers are extracted from the file name. The starting index of partition number
* is 2. Therefore the loop starts with 2*/
for (int index = 2; index < fileNameParts.size(); ++index) {
partitionSet.insert(fileNameParts[index]);
}
if (isCompositeAggregation) {
triangleCount_logger.log("###COMPOSITE### Started Composite aggregation ", "info");
for (int combinationIndex = 0; combinationIndex < fileCombinations.size(); ++combinationIndex) {
const std::vector<string> &fileList = fileCombinations.at(combinationIndex);
std::set<string> partitionIdSet;
std::set<string> partitionSet;
std::map<int, int> tempWeightMap;
std::set<string> transferRequireFiles;

Check warning on line 827 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L824-L827

Added lines #L824 - L827 were not covered by tests
std::string combinationKey = "";
std::string availableFiles = "";
std::string transferredFiles = "";
bool isAggregateValid = false;

Check warning on line 831 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L831

Added line #L831 was not covered by tests

for (auto listIterator = fileList.begin(); listIterator != fileList.end(); ++listIterator) {
std::string fileName = *listIterator;

size_t lastIndex = fileName.find_last_of(".");

Check warning on line 836 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L836

Added line #L836 was not covered by tests
string rawFileName = fileName.substr(0, lastIndex);

const std::vector<std::string> &fileNameParts = Utils::split(rawFileName, '_');

/*Partition numbers are extracted from the file name. The starting index of partition number
* is 2. Therefore the loop starts with 2*/
for (int index = 2; index < fileNameParts.size(); ++index) {
partitionSet.insert(fileNameParts[index]);
}
}

if (partitionSet.find(std::to_string(partitionId)) == partitionSet.end()) {
continue;
}
if (partitionSet.find(std::to_string(partitionId)) == partitionSet.end()) {
continue;

Check warning on line 849 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L849

Added line #L849 was not covered by tests
}

if (!proceedOrNot(partitionSet, partitionId)) {
continue;
}
if (!proceedOrNot(partitionSet, partitionId)) {
continue;

Check warning on line 853 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L853

Added line #L853 was not covered by tests
}

for (auto fileListIterator = fileList.begin(); fileListIterator != fileList.end(); ++fileListIterator) {
std::string fileName = *fileListIterator;
bool isTransferRequired = true;
for (auto fileListIterator = fileList.begin(); fileListIterator != fileList.end(); ++fileListIterator) {
std::string fileName = *fileListIterator;
bool isTransferRequired = true;

Check warning on line 858 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L858

Added line #L858 was not covered by tests

combinationKey = fileName + ":" + combinationKey;
combinationKey = fileName + ":" + combinationKey;

size_t lastindex = fileName.find_last_of(".");
string rawFileName = fileName.substr(0, lastindex);
size_t lastindex = fileName.find_last_of(".");

Check warning on line 862 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L862

Added line #L862 was not covered by tests
string rawFileName = fileName.substr(0, lastindex);

std::vector<std::string> fileNameParts = Utils::split(rawFileName, '_');
std::vector<std::string> fileNameParts = Utils::split(rawFileName, '_');

for (int index = 2; index < fileNameParts.size(); ++index) {
if (fileNameParts[index] == std::to_string(partitionId)) {
isTransferRequired = false;
}
partitionIdSet.insert(fileNameParts[index]);
for (int index = 2; index < fileNameParts.size(); ++index) {
if (fileNameParts[index] == std::to_string(partitionId)) {
isTransferRequired = false;

Check warning on line 869 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L869

Added line #L869 was not covered by tests
}
partitionIdSet.insert(fileNameParts[index]);
}

if (isTransferRequired) {
transferRequireFiles.insert(fileName);
transferredFiles = fileName + ":" + transferredFiles;
} else {
availableFiles = fileName + ":" + availableFiles;
}
if (isTransferRequired) {
transferRequireFiles.insert(fileName);
transferredFiles = fileName + ":" + transferredFiles;
} else {
availableFiles = fileName + ":" + availableFiles;
}
}

std::string adjustedCombinationKey = combinationKey.substr(0, combinationKey.size() - 1);
std::string adjustedAvailableFiles = availableFiles.substr(0, availableFiles.size() - 1);
std::string adjustedTransferredFile = transferredFiles.substr(0, transferredFiles.size() - 1);
std::string adjustedCombinationKey = combinationKey.substr(0, combinationKey.size() - 1);
std::string adjustedAvailableFiles = availableFiles.substr(0, availableFiles.size() - 1);
std::string adjustedTransferredFile = transferredFiles.substr(0, transferredFiles.size() - 1);

fileCombinationMutex.lock();
std::map<std::string, std::string> &combinationWorkerMap = *combinationWorkerMap_p;
if (combinationWorkerMap.find(combinationKey) == combinationWorkerMap.end()) {
if (partitionIdSet.find(std::to_string(partitionId)) != partitionIdSet.end()) {
combinationWorkerMap[combinationKey] = std::to_string(partitionId);
isAggregateValid = true;
}
fileCombinationMutex.lock();
std::map<std::string, std::string> &combinationWorkerMap = *combinationWorkerMap_p;

Check warning on line 887 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L887

Added line #L887 was not covered by tests
if (combinationWorkerMap.find(combinationKey) == combinationWorkerMap.end()) {
if (partitionIdSet.find(std::to_string(partitionId)) != partitionIdSet.end()) {
combinationWorkerMap[combinationKey] = std::to_string(partitionId);
isAggregateValid = true;

Check warning on line 891 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L891

Added line #L891 was not covered by tests
}
fileCombinationMutex.unlock();

if (isAggregateValid) {
for (auto transferRequireFileIterator = transferRequireFiles.begin();
transferRequireFileIterator != transferRequireFiles.end(); ++transferRequireFileIterator) {
std::string transferFileName = *transferRequireFileIterator;
std::string fileAccessible = isFileAccessibleToWorker(
std::to_string(graphId), std::string(), host, std::to_string(port), masterIP,
JasmineGraphInstanceProtocol::FILE_TYPE_CENTRALSTORE_COMPOSITE, transferFileName);

if (fileAccessible.compare("false") == 0) {
copyCompositeCentralStoreToAggregator(host, std::to_string(port), std::to_string(dataPort),
transferFileName, masterIP);
}
}
fileCombinationMutex.unlock();

Check warning on line 894 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L894

Added line #L894 was not covered by tests

if (isAggregateValid) {
for (auto transferRequireFileIterator = transferRequireFiles.begin();

Check warning on line 897 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L897

Added line #L897 was not covered by tests
transferRequireFileIterator != transferRequireFiles.end(); ++transferRequireFileIterator) {
std::string transferFileName = *transferRequireFileIterator;
std::string fileAccessible = isFileAccessibleToWorker(
std::to_string(graphId), std::string(), host, std::to_string(port), masterIP,
JasmineGraphInstanceProtocol::FILE_TYPE_CENTRALSTORE_COMPOSITE, transferFileName);

if (fileAccessible.compare("false") == 0) {
copyCompositeCentralStoreToAggregator(host, std::to_string(port), std::to_string(dataPort),
transferFileName, masterIP);
}
}

triangleCount_logger.log("###COMPOSITE### Retrieved Composite triangle list ", "debug");
triangleCount_logger.log("###COMPOSITE### Retrieved Composite triangle list ", "debug");

const auto &triangles =
countCompositeCentralStoreTriangles(host, std::to_string(port), adjustedTransferredFile,
masterIP, adjustedAvailableFiles, threadPriority);
if (triangles.size() > 0) {
triangleCount +=
updateTriangleTreeAndGetTriangleCount(triangles, triangleTree_p, triangleTreeMutex_p);
}
const auto &triangles =
countCompositeCentralStoreTriangles(host, std::to_string(port), adjustedTransferredFile, masterIP,
adjustedAvailableFiles, threadPriority);
if (triangles.size() > 0) {
triangleCount +=

Check warning on line 916 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L916

Added line #L916 was not covered by tests
updateTriangleTreeAndGetTriangleCount(triangles, triangleTree_p, triangleTreeMutex_p);
}
updateMap(partitionId);
}
updateMap(partitionId);
}
}

triangleCount_logger.info("###COMPOSITE### Returning Total Triangles from executer ");
Utils::send_str_wrapper(sockfd, JasmineGraphInstanceProtocol::CLOSE);
close(sockfd);
return triangleCount;

} else {
triangleCount_logger.log("There was an error in the upload process and the response is :: " + response,
"error");
auto end_time_it = endTimes.find(uniqueId);
if (end_time_it != endTimes.end()) {
end_time_it->second.partitionCnt--;

Check warning on line 926 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L926

Added line #L926 was not covered by tests
}

triangleCount_logger.info("###COMPOSITE### Returning Total Triangles from executer ");
Utils::send_str_wrapper(sockfd, JasmineGraphInstanceProtocol::CLOSE);
close(sockfd);
return 0;
return triangleCount;

Check warning on line 932 in src/frontend/core/executor/impl/TriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/TriangleCountExecutor.cpp#L932

Added line #L932 was not covered by tests
}

bool TriangleCountExecutor::proceedOrNot(std::set<string> partitionSet, int partitionId) {
Expand Down

0 comments on commit c007d63

Please sign in to comment.