Skip to content

Commit

Permalink
Fix transient local IPC publish (#2708) (#2722)
Browse files Browse the repository at this point in the history
* Fix transient local publish when inter and intra process communications are both present.

* Apply the fix to TypeAdapted signature

* Add an executor to intra_process_inter_process_mix_transient_local test case to enable inter process publishing test

Signed-off-by: Jeffery Hsu <[email protected]>
Co-authored-by: Tomoya Fujita <[email protected]>
(cherry picked from commit f5e08c2)

Co-authored-by: Jeffery Hsu <[email protected]>
  • Loading branch information
mergify[bot] and jefferyyjhsu authored Jan 7, 2025
1 parent 985e320 commit 7c89626
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
11 changes: 9 additions & 2 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,12 @@ class Publisher : public PublisherBase
// interprocess publish, resulting in lower publish-to-subscribe latency.
// It's not possible to do that with an unique_ptr,
// as do_intra_process_publish takes the ownership of the message.

// When durability is set to TransientLocal (i.e. there is a buffer),
// inter process publish should always take place to ensure
// late joiners receive past data.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();
get_subscription_count() > get_intra_process_subscription_count() || buffer_;

if (inter_process_publish_needed) {
auto shared_msg =
Expand Down Expand Up @@ -329,8 +333,11 @@ class Publisher : public PublisherBase
return;
}

// When durability is set to TransientLocal (i.e. there is a buffer),
// inter process publish should always take place to ensure
// late joiners receive past data.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();
get_subscription_count() > get_intra_process_subscription_count() || buffer_;

if (inter_process_publish_needed) {
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
Expand Down
48 changes: 43 additions & 5 deletions rclcpp/test/rclcpp/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <gtest/gtest.h>

#include <future>
#include <chrono>
#include <memory>
#include <string>
Expand Down Expand Up @@ -716,11 +717,7 @@ TEST_F(TestPublisher, intra_process_transient_local) {
auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {};
struct IntraProcessCallback
{
void callback_fun(size_t s)
{
(void) s;
called = true;
}
void callback_fun(size_t) {called = true;}
bool called = false;
};
rclcpp::SubscriptionOptions sub_options_ipm_disabled;
Expand Down Expand Up @@ -777,3 +774,44 @@ TEST_F(TestPublisher, intra_process_transient_local) {
EXPECT_FALSE(callback3.called);
EXPECT_FALSE(callback4.called);
}

TEST_F(TestPublisher, intra_process_inter_process_mix_transient_local) {
constexpr auto history_depth = 10u;
initialize(rclcpp::NodeOptions().use_intra_process_comms(true));

rclcpp::PublisherOptionsWithAllocator<std::allocator<void>> pub_options_ipm_enabled;
pub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;

auto pub_ipm_enabled_transient_local_enabled = node->create_publisher<test_msgs::msg::Empty>(
"topic1",
rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_enabled);

test_msgs::msg::Empty msg;
pub_ipm_enabled_transient_local_enabled->publish(msg);

auto do_nothing = [](std::shared_ptr<const test_msgs::msg::Empty>) {};
struct SubscriptionCallback
{
void callback_fun(size_t) {called.set_value();}
std::promise<void> called;
};
rclcpp::SubscriptionOptions sub_options_ipm_disabled;
sub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
SubscriptionCallback intra_callback, inter_callback;
std::future<void> intra_callback_future = intra_callback.called.get_future(),
inter_callback_future = inter_callback.called.get_future();
auto sub_ipm_disabled_transient_local_enabled = node->create_subscription<test_msgs::msg::Empty>(
"topic1",
rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(),
do_nothing, sub_options_ipm_disabled);
sub_ipm_disabled_transient_local_enabled->set_on_new_intra_process_message_callback(
std::bind(&SubscriptionCallback::callback_fun, &intra_callback, std::placeholders::_1));
sub_ipm_disabled_transient_local_enabled->set_on_new_message_callback(
std::bind(&SubscriptionCallback::callback_fun, &inter_callback, std::placeholders::_1));
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
EXPECT_EQ(executor.spin_until_future_complete(inter_callback_future,
std::chrono::milliseconds(100)), rclcpp::FutureReturnCode::SUCCESS);
EXPECT_EQ(executor.spin_until_future_complete(intra_callback_future,
std::chrono::milliseconds(100)), rclcpp::FutureReturnCode::TIMEOUT);
}

0 comments on commit 7c89626

Please sign in to comment.