Skip to content

Commit

Permalink
Rollback to the non-static executor and don't call executor->cancel()
Browse files Browse the repository at this point in the history
- In case when signal will arrive we will trigger our internal exit_
variable and wait while current exec_->spin_all(10msec) will exit.

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Jun 9, 2023
1 parent a474446 commit 744b09a
Showing 1 changed file with 14 additions and 24 deletions.
38 changes: 14 additions & 24 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,27 +191,17 @@ class Recorder
{
private:
using ExecutorT = rclcpp::executors::SingleThreadedExecutor;
static std::unique_ptr<ExecutorT, void (*)(ExecutorT *)> & get_executor_instance()
{
static std::unique_ptr<ExecutorT, void (*)(ExecutorT *)> executor(
new ExecutorT(),
[](ExecutorT * exec_ptr) {
delete exec_ptr;
rclcpp::shutdown();
}
);
return executor;
}
std::unique_ptr<ExecutorT> exec_;
std::shared_ptr<rosbag2_transport::Recorder> recorder_;

public:
Recorder()
{
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);
}
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);

exec_ = std::make_unique<ExecutorT>();
std::signal(
SIGTERM, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
Expand All @@ -222,7 +212,10 @@ class Recorder
});
}

virtual ~Recorder() = default;
virtual ~Recorder()
{
rclcpp::shutdown();
}

void record(
const rosbag2_storage::StorageOptions & storage_options,
Expand All @@ -238,30 +231,27 @@ class Recorder
std::move(writer), storage_options, record_options, node_name);
recorder_->record();

get_executor_instance()->add_node(recorder_);
exec_->add_node(recorder_);
try {
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
while (!exit_) {
get_executor_instance()->spin_all(std::chrono::milliseconds(30));
exec_->spin_all(std::chrono::milliseconds(10));
}
if (recorder_) {
recorder_->stop();
// Need to spin once to be able to send notifications about bag split and close
get_executor_instance()->spin_some(std::chrono::milliseconds(30));
// Need to spin node to be able to send notifications about bag split and close
exec_->spin_all(std::chrono::milliseconds(10));
}
} catch (...) {
get_executor_instance()->remove_node(recorder_);
exec_->remove_node(recorder_);
}
get_executor_instance()->remove_node(recorder_);
exec_->remove_node(recorder_);
}

static void cancel()
{
exit_ = true;
if (get_executor_instance()) {
get_executor_instance()->cancel();
}
}

static std::atomic_bool exit_;
Expand Down

0 comments on commit 744b09a

Please sign in to comment.