From 8ab02e33287ef1943e7934e79db313faf31244b8 Mon Sep 17 00:00:00 2001 From: Shrusti-W Date: Wed, 28 Dec 2022 11:03:14 +0530 Subject: [PATCH 1/2] fix(E2E-minion): E2E-minion-crash-during-parallel-iPerf Signed-off-by: Shrusti-W --- src/terragraph-e2e/e2e/minion/TrafficApp.cpp | 92 ++++++++++++++------ src/terragraph-e2e/e2e/minion/TrafficApp.h | 7 +- 2 files changed, 70 insertions(+), 29 deletions(-) diff --git a/src/terragraph-e2e/e2e/minion/TrafficApp.cpp b/src/terragraph-e2e/e2e/minion/TrafficApp.cpp index 1fff7f9..6053971 100644 --- a/src/terragraph-e2e/e2e/minion/TrafficApp.cpp +++ b/src/terragraph-e2e/e2e/minion/TrafficApp.cpp @@ -40,12 +40,15 @@ TrafficApp::TrafficApp( monitorSockUrl, macAddr, E2EConsts::kTrafficAppMinionId) { + + auto lockedIperfAvailablePorts = iperfAvailablePorts_.wlock(); // Initialize available ports for (int32_t i = FLAGS_iperf_server_port_min; i <= FLAGS_iperf_server_port_max; i++) { - iperfAvailablePorts_.insert(i); + lockedIperfAvailablePorts->insert(i); } + lockedIperfAvailablePorts.unlock(); } void @@ -96,16 +99,22 @@ TrafficApp::processStartIperfServer( startMsg, startServer.value()); + auto lockedIperfAvailablePorts = iperfAvailablePorts_.rlock(); // Find an unused port - if (iperfAvailablePorts_.empty()) { + if (lockedIperfAvailablePorts->empty()) { + lockedIperfAvailablePorts.unlock(); LOG(ERROR) << "No unused ports to start iperf server"; return; } - auto iter = iperfAvailablePorts_.begin(); + lockedIperfAvailablePorts.unlock(); + + auto availablePorts = iperfAvailablePorts_.wlock(); + auto iter = availablePorts->begin(); int32_t serverPort = *iter; - iperfAvailablePorts_.erase(iter); + availablePorts->erase(iter); thrift::StartMinionIperf startClient = startServer.value(); startClient.serverPort = serverPort; + availablePorts.unlock(); // With JSON output, nothing gets printed until iperf completes. if (startClient.iperfConfig.options_ref().has_value() && @@ -151,7 +160,9 @@ TrafficApp::processStartIperfServer( // Fork the iperf server std::function pidCallback = [startClient, this](pid_t pid) { - this->iperfProcesses_[startClient.id] = pid; + auto lockedIperfProcess = this->iperfProcesses_.wlock(); + lockedIperfProcess->emplace(startClient.id,pid); + lockedIperfProcess.unlock(); }; std::function initialDataCallback = [startClient, senderApp, this]() { @@ -170,13 +181,15 @@ TrafficApp::processStartIperfServer( // Notify the controller when we read the first byte of this. output = forkCommand(command, pidCallback, initialDataCallback); } + auto lockedIperfAvailablePorts = this->iperfAvailablePorts_.wlock(); if (!output.has_value()) { - this->iperfAvailablePorts_.insert(startClient.serverPort); + lockedIperfAvailablePorts->insert(startClient.serverPort); + lockedIperfAvailablePorts.unlock(); return; } - + auto lockedIperfProcess = this->iperfProcesses_.rlock(); // Log the output - if (this->iperfProcesses_.count(startClient.id)) { + if (lockedIperfProcess->count(startClient.id)) { LOG(INFO) << "iperf session " << startClient.id << " finished, " "sending output to controller..."; thrift::IperfOutput iperfOutput; @@ -192,6 +205,7 @@ TrafficApp::processStartIperfServer( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session finished: {}", startClient.id)); + lockedIperfProcess.unlock(); } else { LOG(INFO) << "iperf session " << startClient.id << " was killed"; this->eventClient_->logEvent( @@ -199,10 +213,14 @@ TrafficApp::processStartIperfServer( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session was killed: {}", startClient.id)); + lockedIperfProcess.unlock(); } - this->iperfProcesses_.erase(startClient.id); - this->iperfAvailablePorts_.insert(startClient.serverPort); + auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); + lockedIperfProcessErase->erase(startClient.id); + lockedIperfProcessErase.unlock(); + lockedIperfAvailablePorts->insert(startClient.serverPort); + lockedIperfAvailablePorts.unlock(); }); iperfServerThread.detach(); } @@ -262,15 +280,17 @@ TrafficApp::processStartIperfClient( // Fork the iperf client std::function pidCallback = [startClient, this](pid_t pid) { - this->iperfProcesses_[startClient->id] = pid; + auto lockedIperfProcess = this->iperfProcesses_.wlock(); + lockedIperfProcess->emplace(startClient->id,pid); + lockedIperfProcess.unlock(); }; auto output = forkCommand(command, pidCallback); if (!output.has_value()) { return; } - + auto lockedIperfProcess = this->iperfProcesses_.rlock(); // Log the output - if (this->iperfProcesses_.count(startClient->id)) { + if (lockedIperfProcess->count(startClient->id)) { LOG(INFO) << "iperf session " << startClient->id << " finished, " "sending output to controller..."; thrift::IperfOutput iperfOutput; @@ -286,6 +306,7 @@ TrafficApp::processStartIperfClient( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session finished: {}", startClient->id)); + lockedIperfProcess.unlock(); } else { LOG(INFO) << "iperf session " << startClient->id << " was killed"; this->eventClient_->logEvent( @@ -293,9 +314,12 @@ TrafficApp::processStartIperfClient( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session was killed: {}", startClient->id)); + lockedIperfProcess.unlock(); } - this->iperfProcesses_.erase(startClient->id); + auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); + lockedIperfProcessErase->erase(startClient->id); + lockedIperfProcessErase.unlock(); }); iperfClientThread.detach(); } @@ -303,6 +327,7 @@ TrafficApp::processStartIperfClient( void TrafficApp::processStopIperf( const std::string& senderApp, const thrift::Message& message) { + auto lockedIperfProcess = this->iperfProcesses_.rlock(); auto stopIperf = maybeReadThrift(message); if (!stopIperf) { handleInvalidMessage("StopIperf", senderApp); @@ -311,16 +336,18 @@ TrafficApp::processStopIperf( LOG(INFO) << "Stopping iperf process for session ID: " << stopIperf->id; - auto iter = iperfProcesses_.find(stopIperf->id); - if (iter != iperfProcesses_.end()) { + auto iter = lockedIperfProcess->find(stopIperf->id); + lockedIperfProcess.unlock(); + if (iter != lockedIperfProcess->end()) { pid_t pid = iter->second; - + lockedIperfProcess.unlock(); // Delete this map entry first so that the iperf wrapper thread knows the // process terminated abnormally - iperfProcesses_.erase(iter); - + auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); + lockedIperfProcessErase->erase(iter); // Kill the process (only SIGKILL works :/) kill(pid, SIGKILL); + lockedIperfProcessErase.unlock(); } } @@ -473,16 +500,19 @@ TrafficApp::processStartPing( command.push_back(addr); // Fork the ping process - std::function pidCallback = [startPing, this](pid_t pid) { - this->pingProcesses_[startPing->id] = pid; + std::function pidCallback = [startPing,this](pid_t pid) { + auto lockedPingProcess = this->pingProcesses_.wlock(); + lockedPingProcess->emplace(startPing->id, pid); + lockedPingProcess.unlock(); }; auto output = forkCommand(command, pidCallback); if (!output.has_value()) { return; } + auto lockedPingProcess = pingProcesses_.rlock(); // Log the output - if (this->pingProcesses_.count(startPing->id)) { + if (lockedPingProcess->count(startPing->id)) { LOG(INFO) << "ping session " << startPing->id << " finished, " "sending output to controller..."; thrift::PingOutput pingOutput; @@ -497,6 +527,7 @@ TrafficApp::processStartPing( thrift::EventId::PING_INFO, thrift::EventLevel::INFO, folly::sformat("ping session finished: {}", startPing->id)); + lockedPingProcess.unlock(); } else { LOG(INFO) << "ping session " << startPing->id << " was killed"; this->eventClient_->logEvent( @@ -504,9 +535,12 @@ TrafficApp::processStartPing( thrift::EventId::PING_INFO, thrift::EventLevel::INFO, folly::sformat("ping session was killed: {}", startPing->id)); + lockedPingProcess.unlock(); } - this->pingProcesses_.erase(startPing->id); + auto lockedPingProcessErase = pingProcesses_.wlock(); + lockedPingProcessErase->erase(startPing->id); + lockedPingProcessErase.unlock(); }); pingThread.detach(); } @@ -515,6 +549,7 @@ void TrafficApp::processStopPing( const std::string& senderApp, const thrift::Message& message) { auto stopPing = maybeReadThrift(message); + auto lockedPingProcess = pingProcesses_.rlock(); if (!stopPing) { handleInvalidMessage("StopPing", senderApp); return; @@ -522,16 +557,21 @@ TrafficApp::processStopPing( LOG(INFO) << "Stopping ping process for session ID: " << stopPing->id; - auto iter = pingProcesses_.find(stopPing->id); - if (iter != pingProcesses_.end()) { + auto iter = lockedPingProcess->find(stopPing->id); + lockedPingProcess.unlock(); + if (iter != lockedPingProcess->end()) { pid_t pid = iter->second; + lockedPingProcess.unlock(); + // Delete this map entry first so that the ping wrapper thread knows the // process terminated abnormally - pingProcesses_.erase(iter); + auto lockedPingProcessErase = pingProcesses_.wlock(); + lockedPingProcessErase->erase(iter); // Kill the process kill(pid, SIGTERM); + lockedPingProcessErase.unlock(); } } diff --git a/src/terragraph-e2e/e2e/minion/TrafficApp.h b/src/terragraph-e2e/e2e/minion/TrafficApp.h index 21a46c5..79e960c 100644 --- a/src/terragraph-e2e/e2e/minion/TrafficApp.h +++ b/src/terragraph-e2e/e2e/minion/TrafficApp.h @@ -11,6 +11,7 @@ #include #include +#include #include "MinionApp.h" #include "e2e/if/gen-cpp2/Controller_types.h" @@ -99,13 +100,13 @@ class TrafficApp final : public MinionApp { std::optional> initialDataCallback = std::nullopt); /** Running iperf processes. */ - std::unordered_map iperfProcesses_; + folly::Synchronized> iperfProcesses_; /** Running ping processes. */ - std::unordered_map pingProcesses_; + folly::Synchronized> pingProcesses_; /** List of unused ports available for iperf. */ - std::unordered_set iperfAvailablePorts_; + folly::Synchronized> iperfAvailablePorts_; }; } // namespace minion From c878d12bcc2ea501291f5486c30e573094cd7276 Mon Sep 17 00:00:00 2001 From: Shrusti-W Date: Sat, 31 Dec 2022 13:34:05 +0530 Subject: [PATCH 2/2] fix(E2E-minion): E2E-minion-crash-during-parallel-iPerf Signed-off-by: Shrusti-W --- src/terragraph-e2e/e2e/minion/TrafficApp.cpp | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/terragraph-e2e/e2e/minion/TrafficApp.cpp b/src/terragraph-e2e/e2e/minion/TrafficApp.cpp index 6053971..5ca6bad 100644 --- a/src/terragraph-e2e/e2e/minion/TrafficApp.cpp +++ b/src/terragraph-e2e/e2e/minion/TrafficApp.cpp @@ -205,7 +205,6 @@ TrafficApp::processStartIperfServer( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session finished: {}", startClient.id)); - lockedIperfProcess.unlock(); } else { LOG(INFO) << "iperf session " << startClient.id << " was killed"; this->eventClient_->logEvent( @@ -213,9 +212,8 @@ TrafficApp::processStartIperfServer( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session was killed: {}", startClient.id)); - lockedIperfProcess.unlock(); } - + lockedIperfProcess.unlock(); auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); lockedIperfProcessErase->erase(startClient.id); lockedIperfProcessErase.unlock(); @@ -306,7 +304,6 @@ TrafficApp::processStartIperfClient( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session finished: {}", startClient->id)); - lockedIperfProcess.unlock(); } else { LOG(INFO) << "iperf session " << startClient->id << " was killed"; this->eventClient_->logEvent( @@ -314,8 +311,8 @@ TrafficApp::processStartIperfClient( thrift::EventId::IPERF_INFO, thrift::EventLevel::INFO, folly::sformat("iperf session was killed: {}", startClient->id)); - lockedIperfProcess.unlock(); } + lockedIperfProcess.unlock(); auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); lockedIperfProcessErase->erase(startClient->id); @@ -337,10 +334,8 @@ TrafficApp::processStopIperf( LOG(INFO) << "Stopping iperf process for session ID: " << stopIperf->id; auto iter = lockedIperfProcess->find(stopIperf->id); - lockedIperfProcess.unlock(); if (iter != lockedIperfProcess->end()) { pid_t pid = iter->second; - lockedIperfProcess.unlock(); // Delete this map entry first so that the iperf wrapper thread knows the // process terminated abnormally auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); @@ -349,6 +344,7 @@ TrafficApp::processStopIperf( kill(pid, SIGKILL); lockedIperfProcessErase.unlock(); } + lockedIperfProcess.unlock(); } void @@ -527,7 +523,6 @@ TrafficApp::processStartPing( thrift::EventId::PING_INFO, thrift::EventLevel::INFO, folly::sformat("ping session finished: {}", startPing->id)); - lockedPingProcess.unlock(); } else { LOG(INFO) << "ping session " << startPing->id << " was killed"; this->eventClient_->logEvent( @@ -535,8 +530,8 @@ TrafficApp::processStartPing( thrift::EventId::PING_INFO, thrift::EventLevel::INFO, folly::sformat("ping session was killed: {}", startPing->id)); - lockedPingProcess.unlock(); } + lockedPingProcess.unlock(); auto lockedPingProcessErase = pingProcesses_.wlock(); lockedPingProcessErase->erase(startPing->id); @@ -558,12 +553,9 @@ TrafficApp::processStopPing( LOG(INFO) << "Stopping ping process for session ID: " << stopPing->id; auto iter = lockedPingProcess->find(stopPing->id); - lockedPingProcess.unlock(); if (iter != lockedPingProcess->end()) { pid_t pid = iter->second; - lockedPingProcess.unlock(); - // Delete this map entry first so that the ping wrapper thread knows the // process terminated abnormally auto lockedPingProcessErase = pingProcesses_.wlock(); @@ -573,6 +565,7 @@ TrafficApp::processStopPing( kill(pid, SIGTERM); lockedPingProcessErase.unlock(); } + lockedPingProcess.unlock(); } void