Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for user-specified content filters #68

Merged
merged 30 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1e0027e
Add support for user-specified content filters.
asorbini Oct 22, 2021
d4788f9
- Resolve memory leak of custom content-filter resources
asorbini Oct 25, 2021
d9a2ba0
Assume non-null options argument
asorbini Oct 25, 2021
1c313e3
- Return error when retrieving content-filter from a subscription tha…
asorbini Oct 25, 2021
3c532ce
Fix compilation error, oops.
asorbini Oct 25, 2021
669ff31
- Define RMW_CONNEXT_DEBUG when building Debug libraries.
asorbini Oct 25, 2021
13087f1
Resolve memory leak for finalization on error.
asorbini Oct 26, 2021
883e9cf
Rename content filter public API.
asorbini Oct 26, 2021
ced2a15
Add client/service QoS getters (#67)
mauropasse Nov 19, 2021
896839f
Changelogs
ivanpauno Nov 19, 2021
a3bbfb6
0.8.1
ivanpauno Nov 19, 2021
8b10deb
Fix cpplint errors (#69)
jacobperron Jan 12, 2022
099692a
0.8.2
paudrow Jan 15, 2022
70d6e2e
Update rti-connext-dds dependency to 6.0.1. (#71)
nuclearsandwich Feb 10, 2022
bbdf5af
0.8.3
nuclearsandwich Feb 10, 2022
4129331
Add rmw listener apis (#44)
Feb 24, 2022
49f3b55
Changelog. (#73)
clalancette Mar 1, 2022
4becb93
0.9.0
clalancette Mar 1, 2022
203f820
add stub for content filtered topic
Mar 18, 2022
5a36784
* Rebased branch asorbini/cft on top of 0.9.0.
asorbini Mar 19, 2022
d43b032
Move custom SQL filter to rmw_connextdds_common
asorbini Mar 19, 2022
60a347c
Try to resolve linking error on Windows.
asorbini Mar 19, 2022
d7d211c
Optionally disable writer-side CFT optimizations to support Windows.
asorbini Mar 20, 2022
ad985bc
No need to declare private CFT function on Windows.
asorbini Mar 20, 2022
f44dceb
Merge branch 'master' into asorbini/cft
fujitatomoya Mar 21, 2022
d30bb6e
remove stub implementation for ContentFilteredTopic.
fujitatomoya Mar 21, 2022
995e244
address cpplint error.
fujitatomoya Mar 21, 2022
b12c6b2
Avoid conversion warnings on Windows.
asorbini Mar 21, 2022
b1b5851
Use strtol instead of sscanf to avoid warnings on Windows.
asorbini Mar 21, 2022
3edb5ae
Avoid finalizing participants if factory is not available.
asorbini Mar 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rmw_connextdds/src/rmw_api_impl_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,24 @@ rmw_subscription_get_actual_qos(
return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const rmw_subscription_content_filtered_topic_options_t * options)
{
return rmw_api_connextdds_subscription_set_cft_expression_parameters(
subscription, options);
}

rmw_ret_t
rmw_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filtered_topic_options_t * options)
{
return rmw_api_connextdds_subscription_get_cft_expression_parameters(
subscription, allocator, options);
}

rmw_ret_t
rmw_destroy_subscription(
Expand Down
3 changes: 2 additions & 1 deletion rmw_connextdds_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ set(RMW_CONNEXT_DEPS
rosidl_typesupport_fastrtps_cpp
rosidl_typesupport_introspection_c
rosidl_typesupport_introspection_cpp
rti_connext_dds_cmake_module)
rti_connext_dds_cmake_module
rti_connext_dds_custom_sql_filter)

foreach(pkg_dep ${RMW_CONNEXT_DEPS})
find_package(${pkg_dep} REQUIRED)
Expand Down
18 changes: 18 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ rmw_connextdds_initialize_participant_qos_impl(
rmw_context_impl_t * const ctx,
DDS_DomainParticipantQos * const dp_qos);

rmw_ret_t
rmw_connextdds_configure_participant(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const participant);

rmw_ret_t
rmw_connextdds_create_contentfilteredtopic(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Topic * const base_topic,
const char * const cft_name,
const char * const cft_filter,
const rcutils_string_array_t * const cft_expression_parameters,
DDS_TopicDescription ** const cft_out);

rmw_ret_t
Expand Down Expand Up @@ -258,4 +264,16 @@ rmw_connextdds_enable_security(
DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level"
#endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */

rmw_ret_t
rmw_connextdds_set_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
const char * const cft_expression,
const rcutils_string_array_t * const cft_expression_parameters);

rmw_ret_t
rmw_connextdds_get_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filtered_topic_options_t * const options);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
12 changes: 12 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,18 @@ rmw_api_connextdds_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const rmw_subscription_content_filtered_topic_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filtered_topic_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
Expand Down
22 changes: 22 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber
rmw_message_info_t * const message_info,
bool * const taken);

rmw_ret_t
set_cft_expression_parameters(
const rmw_subscription_content_filtered_topic_options_t * const options);

rmw_ret_t
get_cft_expression_parameters(
rcutils_allocator_t * allocator,
rmw_subscription_content_filtered_topic_options_t * const options);

bool
has_data()
{
Expand Down Expand Up @@ -470,6 +479,17 @@ class RMW_Connext_Subscriber
return this->dds_topic;
}

static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}

bool is_cft_enabled()
{
return !this->cft_expression.empty();
}

const bool internal;
const bool ignore_local;

Expand All @@ -478,6 +498,7 @@ class RMW_Connext_Subscriber
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
std::string cft_expression;
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
Expand All @@ -496,6 +517,7 @@ class RMW_Connext_Subscriber
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);

