diff --git a/src/terragraph-e2e/e2e/minion/TrafficApp.cpp b/src/terragraph-e2e/e2e/minion/TrafficApp.cpp index 1fff7f9..5ca6bad 100644 --- a/src/terragraph-e2e/e2e/minion/TrafficApp.cpp +++ b/src/terragraph-e2e/e2e/minion/TrafficApp.cpp @@ -40,12 +40,15 @@ TrafficApp::TrafficApp( monitorSockUrl, macAddr, E2EConsts::kTrafficAppMinionId) { + + auto lockedIperfAvailablePorts = iperfAvailablePorts_.wlock(); // Initialize available ports for (int32_t i = FLAGS_iperf_server_port_min; i <= FLAGS_iperf_server_port_max; i++) { - iperfAvailablePorts_.insert(i); + lockedIperfAvailablePorts->insert(i); } + lockedIperfAvailablePorts.unlock(); } void @@ -96,16 +99,22 @@ TrafficApp::processStartIperfServer( startMsg, startServer.value()); + auto lockedIperfAvailablePorts = iperfAvailablePorts_.rlock(); // Find an unused port - if (iperfAvailablePorts_.empty()) { + if (lockedIperfAvailablePorts->empty()) { + lockedIperfAvailablePorts.unlock(); LOG(ERROR) << "No unused ports to start iperf server"; return; } - auto iter = iperfAvailablePorts_.begin(); + lockedIperfAvailablePorts.unlock(); + + auto availablePorts = iperfAvailablePorts_.wlock(); + auto iter = availablePorts->begin(); int32_t serverPort = *iter; - iperfAvailablePorts_.erase(iter); + availablePorts->erase(iter); thrift::StartMinionIperf startClient = startServer.value(); startClient.serverPort = serverPort; + availablePorts.unlock(); // With JSON output, nothing gets printed until iperf completes. if (startClient.iperfConfig.options_ref().has_value() && @@ -151,7 +160,9 @@ TrafficApp::processStartIperfServer( // Fork the iperf server std::function pidCallback = [startClient, this](pid_t pid) { - this->iperfProcesses_[startClient.id] = pid; + auto lockedIperfProcess = this->iperfProcesses_.wlock(); + lockedIperfProcess->emplace(startClient.id,pid); + lockedIperfProcess.unlock(); }; std::function initialDataCallback = [startClient, senderApp, this]() { @@ -170,13 +181,15 @@ TrafficApp::processStartIperfServer( // Notify the controller when we read the first byte of this. output = forkCommand(command, pidCallback, initialDataCallback); } + auto lockedIperfAvailablePorts = this->iperfAvailablePorts_.wlock(); if (!output.has_value()) { - this->iperfAvailablePorts_.insert(startClient.serverPort); + lockedIperfAvailablePorts->insert(startClient.serverPort); + lockedIperfAvailablePorts.unlock(); return; } - + auto lockedIperfProcess = this->iperfProcesses_.rlock(); // Log the output - if (this->iperfProcesses_.count(startClient.id)) { + if (lockedIperfProcess->count(startClient.id)) { LOG(INFO) << "iperf session " << startClient.id << " finished, " "sending output to controller..."; thrift::IperfOutput iperfOutput; @@ -200,9 +213,12 @@ TrafficApp::processStartIperfServer( thrift::EventLevel::INFO, folly::sformat("iperf session was killed: {}", startClient.id)); } - - this->iperfProcesses_.erase(startClient.id); - this->iperfAvailablePorts_.insert(startClient.serverPort); + lockedIperfProcess.unlock(); + auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); + lockedIperfProcessErase->erase(startClient.id); + lockedIperfProcessErase.unlock(); + lockedIperfAvailablePorts->insert(startClient.serverPort); + lockedIperfAvailablePorts.unlock(); }); iperfServerThread.detach(); } @@ -262,15 +278,17 @@ TrafficApp::processStartIperfClient( // Fork the iperf client std::function pidCallback = [startClient, this](pid_t pid) { - this->iperfProcesses_[startClient->id] = pid; + auto lockedIperfProcess = this->iperfProcesses_.wlock(); + lockedIperfProcess->emplace(startClient->id,pid); + lockedIperfProcess.unlock(); }; auto output = forkCommand(command, pidCallback); if (!output.has_value()) { return; } - + auto lockedIperfProcess = this->iperfProcesses_.rlock(); // Log the output - if (this->iperfProcesses_.count(startClient->id)) { + if (lockedIperfProcess->count(startClient->id)) { LOG(INFO) << "iperf session " << startClient->id << " finished, " "sending output to controller..."; thrift::IperfOutput iperfOutput; @@ -294,8 +312,11 @@ TrafficApp::processStartIperfClient( thrift::EventLevel::INFO, folly::sformat("iperf session was killed: {}", startClient->id)); } + lockedIperfProcess.unlock(); - this->iperfProcesses_.erase(startClient->id); + auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); + lockedIperfProcessErase->erase(startClient->id); + lockedIperfProcessErase.unlock(); }); iperfClientThread.detach(); } @@ -303,6 +324,7 @@ TrafficApp::processStartIperfClient( void TrafficApp::processStopIperf( const std::string& senderApp, const thrift::Message& message) { + auto lockedIperfProcess = this->iperfProcesses_.rlock(); auto stopIperf = maybeReadThrift(message); if (!stopIperf) { handleInvalidMessage("StopIperf", senderApp); @@ -311,17 +333,18 @@ TrafficApp::processStopIperf( LOG(INFO) << "Stopping iperf process for session ID: " << stopIperf->id; - auto iter = iperfProcesses_.find(stopIperf->id); - if (iter != iperfProcesses_.end()) { + auto iter = lockedIperfProcess->find(stopIperf->id); + if (iter != lockedIperfProcess->end()) { pid_t pid = iter->second; - // Delete this map entry first so that the iperf wrapper thread knows the // process terminated abnormally - iperfProcesses_.erase(iter); - + auto lockedIperfProcessErase = this->iperfProcesses_.wlock(); + lockedIperfProcessErase->erase(iter); // Kill the process (only SIGKILL works :/) kill(pid, SIGKILL); + lockedIperfProcessErase.unlock(); } + lockedIperfProcess.unlock(); } void @@ -473,16 +496,19 @@ TrafficApp::processStartPing( command.push_back(addr); // Fork the ping process - std::function pidCallback = [startPing, this](pid_t pid) { - this->pingProcesses_[startPing->id] = pid; + std::function pidCallback = [startPing,this](pid_t pid) { + auto lockedPingProcess = this->pingProcesses_.wlock(); + lockedPingProcess->emplace(startPing->id, pid); + lockedPingProcess.unlock(); }; auto output = forkCommand(command, pidCallback); if (!output.has_value()) { return; } + auto lockedPingProcess = pingProcesses_.rlock(); // Log the output - if (this->pingProcesses_.count(startPing->id)) { + if (lockedPingProcess->count(startPing->id)) { LOG(INFO) << "ping session " << startPing->id << " finished, " "sending output to controller..."; thrift::PingOutput pingOutput; @@ -505,8 +531,11 @@ TrafficApp::processStartPing( thrift::EventLevel::INFO, folly::sformat("ping session was killed: {}", startPing->id)); } + lockedPingProcess.unlock(); - this->pingProcesses_.erase(startPing->id); + auto lockedPingProcessErase = pingProcesses_.wlock(); + lockedPingProcessErase->erase(startPing->id); + lockedPingProcessErase.unlock(); }); pingThread.detach(); } @@ -515,6 +544,7 @@ void TrafficApp::processStopPing( const std::string& senderApp, const thrift::Message& message) { auto stopPing = maybeReadThrift(message); + auto lockedPingProcess = pingProcesses_.rlock(); if (!stopPing) { handleInvalidMessage("StopPing", senderApp); return; @@ -522,17 +552,20 @@ TrafficApp::processStopPing( LOG(INFO) << "Stopping ping process for session ID: " << stopPing->id; - auto iter = pingProcesses_.find(stopPing->id); - if (iter != pingProcesses_.end()) { + auto iter = lockedPingProcess->find(stopPing->id); + if (iter != lockedPingProcess->end()) { pid_t pid = iter->second; // Delete this map entry first so that the ping wrapper thread knows the // process terminated abnormally - pingProcesses_.erase(iter); + auto lockedPingProcessErase = pingProcesses_.wlock(); + lockedPingProcessErase->erase(iter); // Kill the process kill(pid, SIGTERM); + lockedPingProcessErase.unlock(); } + lockedPingProcess.unlock(); } void diff --git a/src/terragraph-e2e/e2e/minion/TrafficApp.h b/src/terragraph-e2e/e2e/minion/TrafficApp.h index 21a46c5..79e960c 100644 --- a/src/terragraph-e2e/e2e/minion/TrafficApp.h +++ b/src/terragraph-e2e/e2e/minion/TrafficApp.h @@ -11,6 +11,7 @@ #include #include +#include #include "MinionApp.h" #include "e2e/if/gen-cpp2/Controller_types.h" @@ -99,13 +100,13 @@ class TrafficApp final : public MinionApp { std::optional> initialDataCallback = std::nullopt); /** Running iperf processes. */ - std::unordered_map iperfProcesses_; + folly::Synchronized> iperfProcesses_; /** Running ping processes. */ - std::unordered_map pingProcesses_; + folly::Synchronized> pingProcesses_; /** List of unused ports available for iperf. */ - std::unordered_set iperfAvailablePorts_; + folly::Synchronized> iperfAvailablePorts_; }; } // namespace minion