Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add content filter topic feature #513

Merged
merged 14 commits into from
Mar 21, 2022
4 changes: 2 additions & 2 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ ament_export_dependencies(rosidl_typesupport_fastrtps_c)
ament_export_dependencies(rosidl_typesupport_fastrtps_cpp)

register_rmw_implementation(
"c:rosidl_typesupport_c:rosidl_typesupport_fastrtps_c"
"cpp:rosidl_typesupport_cpp:rosidl_typesupport_fastrtps_cpp")
"c:rosidl_typesupport_c:rosidl_typesupport_fastrtps_c:rosidl_typesupport_introspection_c"
"cpp:rosidl_typesupport_cpp:rosidl_typesupport_fastrtps_cpp:rosidl_typesupport_introspection_cpp")
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ rmw_fastrtps_cpp::create_publisher(
}
info->type_support_ = fastdds_type;

if (!rmw_fastrtps_shared_cpp::register_type_object(type_supports, type_name)) {
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"failed to register type object with incompatible type %s",
type_name.c_str());
return nullptr;
}

/////
// Create Listener
if (create_publisher_listener) {
Expand Down
43 changes: 42 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ rmw_create_subscription(
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
Expand Down Expand Up @@ -112,6 +113,9 @@ rmw_create_subscription(
return nullptr;
}
}
info->node_ = node;
info->common_context_ = common_context;

return subscription;
}

Expand Down Expand Up @@ -148,6 +152,43 @@ rmw_subscription_get_actual_qos(
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_subscription_set_content_filter(
subscription, options);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
subscription->is_cft_enabled = (info && info->filtered_topic_);
return ret;
}

rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_content_filter(
subscription, allocator, options);
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
Expand Down
76 changes: 40 additions & 36 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ create_subscription(
}
info->type_support_ = fastdds_type;

if (!rmw_fastrtps_shared_cpp::register_type_object(type_supports, type_name)) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"failed to register type object with incompatible type %s",
type_name.c_str());
return nullptr;
}

/////
// Create Listener
if (create_subscription_listener) {
Expand Down Expand Up @@ -224,8 +231,30 @@ create_subscription(
return nullptr;
}

info->dds_participant_ = dds_participant;
info->subscriber_ = subscriber;
info->topic_name_mangled_ = topic_name_mangled;
info->topic_ = topic.desc;
des_topic = topic.desc;

// Create ContentFilteredTopic
if (subscription_options->content_filter_options) {
rmw_subscription_content_filter_options_t * options =
subscription_options->content_filter_options;
if (nullptr != options->filter_expression) {
eprosima::fastdds::dds::ContentFilteredTopic * filtered_topic = nullptr;
if (!rmw_fastrtps_shared_cpp::create_content_filtered_topic(
dds_participant, des_topic,
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
topic_name_mangled, options, &filtered_topic))
{
RMW_SET_ERROR_MSG("create_contentfilteredtopic() failed to create contentfilteredtopic");
return nullptr;
}
info->filtered_topic_ = filtered_topic;
des_topic = filtered_topic;
}
}

/////
// Create DataReader

Expand All @@ -251,44 +280,18 @@ create_subscription(
return nullptr;
}

eprosima::fastdds::dds::DataReaderQos original_qos = reader_qos;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
reader_qos.properties(),
"fastdds.unique_network_flows"))
{
reader_qos.properties().properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}
info->datareader_qos_ = reader_qos;

// Creates DataReader (with subscriber name to not change name policy)
info->data_reader_ = subscriber->create_datareader(
des_topic,
reader_qos,
info->listener_);
if (!info->data_reader_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->data_reader_ = subscriber->create_datareader(
// create_datareader
if (!rmw_fastrtps_shared_cpp::create_datareader(
info->datareader_qos_,
subscription_options,
subscriber,
des_topic,
original_qos,
info->listener_);
}

if (!info->data_reader_) {
RMW_SET_ERROR_MSG("create_subscription() could not create data reader");
info->listener_,
&info->data_reader_))
{
RMW_SET_ERROR_MSG("create_datareader() could not create data reader");
return nullptr;
}

Expand Down Expand Up @@ -327,6 +330,7 @@ create_subscription(
}
rmw_subscription->options = *subscription_options;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
rmw_subscription->is_cft_enabled = info->filtered_topic_ != nullptr;

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
31 changes: 30 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ rmw_create_subscription(
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomSubscriberInfo *>(subscription->data);
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
Expand Down Expand Up @@ -115,6 +115,9 @@ rmw_create_subscription(
return nullptr;
}
}
info->node_ = node;
info->common_context_ = common_context;

return subscription;
}

