Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(E2E-minion): E2E-minion-crash-during-parallel-iPerf #50

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 60 additions & 27 deletions src/terragraph-e2e/e2e/minion/TrafficApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ TrafficApp::TrafficApp(
monitorSockUrl,
macAddr,
E2EConsts::kTrafficAppMinionId) {

auto lockedIperfAvailablePorts = iperfAvailablePorts_.wlock();
// Initialize available ports
for (int32_t i = FLAGS_iperf_server_port_min;
i <= FLAGS_iperf_server_port_max;
i++) {
iperfAvailablePorts_.insert(i);
lockedIperfAvailablePorts->insert(i);
}
lockedIperfAvailablePorts.unlock();
}

void
Expand Down Expand Up @@ -96,16 +99,22 @@ TrafficApp::processStartIperfServer(
startMsg,
startServer.value());

auto lockedIperfAvailablePorts = iperfAvailablePorts_.rlock();
// Find an unused port
if (iperfAvailablePorts_.empty()) {
if (lockedIperfAvailablePorts->empty()) {
lockedIperfAvailablePorts.unlock();
LOG(ERROR) << "No unused ports to start iperf server";
return;
}
auto iter = iperfAvailablePorts_.begin();
lockedIperfAvailablePorts.unlock();

auto availablePorts = iperfAvailablePorts_.wlock();
auto iter = availablePorts->begin();
int32_t serverPort = *iter;
iperfAvailablePorts_.erase(iter);
availablePorts->erase(iter);
thrift::StartMinionIperf startClient = startServer.value();
startClient.serverPort = serverPort;
availablePorts.unlock();

// With JSON output, nothing gets printed until iperf completes.
if (startClient.iperfConfig.options_ref().has_value() &&
Expand Down Expand Up @@ -151,7 +160,9 @@ TrafficApp::processStartIperfServer(

// Fork the iperf server
std::function<void(pid_t)> pidCallback = [startClient, this](pid_t pid) {
this->iperfProcesses_[startClient.id] = pid;
auto lockedIperfProcess = this->iperfProcesses_.wlock();
lockedIperfProcess->emplace(startClient.id,pid);
lockedIperfProcess.unlock();
};
std::function<void()> initialDataCallback =
[startClient, senderApp, this]() {
Expand All @@ -170,13 +181,15 @@ TrafficApp::processStartIperfServer(
// Notify the controller when we read the first byte of this.
output = forkCommand(command, pidCallback, initialDataCallback);
}
auto lockedIperfAvailablePorts = this->iperfAvailablePorts_.wlock();
if (!output.has_value()) {
this->iperfAvailablePorts_.insert(startClient.serverPort);
lockedIperfAvailablePorts->insert(startClient.serverPort);
lockedIperfAvailablePorts.unlock();
return;
}

auto lockedIperfProcess = this->iperfProcesses_.rlock();
// Log the output
if (this->iperfProcesses_.count(startClient.id)) {
if (lockedIperfProcess->count(startClient.id)) {
LOG(INFO) << "iperf session " << startClient.id << " finished, "
"sending output to controller...";
thrift::IperfOutput iperfOutput;
Expand All @@ -200,9 +213,12 @@ TrafficApp::processStartIperfServer(
thrift::EventLevel::INFO,
folly::sformat("iperf session was killed: {}", startClient.id));
}

this->iperfProcesses_.erase(startClient.id);
this->iperfAvailablePorts_.insert(startClient.serverPort);
lockedIperfProcess.unlock();
auto lockedIperfProcessErase = this->iperfProcesses_.wlock();
lockedIperfProcessErase->erase(startClient.id);
lockedIperfProcessErase.unlock();
lockedIperfAvailablePorts->insert(startClient.serverPort);
lockedIperfAvailablePorts.unlock();
});
iperfServerThread.detach();
}
Expand Down Expand Up @@ -262,15 +278,17 @@ TrafficApp::processStartIperfClient(

// Fork the iperf client
std::function<void(pid_t)> pidCallback = [startClient, this](pid_t pid) {
this->iperfProcesses_[startClient->id] = pid;
auto lockedIperfProcess = this->iperfProcesses_.wlock();
lockedIperfProcess->emplace(startClient->id,pid);
lockedIperfProcess.unlock();
};
auto output = forkCommand(command, pidCallback);
if (!output.has_value()) {
return;
}

auto lockedIperfProcess = this->iperfProcesses_.rlock();
// Log the output
if (this->iperfProcesses_.count(startClient->id)) {
if (lockedIperfProcess->count(startClient->id)) {
LOG(INFO) << "iperf session " << startClient->id << " finished, "
"sending output to controller...";
thrift::IperfOutput iperfOutput;
Expand All @@ -294,15 +312,19 @@ TrafficApp::processStartIperfClient(
thrift::EventLevel::INFO,
folly::sformat("iperf session was killed: {}", startClient->id));
}
lockedIperfProcess.unlock();

this->iperfProcesses_.erase(startClient->id);
auto lockedIperfProcessErase = this->iperfProcesses_.wlock();
lockedIperfProcessErase->erase(startClient->id);
lockedIperfProcessErase.unlock();
});
iperfClientThread.detach();
}

void
TrafficApp::processStopIperf(
const std::string& senderApp, const thrift::Message& message) {
auto lockedIperfProcess = this->iperfProcesses_.rlock();
auto stopIperf = maybeReadThrift<thrift::StopIperf>(message);
if (!stopIperf) {
handleInvalidMessage("StopIperf", senderApp);
Expand All @@ -311,17 +333,18 @@ TrafficApp::processStopIperf(

LOG(INFO) << "Stopping iperf process for session ID: " << stopIperf->id;

auto iter = iperfProcesses_.find(stopIperf->id);
if (iter != iperfProcesses_.end()) {
auto iter = lockedIperfProcess->find(stopIperf->id);
if (iter != lockedIperfProcess->end()) {
pid_t pid = iter->second;

// Delete this map entry first so that the iperf wrapper thread knows the
// process terminated abnormally
iperfProcesses_.erase(iter);

auto lockedIperfProcessErase = this->iperfProcesses_.wlock();
lockedIperfProcessErase->erase(iter);
// Kill the process (only SIGKILL works :/)
kill(pid, SIGKILL);
lockedIperfProcessErase.unlock();
}
lockedIperfProcess.unlock();
}

void
Expand Down Expand Up @@ -473,16 +496,19 @@ TrafficApp::processStartPing(
command.push_back(addr);

// Fork the ping process
std::function<void(pid_t)> pidCallback = [startPing, this](pid_t pid) {
this->pingProcesses_[startPing->id] = pid;
std::function<void(pid_t)> pidCallback = [startPing,this](pid_t pid) {
auto lockedPingProcess = this->pingProcesses_.wlock();
lockedPingProcess->emplace(startPing->id, pid);
lockedPingProcess.unlock();
};
auto output = forkCommand(command, pidCallback);
if (!output.has_value()) {
return;
}

auto lockedPingProcess = pingProcesses_.rlock();
// Log the output
if (this->pingProcesses_.count(startPing->id)) {
if (lockedPingProcess->count(startPing->id)) {
LOG(INFO) << "ping session " << startPing->id << " finished, "
"sending output to controller...";
thrift::PingOutput pingOutput;
Expand All @@ -505,8 +531,11 @@ TrafficApp::processStartPing(
thrift::EventLevel::INFO,
folly::sformat("ping session was killed: {}", startPing->id));
}
lockedPingProcess.unlock();

this->pingProcesses_.erase(startPing->id);
auto lockedPingProcessErase = pingProcesses_.wlock();
lockedPingProcessErase->erase(startPing->id);
lockedPingProcessErase.unlock();
});
pingThread.detach();
}
Expand All @@ -515,24 +544,28 @@ void
TrafficApp::processStopPing(
const std::string& senderApp, const thrift::Message& message) {
auto stopPing = maybeReadThrift<thrift::StopPing>(message);
auto lockedPingProcess = pingProcesses_.rlock();
if (!stopPing) {
handleInvalidMessage("StopPing", senderApp);
return;
}

LOG(INFO) << "Stopping ping process for session ID: " << stopPing->id;

auto iter = pingProcesses_.find(stopPing->id);
if (iter != pingProcesses_.end()) {
auto iter = lockedPingProcess->find(stopPing->id);
if (iter != lockedPingProcess->end()) {
pid_t pid = iter->second;

// Delete this map entry first so that the ping wrapper thread knows the
// process terminated abnormally
pingProcesses_.erase(iter);
auto lockedPingProcessErase = pingProcesses_.wlock();
lockedPingProcessErase->erase(iter);

// Kill the process
kill(pid, SIGTERM);
lockedPingProcessErase.unlock();
}
lockedPingProcess.unlock();
}

void
Expand Down
7 changes: 4 additions & 3 deletions src/terragraph-e2e/e2e/minion/TrafficApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <fbzmq/async/ZmqTimeout.h>
#include <fbzmq/zmq/Zmq.h>
#include <folly/Synchronized.h>

#include "MinionApp.h"
#include "e2e/if/gen-cpp2/Controller_types.h"
Expand Down Expand Up @@ -99,13 +100,13 @@ class TrafficApp final : public MinionApp {
std::optional<std::function<void()>> initialDataCallback = std::nullopt);

/** Running iperf processes. */
std::unordered_map<std::string /* id */, pid_t> iperfProcesses_;
folly::Synchronized<std::unordered_map<std::string /* id */, pid_t>> iperfProcesses_;

/** Running ping processes. */
std::unordered_map<std::string /* id */, pid_t> pingProcesses_;
folly::Synchronized<std::unordered_map<std::string /* id */, pid_t>> pingProcesses_;

/** List of unused ports available for iperf. */
std::unordered_set<int32_t> iperfAvailablePorts_;
folly::Synchronized<std::unordered_set<int32_t>> iperfAvailablePorts_;
};

} // namespace minion
Expand Down