Skip to content

Commit

Permalink
Spin recorder node in a separate thread for better handling exit
Browse files Browse the repository at this point in the history
- Run exec->spin() in a separate thread, because we need to call
exec->cancel() after recorder->stop() to be able to send notifications
about bag split and close.
- Wait on conditional variable for exit_ flag

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Jun 11, 2023
1 parent 8a08167 commit 9819e5f
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,13 @@ class Player

class Recorder
{
private:
using ExecutorT = rclcpp::executors::SingleThreadedExecutor;
std::unique_ptr<ExecutorT> exec_;
std::shared_ptr<rosbag2_transport::Recorder> recorder_;

public:
Recorder()
{
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);

exec_ = std::make_unique<ExecutorT>();
std::signal(
SIGTERM, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
Expand All @@ -222,42 +216,50 @@ class Recorder
RecordOptions & record_options,
std::string & node_name)
{
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
}

auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);
recorder_ = std::make_shared<rosbag2_transport::Recorder>(
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options, node_name);
recorder_->record();
recorder->record();

exec_->add_node(recorder_);
try {
exec->add_node(recorder);
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
// recorder->stop() to be able to send notifications about bag split and close.
auto spin_thread = std::thread(
[&exec]() {
exec->spin();
});
{
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
while (!exit_) {
exec_->spin_all(std::chrono::milliseconds(10));
}
if (recorder_) {
recorder_->stop();
// Need to spin node to be able to send notifications about bag split and close
exec_->spin_all(std::chrono::milliseconds(10));
}
} catch (...) {
exec_->remove_node(recorder_);
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
recorder->stop();
}
exec_->remove_node(recorder_);
exec->cancel();
if (spin_thread.joinable()) {spin_thread.join();}
exec->remove_node(recorder);
}

static void cancel()
{
exit_ = true;
wait_for_exit_cv_.notify_all();
}

protected:
static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
std::mutex wait_for_exit_mutex_;
};

std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

// Return a RecordOptions struct with defaults set for rewriting bags.
rosbag2_transport::RecordOptions bag_rewrite_default_record_options()
Expand Down

0 comments on commit 9819e5f

Please sign in to comment.