Expand Down Expand Up @@ -151,6 +154,32 @@ rmw_subscription_get_actual_qos(
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_content_filter(
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
// Unused in current implementation.
(void) subscription;
(void) options;
RMW_SET_ERROR_MSG("unimplemented");
return RMW_RET_UNSUPPORTED;
}

rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
// Unused in current implementation.
(void) subscription;
(void) allocator;
(void) options;
RMW_SET_ERROR_MSG("unimplemented");
return RMW_RET_UNSUPPORTED;
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;

rmw_ret_t
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ create_subscription(

rmw_subscription->options = *subscription_options;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);
// TODO(iuhilnehc-ynos): update after rmw_fastrtps_cpp is confirmed
rmw_subscription->is_cft_enabled = false;

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
6 changes: 6 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ find_package(ament_cmake_ros REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rmw_dds_common REQUIRED)
find_package(rosidl_typesupport_introspection_c REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)

find_package(fastrtps_cmake_module REQUIRED)
find_package(fastcdr REQUIRED CONFIG)
Expand Down Expand Up @@ -104,6 +106,8 @@ ament_target_dependencies(rmw_fastrtps_shared_cpp
"rcutils"
"rmw"
"rmw_dds_common"
"rosidl_typesupport_introspection_c"
"rosidl_typesupport_introspection_cpp"
)

# Causes the visibility macros to use dllexport rather than dllimport,
Expand All @@ -123,6 +127,8 @@ ament_export_dependencies(rcpputils)
ament_export_dependencies(rcutils)
ament_export_dependencies(rmw)
ament_export_dependencies(rmw_dds_common)
ament_export_dependencies(rosidl_typesupport_introspection_c)
ament_export_dependencies(rosidl_typesupport_introspection_cpp)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include "rcutils/logging_macros.h"

#include "rosidl_runtime_c/message_type_support_struct.h"

#include "./visibility_control.h"

namespace rmw_fastrtps_shared_cpp
Expand Down Expand Up @@ -106,6 +108,11 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType
bool is_plain_;
};

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool register_type_object(
const rosidl_message_type_support_t * type_supports,
const std::string & type_name);

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__TYPESUPPORT_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>

#include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"
#include "fastdds/dds/core/status/LivelinessChangedStatus.hpp"
#include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/ContentFilteredTopic.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "fastdds/rtps/common/Guid.h"
Expand All @@ -39,6 +42,8 @@
#include "rmw/impl/cpp/macros.hpp"
#include "rmw/event_callback_type.h"

#include "rmw_dds_common/context.hpp"

#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


Expand All @@ -61,6 +66,16 @@ struct CustomSubscriberInfo : public CustomEventInfo
const char * typesupport_identifier_{nullptr};
std::shared_ptr<rmw_fastrtps_shared_cpp::LoanManager> loan_manager_;

// for re-create or delete content filtered topic
const rmw_node_t * node_ {nullptr};
rmw_dds_common::Context * common_context_ {nullptr};
eprosima::fastdds::dds::DomainParticipant * dds_participant_ {nullptr};
eprosima::fastdds::dds::Subscriber * subscriber_ {nullptr};
std::string topic_name_mangled_;
eprosima::fastdds::dds::TopicDescription * topic_ {nullptr};
eprosima::fastdds::dds::ContentFilteredTopic * filtered_topic_ {nullptr};
eprosima::fastdds::dds::DataReaderQos datareader_qos_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,8 @@ rmw_ret_t
__rmw_destroy_subscription(
const char * identifier,
const rmw_node_t * node,
rmw_subscription_t * subscription);
rmw_subscription_t * subscription,
bool reset_cft = false);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
Expand All @@ -317,6 +318,19 @@ __rmw_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_service_response_publisher_get_actual_qos(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ rmw_ret_t
destroy_subscription(
const char * identifier,
CustomParticipantInfo * participant_info,
rmw_subscription_t * subscription);
rmw_subscription_t * subscription,
bool reset_cft = false);

} // namespace rmw_fastrtps_shared_cpp

Expand Down
Loading