Skip to content

Commit

Permalink
Use singleton for static executor in the rosbag2_py::Recorder
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
MichaelOrlov committed Jun 3, 2023
1 parent 1f2c53a commit 3197b6f
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,28 @@ class Player
class Recorder
{
private:
static std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;
using ExecutorT = rclcpp::executors::SingleThreadedExecutor;
static std::unique_ptr<ExecutorT, void (*)(ExecutorT *)> & get_executor_instance()
{
static std::unique_ptr<ExecutorT, void (*)(ExecutorT *)> executor(
new ExecutorT(),
[](ExecutorT * exec_ptr) {
delete exec_ptr;
rclcpp::shutdown();
}
);
return executor;
}
std::shared_ptr<rosbag2_transport::Recorder> recorder_;

public:
Recorder()
{
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (!rclcpp::ok()) {
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);
}
std::signal(
SIGTERM, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
Expand All @@ -210,10 +222,7 @@ class Recorder
});
}

virtual ~Recorder()
{
rclcpp::shutdown();
}
virtual ~Recorder() = default;

void record(
const rosbag2_storage::StorageOptions & storage_options,
Expand All @@ -229,33 +238,35 @@ class Recorder
std::move(writer), storage_options, record_options, node_name);
recorder_->record();

exec_->add_node(recorder_);
// Release the GIL for long-running record, so that calling Python code can use other threads
{
get_executor_instance()->add_node(recorder_);
try {
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
while (!exit_) {
exec_->spin_all(std::chrono::milliseconds(30));
get_executor_instance()->spin_all(std::chrono::milliseconds(30));
}
if (recorder_) {
recorder_->stop();
// Need to spin once to be able to send notifications about bag split and close
exec_->spin_some(std::chrono::milliseconds(30));
get_executor_instance()->spin_some(std::chrono::milliseconds(30));
}
} catch (...) {
get_executor_instance()->remove_node(recorder_);
}
get_executor_instance()->remove_node(recorder_);
}

static void cancel()
{
exit_ = true;
if (exec_) {
exec_->cancel();
if (get_executor_instance()) {
get_executor_instance()->cancel();
}
}

static std::atomic_bool exit_;
};

std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> Recorder::exec_;
std::atomic_bool Recorder::exit_{false};

// Return a RecordOptions struct with defaults set for rewriting bags.
Expand Down

0 comments on commit 3197b6f

Please sign in to comment.