From 9819e5f31189ec19ac4e0adf8d15ebdebd62f61c Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sat, 10 Jun 2023 23:48:22 -0700 Subject: [PATCH] Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 44 +++++++++++++----------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index dcdd2d5825..3285162540 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -189,11 +189,6 @@ class Player class Recorder { -private: - using ExecutorT = rclcpp::executors::SingleThreadedExecutor; - std::unique_ptr exec_; - std::shared_ptr recorder_; - public: Recorder() { @@ -201,7 +196,6 @@ class Recorder init_options.shutdown_on_signal = false; rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); - exec_ = std::make_unique(); std::signal( SIGTERM, [](int /* signal */) { rosbag2_py::Recorder::cancel(); @@ -222,42 +216,50 @@ class Recorder RecordOptions & record_options, std::string & node_name) { + exit_ = false; + auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); } auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options); - recorder_ = std::make_shared( + auto recorder = std::make_shared( std::move(writer), storage_options, record_options, node_name); - recorder_->record(); + recorder->record(); - exec_->add_node(recorder_); - try { + exec->add_node(recorder); + // Run exec->spin() in a separate thread, because we need to call exec->cancel() after + // recorder->stop() to be able to send notifications about bag split and close. + auto spin_thread = std::thread( + [&exec]() { + exec->spin(); + }); + { // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; - while (!exit_) { - exec_->spin_all(std::chrono::milliseconds(10)); - } - if (recorder_) { - recorder_->stop(); - // Need to spin node to be able to send notifications about bag split and close - exec_->spin_all(std::chrono::milliseconds(10)); - } - } catch (...) { - exec_->remove_node(recorder_); + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + recorder->stop(); } - exec_->remove_node(recorder_); + exec->cancel(); + if (spin_thread.joinable()) {spin_thread.join();} + exec->remove_node(recorder); } static void cancel() { exit_ = true; + wait_for_exit_cv_.notify_all(); } +protected: static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + std::mutex wait_for_exit_mutex_; }; std::atomic_bool Recorder::exit_{false}; +std::condition_variable Recorder::wait_for_exit_cv_{}; // Return a RecordOptions struct with defaults set for rewriting bags. rosbag2_transport::RecordOptions bag_rewrite_default_record_options()