Skip to content

Commit

Permalink
Prevent removing workers with load
Browse files Browse the repository at this point in the history
  • Loading branch information
thevindu-w committed May 8, 2024
1 parent 67d2b69 commit f4cabd1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/frontend/core/executor/impl/TriangleCountExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ void TriangleCountExecutor::execute() {
}
for (auto it = used_workers.cbegin(); it != used_workers.cend();) {
if (it->second <= 0) {
used_workers.erase(it++); // or "it = m.erase(it)" since C++11
used_workers.erase(it++);
} else {
it++;
}
Expand Down
14 changes: 13 additions & 1 deletion src/scale/scaler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,19 @@ static void scale_down_thread_fn() {
schedulerMutex.lock();
vector<string> workers; // [worker_id]
const std::vector<vector<pair<string, string>>> &results =
sqlite->runSelect("SELECT DISTINCT idworker FROM worker;");
sqlite->runSelect("SELECT DISTINCT idworker, ip, server_port FROM worker;");

map<string, int> loads;
const map<string, string> &cpu_map = Utils::getMetricMap("cpu_usage");

for (int i = 0; i < results.size(); i++) {
string ip = results[i][1].second;
string port = results[i][2].second;
const auto workerLoadIt = cpu_map.find(ip + ":" + port);
if (workerLoadIt != cpu_map.end()) {
double load = stod(workerLoadIt->second.c_str());
if (load > 0.25) continue; // worker is running some task. should not remove this node.
}
string workerId = results[i][0].second;
workers.push_back(workerId);
}
Expand Down Expand Up @@ -93,6 +104,7 @@ static void scale_down_thread_fn() {
if (it == removing.end()) break;
removing.erase(it);
}
if (removing.empty()) continue;

K8sWorkerController *k8s = K8sWorkerController::getInstance();
k8s->scaleDown(removing);
Expand Down

0 comments on commit f4cabd1

Please sign in to comment.