Skip to content

Commit

Permalink
Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301) (#1394)
Browse files Browse the repository at this point in the history
* Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder

- Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown

Signed-off-by: Michael Orlov <[email protected]>

* Use singleton for static executor in the rosbag2_py::Recorder

Signed-off-by: Michael Orlov <[email protected]>

* Rollback to the non-static executor and don't call executor->cancel()

- In case when signal will arrive we will trigger our internal exit_
variable and wait while current exec_->spin_all(10msec) will exit.

Signed-off-by: Michael Orlov <[email protected]>

* Spin recorder node in a separate thread for better handling exit

- Run exec->spin() in a separate thread, because we need to call
exec->cancel() after recorder->stop() to be able to send notifications
about bag split and close.
- Wait on conditional variable for exit_ flag

Signed-off-by: Michael Orlov <[email protected]>

* Address race condition in rosbag2_py.test_record_cancel

- Add `record_thread.join()` before trying to parse metadata.

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
(cherry picked from commit 46a23e9)

Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
mergify[bot] and MichaelOrlov authored Jun 14, 2023
1 parent 019a037 commit 5bc6122
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
51 changes: 39 additions & 12 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,20 @@ class Player

class Recorder
{
private:
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;

public:
Recorder()
{
rclcpp::init(0, nullptr);
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);

std::signal(
SIGTERM, [](int /* signal */) {
rclcpp::shutdown();
rosbag2_py::Recorder::cancel();
});
std::signal(
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
}

Expand All @@ -213,6 +216,8 @@ class Recorder
RecordOptions & record_options,
std::string & node_name)
{
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
}
Expand All @@ -222,20 +227,42 @@ class Recorder
std::move(writer), storage_options, record_options, node_name);
recorder->record();

exec_->add_node(recorder);
// Release the GIL for long-running record, so that calling Python code can use other threads
exec->add_node(recorder);
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
// recorder->stop() to be able to send notifications about bag split and close.
auto spin_thread = std::thread(
[&exec]() {
exec->spin();
});
{
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
exec_->spin();
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
recorder->stop();
}
exec->cancel();
if (spin_thread.joinable()) {
spin_thread.join();
}
exec->remove_node(recorder);
}

void cancel()
static void cancel()
{
exec_->cancel();
exit_ = true;
wait_for_exit_cv_.notify_all();
}

protected:
static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
std::mutex wait_for_exit_mutex_;
};

std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

// Return a RecordOptions struct with defaults set for rewriting bags.
rosbag2_transport::RecordOptions bag_rewrite_default_record_options()
{
Expand Down Expand Up @@ -360,7 +387,7 @@ PYBIND11_MODULE(_transport, m) {
.def(
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"),
py::arg("node_name") = "rosbag2_recorder")
.def("cancel", &rosbag2_py::Recorder::cancel)
.def_static("cancel", &rosbag2_py::Recorder::cancel)
;

m.def(
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/test/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def test_record_cancel(tmp_path, storage_id):
metadata_io = rosbag2_py.MetadataIo()
assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path),
timeout=rclpy.duration.Duration(seconds=3))
record_thread.join()

metadata = metadata_io.read_metadata(bag_path)
assert(len(metadata.relative_file_paths))
Expand Down

0 comments on commit 5bc6122

Please sign in to comment.