friend class RMW_Connext_SubscriberStatusCondition;
Expand Down
1 change: 1 addition & 0 deletions rmw_connextdds_common/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<buildtool_export_depend>ament_cmake</buildtool_export_depend>

<depend>rti_connext_dds_cmake_module</depend>
<depend>rti_connext_dds_custom_sql_filter</depend>
<depend>fastcdr</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>
Expand Down
9 changes: 8 additions & 1 deletion rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only)
return RMW_RET_ERROR;
}

rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant);
if (RMW_RET_OK != cfg_rc) {
RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant")
return cfg_rc;
}

/* Create DDS publisher/subscriber objects that will be used for all DDS
writers/readers created to support RMW publishers/subscriptions. */

Expand Down Expand Up @@ -386,7 +392,7 @@ rmw_context_impl_t::finalize_participant()
if (nullptr != this->participant) {
// If we are cleaning up after some RMW failure, it is possible for some
// DataWriter to not have been deleted.
// Call DDS_Publisher_delete_contained_entities() to make sure we can
// Call DDS_DomainParticipant_delete_contained_entities() to make sure we can
// dispose the publisher.
if (DDS_RETCODE_OK !=
DDS_DomainParticipant_delete_contained_entities(this->participant))
Expand All @@ -402,6 +408,7 @@ rmw_context_impl_t::finalize_participant()
RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant")
return RMW_RET_ERROR;
}

this->participant = nullptr;
}

Expand Down
79 changes: 68 additions & 11 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
const char * const ROS_TOPIC_PREFIX = "rt";
const char * const ROS_SERVICE_REQUESTER_PREFIX = ROS_SERVICE_REQUESTER_PREFIX_STR;
const char * const ROS_SERVICE_RESPONSE_PREFIX = ROS_SERVICE_RESPONSE_PREFIX_STR;
const char * const ROS_CFT_TOPIC_NAME_INFIX = "_ContentFilterTopic";

std::string
rmw_connextdds_create_topic_name(
Expand Down Expand Up @@ -1129,13 +1130,15 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber(
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal)
: internal(internal),
ignore_local(ignore_local),
ctx(ctx),
dds_reader(dds_reader),
dds_topic(dds_topic),
dds_topic_cft(dds_topic_cft),
cft_expression(cft_expression),
type_support(type_support),
created_topic(created_topic),
status_condition(dds_reader, ignore_local, internal)
Expand Down Expand Up @@ -1256,19 +1259,40 @@ RMW_Connext_Subscriber::create(
});

DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic);
std::string sub_cft_name;
const char * sub_cft_expr = "";
const rcutils_string_array_t * sub_cft_params = nullptr;

