From e3f6142c17cddbaabaf3ae265a1e9a52d2e61560 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 27 Nov 2020 16:39:29 +0800 Subject: [PATCH 1/8] Add functions for supporting a feature of content filtered topic Signed-off-by: Chen Lihui --- rmw_implementation/src/functions.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/rmw_implementation/src/functions.cpp b/rmw_implementation/src/functions.cpp index c666a810..49614df0 100644 --- a/rmw_implementation/src/functions.cpp +++ b/rmw_implementation/src/functions.cpp @@ -414,6 +414,16 @@ RMW_INTERFACE_FN( rmw_ret_t, RMW_RET_ERROR, 3, ARG_TYPES(rmw_event_t *, const rmw_subscription_t *, rmw_event_type_t)) +RMW_INTERFACE_FN( + rmw_subscription_set_cft_expression_parameters, + rmw_ret_t, RMW_RET_ERROR, + 3, ARG_TYPES(const rmw_subscription_t *, const char *, const rcutils_string_array_t *)) + +RMW_INTERFACE_FN( + rmw_subscription_get_cft_expression_parameters, + rmw_ret_t, RMW_RET_ERROR, + 3, ARG_TYPES(const rmw_subscription_t *, char **, rcutils_string_array_t *)) + RMW_INTERFACE_FN( rmw_take, rmw_ret_t, RMW_RET_ERROR, @@ -762,6 +772,8 @@ void prefetch_symbols(void) GET_SYMBOL(rmw_subscription_count_matched_publishers); GET_SYMBOL(rmw_subscription_get_actual_qos); GET_SYMBOL(rmw_subscription_event_init) + GET_SYMBOL(rmw_subscription_set_cft_expression_parameters) + GET_SYMBOL(rmw_subscription_get_cft_expression_parameters) GET_SYMBOL(rmw_take) GET_SYMBOL(rmw_take_with_info) GET_SYMBOL(rmw_take_serialized_message) @@ -872,6 +884,8 @@ unload_library() symbol_rmw_subscription_count_matched_publishers = nullptr; symbol_rmw_subscription_get_actual_qos = nullptr; symbol_rmw_subscription_event_init = nullptr; + symbol_rmw_subscription_set_cft_expression_parameters = nullptr; + symbol_rmw_subscription_get_cft_expression_parameters = nullptr; symbol_rmw_take = nullptr; symbol_rmw_take_sequence = nullptr; symbol_rmw_take_with_info = nullptr; From 14331370f38ddf86b97c29da9a46ebe95404b6d0 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 12 Mar 2021 13:00:37 +0800 Subject: [PATCH 2/8] to support reset content filtered topic with empty string. remove constness for rmw_subscription because is_cft_supported might be updated Signed-off-by: Chen Lihui --- rmw_implementation/src/functions.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_implementation/src/functions.cpp b/rmw_implementation/src/functions.cpp index 49614df0..5fc9dddf 100644 --- a/rmw_implementation/src/functions.cpp +++ b/rmw_implementation/src/functions.cpp @@ -417,7 +417,7 @@ RMW_INTERFACE_FN( RMW_INTERFACE_FN( rmw_subscription_set_cft_expression_parameters, rmw_ret_t, RMW_RET_ERROR, - 3, ARG_TYPES(const rmw_subscription_t *, const char *, const rcutils_string_array_t *)) + 3, ARG_TYPES(rmw_subscription_t *, const char *, const rcutils_string_array_t *)) RMW_INTERFACE_FN( rmw_subscription_get_cft_expression_parameters, From 5dcf45ab7a5b63371b4eedaf16c635319ab92c43 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Tue, 12 Oct 2021 09:57:08 +0800 Subject: [PATCH 3/8] Update interface Signed-off-by: Chen Lihui --- rmw_implementation/src/functions.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rmw_implementation/src/functions.cpp b/rmw_implementation/src/functions.cpp index 5fc9dddf..d96daccb 100644 --- a/rmw_implementation/src/functions.cpp +++ b/rmw_implementation/src/functions.cpp @@ -417,12 +417,15 @@ RMW_INTERFACE_FN( RMW_INTERFACE_FN( rmw_subscription_set_cft_expression_parameters, rmw_ret_t, RMW_RET_ERROR, - 3, ARG_TYPES(rmw_subscription_t *, const char *, const rcutils_string_array_t *)) + 2, ARG_TYPES( + rmw_subscription_t *, const rmw_subscription_content_filtered_topic_options_t *)) RMW_INTERFACE_FN( rmw_subscription_get_cft_expression_parameters, rmw_ret_t, RMW_RET_ERROR, - 3, ARG_TYPES(const rmw_subscription_t *, char **, rcutils_string_array_t *)) + 3, ARG_TYPES( + const rmw_subscription_t *, rcutils_allocator_t *, + rmw_subscription_content_filtered_topic_options_t *)) RMW_INTERFACE_FN( rmw_take, From af46b95d47569c96ce5ffb018aad06340f2e3cbc Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Tue, 26 Oct 2021 14:26:19 +0800 Subject: [PATCH 4/8] rename Signed-off-by: Chen Lihui --- rmw_implementation/src/functions.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/rmw_implementation/src/functions.cpp b/rmw_implementation/src/functions.cpp index d96daccb..99c934e3 100644 --- a/rmw_implementation/src/functions.cpp +++ b/rmw_implementation/src/functions.cpp @@ -415,17 +415,17 @@ RMW_INTERFACE_FN( 3, ARG_TYPES(rmw_event_t *, const rmw_subscription_t *, rmw_event_type_t)) RMW_INTERFACE_FN( - rmw_subscription_set_cft_expression_parameters, + rmw_subscription_set_content_filter, rmw_ret_t, RMW_RET_ERROR, 2, ARG_TYPES( - rmw_subscription_t *, const rmw_subscription_content_filtered_topic_options_t *)) + rmw_subscription_t *, const rmw_subscription_content_filter_options_t *)) RMW_INTERFACE_FN( - rmw_subscription_get_cft_expression_parameters, + rmw_subscription_get_content_filter, rmw_ret_t, RMW_RET_ERROR, 3, ARG_TYPES( const rmw_subscription_t *, rcutils_allocator_t *, - rmw_subscription_content_filtered_topic_options_t *)) + rmw_subscription_content_filter_options_t *)) RMW_INTERFACE_FN( rmw_take, @@ -775,8 +775,8 @@ void prefetch_symbols(void) GET_SYMBOL(rmw_subscription_count_matched_publishers); GET_SYMBOL(rmw_subscription_get_actual_qos); GET_SYMBOL(rmw_subscription_event_init) - GET_SYMBOL(rmw_subscription_set_cft_expression_parameters) - GET_SYMBOL(rmw_subscription_get_cft_expression_parameters) + GET_SYMBOL(rmw_subscription_set_content_filter) + GET_SYMBOL(rmw_subscription_get_content_filter) GET_SYMBOL(rmw_take) GET_SYMBOL(rmw_take_with_info) GET_SYMBOL(rmw_take_serialized_message) @@ -887,8 +887,8 @@ unload_library() symbol_rmw_subscription_count_matched_publishers = nullptr; symbol_rmw_subscription_get_actual_qos = nullptr; symbol_rmw_subscription_event_init = nullptr; - symbol_rmw_subscription_set_cft_expression_parameters = nullptr; - symbol_rmw_subscription_get_cft_expression_parameters = nullptr; + symbol_rmw_subscription_set_content_filter = nullptr; + symbol_rmw_subscription_get_content_filter = nullptr; symbol_rmw_take = nullptr; symbol_rmw_take_sequence = nullptr; symbol_rmw_take_with_info = nullptr; From bec59fdbe2ce2877ff50b0030a9ef78f68aca57f Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Thu, 13 Jan 2022 12:52:42 +0800 Subject: [PATCH 5/8] add test cases Signed-off-by: Chen Lihui --- .../test/test_subscription.cpp | 539 ++++++++++++++++++ 1 file changed, 539 insertions(+) diff --git a/test_rmw_implementation/test/test_subscription.cpp b/test_rmw_implementation/test/test_subscription.cpp index 0b13d7d5..d9fd32b0 100644 --- a/test_rmw_implementation/test/test_subscription.cpp +++ b/test_rmw_implementation/test/test_subscription.cpp @@ -22,6 +22,7 @@ #include "rmw/rmw.h" #include "rmw/error_handling.h" +#include "rmw/subscription_content_filter_options.h" #include "test_msgs/msg/basic_types.h" #include "test_msgs/msg/strings.h" @@ -1041,3 +1042,541 @@ bool operator==(const rmw_message_info_t & m1, const rmw_message_info_t & m2) m1.received_timestamp == m2.received_timestamp && m1.from_intra_process == m2.from_intra_process; } + +class CLASSNAME (TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION) + : public CLASSNAME(TestSubscription, RMW_IMPLEMENTATION) +{ +protected: + using Base = CLASSNAME(TestSubscription, RMW_IMPLEMENTATION); + + void SetUp() override + { + Base::SetUp(); + // Tighten QoS policies to force mismatch. + qos_profile.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + rmw_subscription_options_t options = rmw_get_default_subscription_options(); + + // Create subscription with content filter + auto allocator = rcutils_get_default_allocator(); + + rmw_subscription_content_filter_options_t * content_filter_options = + static_cast( + allocator.allocate( + sizeof(rmw_subscription_content_filter_options_t), allocator.state)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + allocator.deallocate(content_filter_options, allocator.state); + }); + *content_filter_options = rmw_get_zero_initialized_content_filter_options(); + + EXPECT_EQ( + RMW_RET_OK, rmw_subscription_content_filter_options_init( + filter_expression, + expression_parameters_count, + expression_parameters, + &allocator, + content_filter_options)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(content_filter_options, &allocator)); + }); + options.content_filter_options = content_filter_options; + + sub = rmw_create_subscription(node, ts, topic_name, &qos_profile, &options); + ASSERT_NE(nullptr, sub) << rmw_get_error_string().str; + } + + void TearDown() override + { + rmw_ret_t ret = rmw_destroy_subscription(node, sub); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + Base::TearDown(); + } + + rmw_subscription_t * sub{nullptr}; + const char * const topic_name = "/test"; + const rosidl_message_type_support_t * ts{ + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, BasicTypes)}; + rmw_qos_profile_t qos_profile{rmw_qos_profile_default}; + const char * filter_expression = "float32_value=%0"; + enum { expression_parameters_count = 1 }; + const char * expression_parameters[expression_parameters_count] = { + "3.14159" + }; + const char * expression_parameters_2[expression_parameters_count] = { + "3.14" + }; +}; + +TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), get_content_filter) { + rmw_subscription_content_filter_options_t options; + auto allocator = rcutils_get_default_allocator(); + rmw_ret_t ret = rmw_subscription_get_content_filter(sub, &allocator, &options); + if (sub->is_cft_enabled) { + EXPECT_EQ(RMW_RET_OK, ret); + EXPECT_STREQ(options.filter_expression, filter_expression); + ASSERT_NE(nullptr, options.expression_parameters); + ASSERT_EQ(expression_parameters_count, options.expression_parameters->size); + for (size_t i = 0; i < options.expression_parameters->size; ++i) { + EXPECT_STREQ( + options.expression_parameters->data[i], + expression_parameters[i]); + } + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(&options, &allocator)); + } else { + EXPECT_EQ(RMW_RET_UNSUPPORTED, ret); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), set_content_filter) { + rmw_subscription_content_filter_options_t options = + rmw_get_zero_initialized_content_filter_options(); + auto allocator = rcutils_get_default_allocator(); + EXPECT_EQ( + RMW_RET_OK, rmw_subscription_content_filter_options_init( + filter_expression, + expression_parameters_count, + expression_parameters_2, + &allocator, + &options)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(&options, &allocator)); + }); + + bool is_cft_enabled = sub->is_cft_enabled; + rmw_ret_t ret = rmw_subscription_set_content_filter(sub, &options); + if (is_cft_enabled) { + EXPECT_EQ(RMW_RET_OK, ret); + } else { + EXPECT_EQ(RMW_RET_UNSUPPORTED, ret); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), content_filter_get_begin) { + rmw_ret_t ret; + bool taken = false; + + // Create publisher + rmw_publisher_options_t pub_options = rmw_get_default_publisher_options(); + rmw_publisher_t * pub = rmw_create_publisher(node, ts, topic_name, &qos_profile, &pub_options); + ASSERT_NE(nullptr, pub) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ(RMW_RET_OK, rmw_destroy_publisher(node, pub)) << rmw_get_error_string().str; + }); + + bool is_cft_enabled = sub->is_cft_enabled; + + size_t subscription_count = 0u; + SLEEP_AND_RETRY_UNTIL(rmw_intraprocess_discovery_delay, rmw_intraprocess_discovery_delay * 10) { + ret = rmw_publisher_count_matched_subscriptions(pub, &subscription_count); + if (RMW_RET_OK == ret && 1u == subscription_count) { // Early return on failure. + break; + } + } + + // Publish message with float (3.14159) from publisher to subscription + test_msgs__msg__BasicTypes original_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&original_message)); + original_message.float32_value = 3.14159f; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&original_message); + }); + + rmw_publisher_allocation_t * null_allocation_p{nullptr}; + rmw_subscription_allocation_t * null_allocation_s{nullptr}; + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + // expect to get message because content filter is 3.14159 + { + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {1, 0}; // 1000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + ASSERT_NE(nullptr, subscriptions.subscribers[0]); + + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + // whatever the cft is supported or not + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + EXPECT_TRUE(taken); + EXPECT_EQ(original_message, output_message); + } + + // set content filter with 3.14 + if (is_cft_enabled) { + rmw_subscription_content_filter_options_t options = + rmw_get_zero_initialized_content_filter_options(); + auto allocator = rcutils_get_default_allocator(); + EXPECT_EQ( + RMW_RET_OK, rmw_subscription_content_filter_options_init( + filter_expression, + expression_parameters_count, + expression_parameters_2, + &allocator, + &options)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(&options, &allocator)); + }); + + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_set_content_filter(sub, &options)); + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {1, 0}; // 1000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + EXPECT_EQ(RMW_RET_TIMEOUT, ret) << rmw_get_error_string().str; + ASSERT_EQ(nullptr, subscriptions.subscribers[0]); + + // content filter subscription with 3.14 that is not equal with 3.14159 + { + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + EXPECT_FALSE(taken); + } + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), content_filter_get_later) { + rmw_ret_t ret; + bool taken = false; + + // Create publisher + rmw_publisher_options_t pub_options = rmw_get_default_publisher_options(); + rmw_publisher_t * pub = rmw_create_publisher(node, ts, topic_name, &qos_profile, &pub_options); + ASSERT_NE(nullptr, pub) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ(RMW_RET_OK, rmw_destroy_publisher(node, pub)) << rmw_get_error_string().str; + }); + + bool is_cft_enabled = sub->is_cft_enabled; + + size_t subscription_count = 0u; + SLEEP_AND_RETRY_UNTIL(rmw_intraprocess_discovery_delay, rmw_intraprocess_discovery_delay * 10) { + ret = rmw_publisher_count_matched_subscriptions(pub, &subscription_count); + if (RMW_RET_OK == ret && 1u == subscription_count) { // Early return on failure. + break; + } + } + + // Publish message with float (3.14) from publisher to subscription + test_msgs__msg__BasicTypes original_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&original_message)); + original_message.float32_value = 3.14f; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&original_message); + }); + + rmw_publisher_allocation_t * null_allocation_p{nullptr}; + rmw_subscription_allocation_t * null_allocation_s{nullptr}; + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + { + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {1, 0}; // 1000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + + // if cft is supported, not to expect to get the message because content filter is 3.14159 + if (is_cft_enabled) { + EXPECT_EQ(RMW_RET_TIMEOUT, ret) << rmw_get_error_string().str; + ASSERT_EQ(nullptr, subscriptions.subscribers[0]); + } else { + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + ASSERT_NE(nullptr, subscriptions.subscribers[0]); + } + + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + if (is_cft_enabled) { + EXPECT_FALSE(taken); + } else { + EXPECT_TRUE(taken); + EXPECT_EQ(original_message, output_message); + } + } + + // set content filter with 3.14 + if (is_cft_enabled) { + rmw_subscription_content_filter_options_t options = + rmw_get_zero_initialized_content_filter_options(); + auto allocator = rcutils_get_default_allocator(); + EXPECT_EQ( + RMW_RET_OK, rmw_subscription_content_filter_options_init( + filter_expression, + expression_parameters_count, + expression_parameters_2, + &allocator, + &options)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(&options, &allocator)); + }); + + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_set_content_filter(sub, &options)); + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {5, 0}; // 5000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + ASSERT_NE(nullptr, subscriptions.subscribers[0]); + + // content filter subscription with 3.14 that is equal with 3.14 + { + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + EXPECT_TRUE(taken); + EXPECT_EQ(original_message, output_message); + } + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), content_filter_reset) { + rmw_ret_t ret; + bool taken = false; + + // Create publisher + rmw_publisher_options_t pub_options = rmw_get_default_publisher_options(); + rmw_publisher_t * pub = rmw_create_publisher(node, ts, topic_name, &qos_profile, &pub_options); + ASSERT_NE(nullptr, pub) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ(RMW_RET_OK, rmw_destroy_publisher(node, pub)) << rmw_get_error_string().str; + }); + + bool is_cft_enabled = sub->is_cft_enabled; + + size_t subscription_count = 0u; + SLEEP_AND_RETRY_UNTIL(rmw_intraprocess_discovery_delay, rmw_intraprocess_discovery_delay * 10) { + ret = rmw_publisher_count_matched_subscriptions(pub, &subscription_count); + if (RMW_RET_OK == ret && 1u == subscription_count) { // Early return on failure. + break; + } + } + + // Publish message with float (3.14) from publisher to subscription + test_msgs__msg__BasicTypes original_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&original_message)); + original_message.float32_value = 3.14f; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&original_message); + }); + + rmw_publisher_allocation_t * null_allocation_p{nullptr}; + rmw_subscription_allocation_t * null_allocation_s{nullptr}; + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + { + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {1, 0}; // 1000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + + // if cft is supported, not to expect to get the message because content filter is 3.14159 + if (is_cft_enabled) { + EXPECT_EQ(RMW_RET_TIMEOUT, ret) << rmw_get_error_string().str; + ASSERT_EQ(nullptr, subscriptions.subscribers[0]); + } else { + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + ASSERT_NE(nullptr, subscriptions.subscribers[0]); + } + + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + if (is_cft_enabled) { + EXPECT_FALSE(taken); + } else { + EXPECT_TRUE(taken); + EXPECT_EQ(original_message, output_message); + } + } + + // set content filter with empty("") + if (is_cft_enabled) { + rmw_subscription_content_filter_options_t options = + rmw_get_zero_initialized_content_filter_options(); + auto allocator = rcutils_get_default_allocator(); + EXPECT_EQ( + RMW_RET_OK, rmw_subscription_content_filter_options_init( + "", + 0, + nullptr, + &allocator, + &options)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(&options, &allocator)); + }); + + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_set_content_filter(sub, &options)); + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {5, 0}; // 5000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + ASSERT_NE(nullptr, subscriptions.subscribers[0]); + + // content filter subscription is reset + { + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + EXPECT_TRUE(taken); + EXPECT_EQ(original_message, output_message); + } + } +} From 549091fb929faef013070321d9ea91e31fa97d06 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Thu, 13 Jan 2022 16:52:06 +0800 Subject: [PATCH 6/8] add another test Signed-off-by: Chen Lihui --- .../test/test_subscription.cpp | 137 ++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/test_rmw_implementation/test/test_subscription.cpp b/test_rmw_implementation/test/test_subscription.cpp index d9fd32b0..d898e8be 100644 --- a/test_rmw_implementation/test/test_subscription.cpp +++ b/test_rmw_implementation/test/test_subscription.cpp @@ -871,6 +871,143 @@ TEST_F( RMW_RET_OK, rmw_serialized_message_fini(&serialized_message)) << rmw_get_error_string().str; } +TEST_F(CLASSNAME(TestSubscriptionUse, RMW_IMPLEMENTATION), no_content_filter_get) { + rmw_subscription_content_filter_options_t options; + auto allocator = rcutils_get_default_allocator(); + rmw_ret_t ret = rmw_subscription_get_content_filter(sub, &allocator, &options); + EXPECT_NE(RMW_RET_OK, ret); +} + +TEST_F(CLASSNAME(TestSubscriptionUse, RMW_IMPLEMENTATION), no_content_filter_set) { + rmw_ret_t ret; + bool taken = false; + + // Create publisher + rmw_publisher_options_t pub_options = rmw_get_default_publisher_options(); + rmw_publisher_t * pub = rmw_create_publisher(node, ts, topic_name, &qos_profile, &pub_options); + ASSERT_NE(nullptr, pub) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ(RMW_RET_OK, rmw_destroy_publisher(node, pub)) << rmw_get_error_string().str; + }); + + size_t subscription_count = 0u; + SLEEP_AND_RETRY_UNTIL(rmw_intraprocess_discovery_delay, rmw_intraprocess_discovery_delay * 10) { + ret = rmw_publisher_count_matched_subscriptions(pub, &subscription_count); + if (RMW_RET_OK == ret && 1u == subscription_count) { // Early return on failure. + break; + } + } + + // Publish message with float (3.14159) from publisher to subscription + test_msgs__msg__BasicTypes original_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&original_message)); + original_message.float32_value = 3.14159f; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&original_message); + }); + + rmw_publisher_allocation_t * null_allocation_p{nullptr}; + rmw_subscription_allocation_t * null_allocation_s{nullptr}; + + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + // expect to get message because no content filter set + { + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {1, 0}; // 1000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + ASSERT_NE(nullptr, subscriptions.subscribers[0]); + + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + // whatever the cft is supported or not + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + EXPECT_TRUE(taken); + EXPECT_EQ(original_message, output_message); + } + + // set content filter with 3.14 + rmw_subscription_content_filter_options_t options = + rmw_get_zero_initialized_content_filter_options(); + auto allocator = rcutils_get_default_allocator(); + EXPECT_EQ( + RMW_RET_OK, rmw_subscription_content_filter_options_init( + "float32_value=3.14", + 0, + nullptr, + &allocator, + &options)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, + rmw_subscription_content_filter_options_fini(&options, &allocator)); + }); + + ret = rmw_subscription_set_content_filter(sub, &options); + if (ret != RMW_RET_UNSUPPORTED) { + ASSERT_EQ(RMW_RET_OK, ret); + ret = rmw_publish(pub, &original_message, null_allocation_p); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + rmw_subscriptions_t subscriptions; + void * subscriptions_storage[1]; + subscriptions_storage[0] = sub->data; + subscriptions.subscribers = subscriptions_storage; + subscriptions.subscriber_count = 1; + + rmw_wait_set_t * wait_set = rmw_create_wait_set(&context, 1); + ASSERT_NE(nullptr, wait_set) << rmw_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + EXPECT_EQ( + RMW_RET_OK, rmw_destroy_wait_set(wait_set)) << rmw_get_error_string().str; + }); + rmw_time_t timeout = {1, 0}; // 1000ms + ret = rmw_wait(&subscriptions, nullptr, nullptr, nullptr, nullptr, wait_set, &timeout); + EXPECT_EQ(RMW_RET_TIMEOUT, ret) << rmw_get_error_string().str; + ASSERT_EQ(nullptr, subscriptions.subscribers[0]); + + // content filter subscription with 3.14 that is not equal with 3.14159 + { + test_msgs__msg__BasicTypes output_message{}; + ASSERT_TRUE(test_msgs__msg__BasicTypes__init(&output_message)); + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + test_msgs__msg__BasicTypes__fini(&output_message); + }); + + ret = rmw_take(sub, &output_message, &taken, null_allocation_s); + EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; + + EXPECT_FALSE(taken); + } + } +} + class CLASSNAME (TestSubscriptionUseLoan, RMW_IMPLEMENTATION) : public CLASSNAME(TestSubscriptionUse, RMW_IMPLEMENTATION) { From f780a036e2f3650762c9adc4646209d4ed0403f4 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Thu, 17 Mar 2022 11:11:35 +0800 Subject: [PATCH 7/8] relate to `rcutils_string_array_t expression_parameters` changed in rmw Signed-off-by: Chen Lihui --- test_rmw_implementation/test/test_subscription.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test_rmw_implementation/test/test_subscription.cpp b/test_rmw_implementation/test/test_subscription.cpp index d898e8be..4494dfa2 100644 --- a/test_rmw_implementation/test/test_subscription.cpp +++ b/test_rmw_implementation/test/test_subscription.cpp @@ -1254,11 +1254,10 @@ TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), get_cont if (sub->is_cft_enabled) { EXPECT_EQ(RMW_RET_OK, ret); EXPECT_STREQ(options.filter_expression, filter_expression); - ASSERT_NE(nullptr, options.expression_parameters); - ASSERT_EQ(expression_parameters_count, options.expression_parameters->size); - for (size_t i = 0; i < options.expression_parameters->size; ++i) { + ASSERT_EQ(expression_parameters_count, options.expression_parameters.size); + for (size_t i = 0; i < options.expression_parameters.size; ++i) { EXPECT_STREQ( - options.expression_parameters->data[i], + options.expression_parameters.data[i], expression_parameters[i]); } EXPECT_EQ( From 57395495657f549f1e3e1cf7c7e3df612eb05613 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Sun, 20 Mar 2022 12:55:58 +0800 Subject: [PATCH 8/8] wait to allow for filter propagation Signed-off-by: Chen Lihui --- test_rmw_implementation/test/test_subscription.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test_rmw_implementation/test/test_subscription.cpp b/test_rmw_implementation/test/test_subscription.cpp index 4494dfa2..38e54db3 100644 --- a/test_rmw_implementation/test/test_subscription.cpp +++ b/test_rmw_implementation/test/test_subscription.cpp @@ -14,6 +14,9 @@ #include +#include +#include + #include "osrf_testing_tools_cpp/memory_tools/gtest_quickstart.hpp" #include "osrf_testing_tools_cpp/scope_exit.hpp" @@ -970,6 +973,8 @@ TEST_F(CLASSNAME(TestSubscriptionUse, RMW_IMPLEMENTATION), no_content_filter_set ret = rmw_subscription_set_content_filter(sub, &options); if (ret != RMW_RET_UNSUPPORTED) { ASSERT_EQ(RMW_RET_OK, ret); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(1)); ret = rmw_publish(pub, &original_message, null_allocation_p); EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; @@ -1390,6 +1395,8 @@ TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), content_ EXPECT_EQ( RMW_RET_OK, rmw_subscription_set_content_filter(sub, &options)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(1)); ret = rmw_publish(pub, &original_message, null_allocation_p); EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; @@ -1533,6 +1540,8 @@ TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), content_ EXPECT_EQ( RMW_RET_OK, rmw_subscription_set_content_filter(sub, &options)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(1)); ret = rmw_publish(pub, &original_message, null_allocation_p); EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str; @@ -1677,6 +1686,8 @@ TEST_F(CLASSNAME(TestContentFilterSubscriptionUse, RMW_IMPLEMENTATION), content_ EXPECT_EQ( RMW_RET_OK, rmw_subscription_set_content_filter(sub, &options)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(1)); ret = rmw_publish(pub, &original_message, null_allocation_p); EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;