From 049d75cffb233a28e9928b5062945d8054e209d5 Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Fri, 26 Jul 2024 12:18:17 -0500 Subject: [PATCH 1/7] Use self-pipe trick to implement signal handlers This allows the rest of the Gazebo codebase to use callbacks that can use non async-signal-safe functions Signed-off-by: Addisu Z. Taddese --- src/SignalHandler.cc | 98 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 95 insertions(+), 3 deletions(-) diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 07b6d054..33750b19 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -19,8 +19,11 @@ // comments when upgrading to gz-cmake's "make codecheck" #include "gz/common/SignalHandler.hh" // NOLINT(*) #include +#include #include // NOLINT(*) #include // NOLINT(*) +#include +#include #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -31,13 +34,52 @@ using namespace gz; using namespace common; // A wrapper for the sigaction sa_handler. +// TODO(azeey) We should avoid using objects with non-trivial destructors as globals. GZ_COMMON_VISIBLE std::map> gOnSignalWrappers; std::mutex gWrapperMutex; +namespace +{ + +class SelfPipe +{ + public: + static int pipeFd[2]; + + public: + static void Initialize(); + + public: + ~SelfPipe(); + + private: + SelfPipe(); + + private: + void CheckPipe(); + + private: + std::thread checkPipeThread; + + private: + std::atomic running{false}; +}; + +int SelfPipe::pipeFd[2]; + +void onSignalTopHalf(int _value) +{ + auto valueByte = static_cast(_value); + if (write(SelfPipe::pipeFd[1], &valueByte, 1) == -1) + { + // TODO (azeey) Not clear what to do here. + } +} + ///////////////////////////////////////////////// /// \brief Callback to execute when a signal is received. /// \param[in] _value Signal number. -void onSignal(int _value) +void onSignalBottomHalf(int _value) { std::lock_guard lock(gWrapperMutex); // Send the signal to each wrapper @@ -47,6 +89,55 @@ void onSignal(int _value) } } +SelfPipe::SelfPipe() +{ + if (pipe(this->pipeFd)) + { + gzerr << "Unable to create pipe\n"; + } + + int flags = fcntl(this->pipeFd[1], F_GETFL); + fcntl(this->pipeFd[1], F_SETFL, flags | O_NONBLOCK); + // TODO(azeey) Handle errors + this->running = true; + std::cout << "Initialized self pipe " << running << std::endl; + this->checkPipeThread = std::thread(&SelfPipe::CheckPipe, this); +} + +SelfPipe::~SelfPipe() +{ + this->running = false; + onSignalTopHalf(127); + this->checkPipeThread.join(); +} +void SelfPipe::Initialize() +{ + // We actually need this object to be destructed in order to join the thread, + // so we can't use gz::utils::NeverDestroyed here. + static SelfPipe selfPipe; +} + +void SelfPipe::CheckPipe() +{ + while (this->running) + { + std::uint8_t signal; + if (read(SelfPipe::pipeFd[0], &signal, 1) != -1) + { + if (this->running) + { + onSignalBottomHalf(signal); + } + } + else + { + gzerr << errno << " " << std::strerror(errno) << std::endl; + } + } +} + +} // namespace + ///////////////////////////////////////////////// class common::SignalHandlerPrivate { @@ -74,14 +165,15 @@ SignalHandler::SignalHandler() static int counter = 0; std::lock_guard lock(gWrapperMutex); - if (std::signal(SIGINT, onSignal) == SIG_ERR) + SelfPipe::Initialize(); + if (std::signal(SIGINT, onSignalTopHalf) == SIG_ERR) { gzerr << "Unable to catch SIGINT.\n" << " Please visit http://community.gazebosim.org for help.\n"; return; } - if (std::signal(SIGTERM, onSignal) == SIG_ERR) + if (std::signal(SIGTERM, onSignalTopHalf) == SIG_ERR) { gzerr << "Unable to catch SIGTERM.\n" << " Please visit http://community.gazebosim.org for help.\n"; From fc950a87a245b17e1635116dd756fb85ef89fb1d Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Fri, 26 Jul 2024 13:12:09 -0500 Subject: [PATCH 2/7] Add small sleeps since callbacks are now coming from a separate thread Signed-off-by: Addisu Z. Taddese --- src/SignalHandler_TEST.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/SignalHandler_TEST.cc b/src/SignalHandler_TEST.cc index 420f3aa6..fc489601 100644 --- a/src/SignalHandler_TEST.cc +++ b/src/SignalHandler_TEST.cc @@ -19,6 +19,7 @@ // comments when upgrading to gz-cmake's "make codecheck" #include "gz/common/SignalHandler.hh" // NOLINT(*) #include // NOLINT(*) +#include #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -79,6 +80,7 @@ TEST(SignalHandler, Single) common::SignalHandler handler1; EXPECT_TRUE(handler1.AddCallback(handler1Cb)); std::raise(SIGTERM); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); EXPECT_EQ(SIGTERM, gHandler1Sig); } @@ -98,6 +100,7 @@ TEST(SignalHandler, Multiple) std::raise(SIGINT); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); From 258a565ac1d22810cbcc92def29bcd5cdaceded9 Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Thu, 15 Aug 2024 18:01:53 -0500 Subject: [PATCH 3/7] Address reviewer feedback, support windows, add documentation Signed-off-by: Addisu Z. Taddese --- src/SignalHandler.cc | 132 ++++++++++++++++++++++++-------------- src/SignalHandler_TEST.cc | 5 +- 2 files changed, 88 insertions(+), 49 deletions(-) diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 33750b19..8643f5a9 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -23,7 +23,11 @@ #include // NOLINT(*) #include // NOLINT(*) #include -#include +#ifndef _WIN32 + #include +#else + #include +#endif #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -38,78 +42,106 @@ using namespace common; GZ_COMMON_VISIBLE std::map> gOnSignalWrappers; std::mutex gWrapperMutex; +#ifdef _WIN32 + define write _write + define read _read +#endif namespace { -class SelfPipe -{ - public: - static int pipeFd[2]; - - public: - static void Initialize(); - - public: - ~SelfPipe(); - - private: - SelfPipe(); - - private: - void CheckPipe(); - - private: - std::thread checkPipeThread; - - private: - std::atomic running{false}; +/// \brief Index of the read file descriptor +constexpr int kReadFd = 0; +/// \brief Index of the write file descriptor +constexpr int kWriteFd = 1; + +/// \brief Class to encalpsulate the self-pipe trick which is a way enable the user of +/// non async-signal-safe functions in downstream signal handler +/// callbacks. +/// +/// It works by creating a pipe between the actual signal handler and +/// a servicing thread. When a signal is received the signal handler +/// writes a byte to the pipe and returns. The servicing thread reads the +/// byte from the pipe and calls all of the registered callbacks. Since +/// the registered callbacks are called from a regular thread instead of +/// an actual signal handler, the callbacks are free to use any function +/// (e.g. call gzdbg). +class SelfPipe { + + /// \brief The two pipes the comprise the self-pipe + public: static int pipeFd[2]; + + /// \brief Static function to create a singleton SelfPipe object + public: static void Initialize(); + + /// \brief Destructor. + public: ~SelfPipe(); + + /// \brief Constructor + /// Creates the pipes, applies configuration flags and starts the servicing + /// thread + private: SelfPipe(); + + /// \brief Servicing thread + private: void CheckPipe(); + + /// \brief Handle for CheckPipe thread + private: std::thread checkPipeThread; + + /// \brief Whether the program is running. This is set to true by the + /// Constructor and set to false by the destructor + private: std::atomic running{false}; }; int SelfPipe::pipeFd[2]; -void onSignalTopHalf(int _value) -{ - auto valueByte = static_cast(_value); - if (write(SelfPipe::pipeFd[1], &valueByte, 1) == -1) - { - // TODO (azeey) Not clear what to do here. - } -} - ///////////////////////////////////////////////// /// \brief Callback to execute when a signal is received. +/// This simply writes a byte to a pipe and returns /// \param[in] _value Signal number. -void onSignalBottomHalf(int _value) +void onSignalWriteToSelfPipe(int _value) { - std::lock_guard lock(gWrapperMutex); - // Send the signal to each wrapper - for (std::pair> func : gOnSignalWrappers) + auto valueByte = static_cast(_value); + if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1) { - func.second(_value); + // TODO (azeey) Not clear what to do here. } } +///////////////////////////////////////////////// SelfPipe::SelfPipe() { - if (pipe(this->pipeFd)) +#ifdef _WIN32 + if (_pipe(this->pipeFd, 256, O_BINARY) == -1) +#else + if (pipe(this->pipeFd) == -1) +#endif { gzerr << "Unable to create pipe\n"; } - int flags = fcntl(this->pipeFd[1], F_GETFL); - fcntl(this->pipeFd[1], F_SETFL, flags | O_NONBLOCK); - // TODO(azeey) Handle errors +#ifndef _WIN32 + int flags = fcntl(this->pipeFd[kWriteFd], F_GETFL); + if (fcntl(this->pipeFd[kWriteFd], F_SETFL, flags | O_NONBLOCK) < 0) + { + gzerr << "Failed to set flags on pipe. Signal handling may not work properly" << std::endl; + } +#endif this->running = true; - std::cout << "Initialized self pipe " << running << std::endl; this->checkPipeThread = std::thread(&SelfPipe::CheckPipe, this); } +///////////////////////////////////////////////// SelfPipe::~SelfPipe() { this->running = false; - onSignalTopHalf(127); + // Write a dummy signal value to the pipe. This is not a real signal, but we + // need to wakeup the CheckPipe thread so it can cleanup properly. The value + // was chosen to make it clear that this is not one of the standard signals. + onSignalWriteToSelfPipe(127); this->checkPipeThread.join(); } + +///////////////////////////////////////////////// void SelfPipe::Initialize() { // We actually need this object to be destructed in order to join the thread, @@ -117,16 +149,22 @@ void SelfPipe::Initialize() static SelfPipe selfPipe; } +///////////////////////////////////////////////// void SelfPipe::CheckPipe() { while (this->running) { std::uint8_t signal; - if (read(SelfPipe::pipeFd[0], &signal, 1) != -1) + if (read(SelfPipe::pipeFd[kReadFd], &signal, 1) != -1) { if (this->running) { - onSignalBottomHalf(signal); + std::lock_guard lock(gWrapperMutex); + // Send the signal to each wrapper + for (std::pair> func : gOnSignalWrappers) + { + func.second(signal); + } } } else @@ -166,14 +204,14 @@ SignalHandler::SignalHandler() std::lock_guard lock(gWrapperMutex); SelfPipe::Initialize(); - if (std::signal(SIGINT, onSignalTopHalf) == SIG_ERR) + if (std::signal(SIGINT, onSignalWriteToSelfPipe) == SIG_ERR) { gzerr << "Unable to catch SIGINT.\n" << " Please visit http://community.gazebosim.org for help.\n"; return; } - if (std::signal(SIGTERM, onSignalTopHalf) == SIG_ERR) + if (std::signal(SIGTERM, onSignalWriteToSelfPipe) == SIG_ERR) { gzerr << "Unable to catch SIGTERM.\n" << " Please visit http://community.gazebosim.org for help.\n"; diff --git a/src/SignalHandler_TEST.cc b/src/SignalHandler_TEST.cc index fc489601..246b86c7 100644 --- a/src/SignalHandler_TEST.cc +++ b/src/SignalHandler_TEST.cc @@ -80,7 +80,7 @@ TEST(SignalHandler, Single) common::SignalHandler handler1; EXPECT_TRUE(handler1.AddCallback(handler1Cb)); std::raise(SIGTERM); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(SIGTERM, gHandler1Sig); } @@ -100,7 +100,7 @@ TEST(SignalHandler, Multiple) std::raise(SIGINT); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); @@ -130,6 +130,7 @@ TEST(SignalHandler, InitFailure) std::raise(SIGINT); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); } From 91f3cb52e5f9a48630126f375c29156227a08a8c Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Thu, 15 Aug 2024 18:28:45 -0500 Subject: [PATCH 4/7] Missing # Signed-off-by: Addisu Z. Taddese --- src/SignalHandler.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 8643f5a9..480a6e3e 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -43,8 +43,8 @@ GZ_COMMON_VISIBLE std::map> gOnSignalWrappers; std::mutex gWrapperMutex; #ifdef _WIN32 - define write _write - define read _read + #define write _write + #define read _read #endif namespace { From 3611f538242c8cd5978611673da2fb7cf54aa095 Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Tue, 20 Aug 2024 22:52:09 -0500 Subject: [PATCH 5/7] Add rapid fire test Signed-off-by: Addisu Z. Taddese --- src/SignalHandler_TEST.cc | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/SignalHandler_TEST.cc b/src/SignalHandler_TEST.cc index 246b86c7..5df2a9d5 100644 --- a/src/SignalHandler_TEST.cc +++ b/src/SignalHandler_TEST.cc @@ -291,3 +291,39 @@ TEST(SignalHandler, MultipleThreads) for (int i = 0; i < threadCount; ++i) EXPECT_EQ(SIGINT, results[i]); } + +///////////////////////////////////////////////// +TEST(SignalHandler, RapidFire) +{ + resetSignals(); + std::condition_variable cv; + std::mutex countMutex; + int countHandlerCalls = 0; + constexpr int kNumSignals = 100; + auto cb = [&](int _sig) + { + if (_sig == SIGTERM) + { + std::lock_guard lk(countMutex); + ++countHandlerCalls; + if (countHandlerCalls >= kNumSignals) + { + cv.notify_one(); + } + } + }; + common::SignalHandler handler1; + EXPECT_TRUE(handler1.AddCallback(cb)); + + for (int i=0; i < kNumSignals; ++i) + { + std::raise(SIGTERM); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + // wait for callback to be called kNumSignal times with a timeout + std::unique_lock lk(countMutex); + cv.wait_for(lk, std::chrono::seconds(5), + [&] { return countHandlerCalls >= kNumSignals; }); + EXPECT_GE(countHandlerCalls, kNumSignals); +} From a6c47dea6d6696be1b1d23ae72c01d8bf589d025 Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Tue, 20 Aug 2024 22:56:16 -0500 Subject: [PATCH 6/7] Fix linter Signed-off-by: Addisu Z. Taddese --- src/SignalHandler.cc | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 480a6e3e..7427d456 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -38,7 +38,8 @@ using namespace gz; using namespace common; // A wrapper for the sigaction sa_handler. -// TODO(azeey) We should avoid using objects with non-trivial destructors as globals. +// TODO(azeey) We should avoid using objects with non-trivial destructors as +// globals. GZ_COMMON_VISIBLE std::map> gOnSignalWrappers; std::mutex gWrapperMutex; @@ -54,10 +55,10 @@ constexpr int kReadFd = 0; /// \brief Index of the write file descriptor constexpr int kWriteFd = 1; -/// \brief Class to encalpsulate the self-pipe trick which is a way enable the user of -/// non async-signal-safe functions in downstream signal handler -/// callbacks. -/// +/// \brief Class to encalpsulate the self-pipe trick which is a way enable +/// the user of non async-signal-safe functions in downstream signal handler +/// callbacks. +/// /// It works by creating a pipe between the actual signal handler and /// a servicing thread. When a signal is received the signal handler /// writes a byte to the pipe and returns. The servicing thread reads the @@ -83,7 +84,7 @@ class SelfPipe { /// \brief Servicing thread private: void CheckPipe(); - + /// \brief Handle for CheckPipe thread private: std::thread checkPipeThread; @@ -103,7 +104,7 @@ void onSignalWriteToSelfPipe(int _value) auto valueByte = static_cast(_value); if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1) { - // TODO (azeey) Not clear what to do here. + // TODO(azeey) Not clear what to do here. } } @@ -123,7 +124,8 @@ SelfPipe::SelfPipe() int flags = fcntl(this->pipeFd[kWriteFd], F_GETFL); if (fcntl(this->pipeFd[kWriteFd], F_SETFL, flags | O_NONBLOCK) < 0) { - gzerr << "Failed to set flags on pipe. Signal handling may not work properly" << std::endl; + gzerr << "Failed to set flags on pipe. " + << "Signal handling may not work properly" << std::endl; } #endif this->running = true; From 06d0586f40e26d36cf96fe1dbc210294238ddcb4 Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Wed, 21 Aug 2024 00:35:06 -0500 Subject: [PATCH 7/7] Fix test on windows Signed-off-by: Addisu Z. Taddese --- src/SignalHandler.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 7427d456..0612c03f 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -101,6 +101,10 @@ int SelfPipe::pipeFd[2]; /// \param[in] _value Signal number. void onSignalWriteToSelfPipe(int _value) { +#ifdef _WIN32 + // Windows resets the signal handler every time a signal is handled. + std::signal(_value, onSignalWriteToSelfPipe); +#endif auto valueByte = static_cast(_value); if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1) {