if (nullptr != cft_name) {
rmw_ret_t cft_rc =
rmw_connextdds_create_contentfilteredtopic(
ctx, dp, topic, cft_name, cft_filter, &cft_topic);
sub_cft_name = cft_name;
sub_cft_expr = cft_filter;
} else {
sub_cft_name =
fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id();
if (nullptr != subscriber_options->content_filtered_topic_options) {
sub_cft_expr =
subscriber_options->content_filtered_topic_options->filter_expression;
sub_cft_params =
subscriber_options->content_filtered_topic_options->expression_parameters;
}
}

if (RMW_RET_OK != cft_rc) {
if (RMW_RET_UNSUPPORTED != cft_rc) {
return nullptr;
}
} else {
sub_topic = cft_topic;
rmw_ret_t cft_rc =
rmw_connextdds_create_contentfilteredtopic(
ctx,
dp,
topic,
sub_cft_name.c_str(),
sub_cft_expr,
sub_cft_params,
&cft_topic);

if (RMW_RET_OK != cft_rc) {
if (RMW_RET_UNSUPPORTED != cft_rc) {
return nullptr;
}
} else {
sub_topic = cft_topic;
}

// The following initialization generates warnings when built
Expand Down Expand Up @@ -1337,15 +1361,16 @@ RMW_Connext_Subscriber::create(
subscriber_options->ignore_local_publications,
topic_created,
cft_topic,
sub_cft_expr,
internal);

if (nullptr == rmw_sub_impl) {
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber")
return nullptr;
}
scope_exit_type_unregister.cancel();
scope_exit_topic_delete.cancel();
scope_exit_dds_reader_delete.cancel();
scope_exit_topic_delete.cancel();
scope_exit_type_unregister.cancel();

return rmw_sub_impl;
}
Expand Down Expand Up @@ -1529,6 +1554,37 @@ RMW_Connext_Subscriber::take_serialized(
return rc;
}


rmw_ret_t
RMW_Connext_Subscriber::set_cft_expression_parameters(
const rmw_subscription_content_filtered_topic_options_t * const options)
{
const char * const filter_expression =
(nullptr != options && nullptr != options->filter_expression) ?
options->filter_expression : "";

const rcutils_string_array_t * filter_params =
(nullptr != options) ? options->expression_parameters : nullptr;

rmw_ret_t rc = rmw_connextdds_set_cft_filter_expression(
this->dds_topic_cft, filter_expression, filter_params);
if (RMW_RET_OK != rc) {
return rc;
}

this->cft_expression = filter_expression;

return RMW_RET_OK;
}

rmw_ret_t
RMW_Connext_Subscriber::get_cft_expression_parameters(
rcutils_allocator_t * allocator,
rmw_subscription_content_filtered_topic_options_t * const options)
{
return rmw_connextdds_get_cft_filter_expression(this->dds_topic_cft, allocator, options);
}

rmw_ret_t
RMW_Connext_Subscriber::loan_messages(const bool update_condition)
{
Expand Down Expand Up @@ -1815,6 +1871,7 @@ rmw_connextdds_create_subscriber(
topic_name_len + 1);
rmw_subscriber->options = *subscriber_options;
rmw_subscriber->can_loan_messages = false;
rmw_subscriber->is_cft_enabled = rmw_sub_impl->is_cft_enabled();

if (!internal) {
if (RMW_RET_OK != rmw_sub_impl->enable()) {
Expand Down
45 changes: 45 additions & 0 deletions rmw_connextdds_common/src/common/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,51 @@ rmw_api_connextdds_subscription_get_actual_qos(
return sub_impl->qos(qos);
}

rmw_ret_t
rmw_api_connextdds_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const rmw_subscription_content_filtered_topic_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
// RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);

RMW_Connext_Subscriber * const sub_impl =
reinterpret_cast<RMW_Connext_Subscriber *>(subscription->data);

rmw_ret_t rc = sub_impl->set_cft_expression_parameters(options);
subscription->is_cft_enabled = sub_impl->is_cft_enabled();

return rc;
}


rmw_ret_t
rmw_api_connextdds_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filtered_topic_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);

RMW_Connext_Subscriber * const sub_impl =
reinterpret_cast<RMW_Connext_Subscriber *>(subscription->data);

rmw_ret_t rc = sub_impl->get_cft_expression_parameters(allocator, options);

return rc;
}

rmw_ret_t
rmw_api_connextdds_destroy_subscription(
Expand Down
Loading