diff --git a/rmw_connextdds/src/rmw_api_impl_ndds.cpp b/rmw_connextdds/src/rmw_api_impl_ndds.cpp index 74d0ce0b..19251f1c 100644 --- a/rmw_connextdds/src/rmw_api_impl_ndds.cpp +++ b/rmw_connextdds/src/rmw_api_impl_ndds.cpp @@ -716,33 +716,25 @@ rmw_subscription_get_actual_qos( return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos); } - rmw_ret_t rmw_subscription_set_content_filter( rmw_subscription_t * subscription, const rmw_subscription_content_filter_options_t * options) { - UNUSED_ARG(subscription); - UNUSED_ARG(options); - RMW_CONNEXT_LOG_NOT_IMPLEMENTED - return RMW_RET_UNSUPPORTED; + return rmw_api_connextdds_subscription_set_content_filter( + subscription, options); } - rmw_ret_t rmw_subscription_get_content_filter( const rmw_subscription_t * subscription, rcutils_allocator_t * allocator, rmw_subscription_content_filter_options_t * options) { - UNUSED_ARG(subscription); - UNUSED_ARG(allocator); - UNUSED_ARG(options); - RMW_CONNEXT_LOG_NOT_IMPLEMENTED - return RMW_RET_UNSUPPORTED; + return rmw_api_connextdds_subscription_get_content_filter( + subscription, allocator, options); } - rmw_ret_t rmw_destroy_subscription( rmw_node_t * node, diff --git a/rmw_connextdds_common/CMakeLists.txt b/rmw_connextdds_common/CMakeLists.txt index 7ffef358..86ef857b 100644 --- a/rmw_connextdds_common/CMakeLists.txt +++ b/rmw_connextdds_common/CMakeLists.txt @@ -62,6 +62,11 @@ function(rtirmw_add_library) ament_target_dependencies(${_rti_build_NAME} ${_rti_build_DEPS}) + set(_extra_defines) + if("${CMAKE_BUILD_TYPE}" MATCHES "[dD]ebug") + list(APPEND _extra_defines "RMW_CONNEXT_DEBUG=1") + endif() + target_compile_definitions(${_rti_build_NAME} PUBLIC RMW_VERSION_MAJOR=${rmw_VERSION_MAJOR} @@ -69,6 +74,7 @@ function(rtirmw_add_library) RMW_VERSION_PATCH=${rmw_VERSION_PATCH} RMW_CONNEXT_DDS_API=RMW_CONNEXT_DDS_API_${_rti_build_API} ${_rti_build_DEFINES} + ${_extra_defines} ) set(private_defines) @@ -189,6 +195,9 @@ else() if("${CONNEXTDDS_VERSION}" VERSION_LESS "6.0.0") list(APPEND extra_defines "RMW_CONNEXT_DDS_API_PRO_LEGACY=1") endif() + if(CONNEXTDDS_ARCH MATCHES ".*Win.*") + list(APPEND extra_defines "RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE=1") + endif() rtirmw_add_library( NAME ${PROJECT_NAME}_pro API PRO @@ -197,8 +206,10 @@ else() src/ndds/rmw_type_support_ndds.cpp src/ndds/rmw_typecode.cpp src/ndds/dds_api_ndds.cpp + src/ndds/custom_sql_filter.cpp include/rmw_connextdds/typecode.hpp include/rmw_connextdds/dds_api_ndds.hpp + include/rmw_connextdds/custom_sql_filter.hpp DEPS ${RMW_CONNEXT_DEPS} LIBRARIES RTIConnextDDS::c_api DEFINES ${extra_defines}) diff --git a/rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp b/rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp new file mode 100644 index 00000000..7c134205 --- /dev/null +++ b/rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp @@ -0,0 +1,66 @@ +// Copyright 2021 Real-Time Innovations, Inc. (RTI) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_ +#define RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_ + +#include "rmw_connextdds/dds_api.hpp" + +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + +namespace rti_connext_dds_custom_sql_filter +{ + +struct CustomSqlFilterData +{ + DDS_SqlFilterGeneratorQos base; + + CustomSqlFilterData(); + + DDS_ReturnCode_t + set_memory_management_property( + const DDS_DomainParticipantQos & dp_qos); +}; + +RMW_CONNEXTDDS_PUBLIC +DDS_ReturnCode_t +register_content_filter( + DDS_DomainParticipant * const participant, + CustomSqlFilterData * const filter_data); + +RMW_CONNEXTDDS_PUBLIC +extern const char * const PLUGIN_NAME; + +} // namespace rti_connext_dds_custom_sql_filter + +#if !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +extern "C" { +// This is an internal function from RTI Connext DDS which allows a filter to +// be registered as "built-in". We need this because we want this custom filter +// to be a replacement for the built-in SQL-like filter. +RMW_CONNEXTDDS_PUBLIC +DDS_ReturnCode_t +DDS_ContentFilter_register_filter( + DDS_DomainParticipant * participant, + const char * name, + const struct DDS_ContentFilter * filter, + const DDS_ContentFilterEvaluateFunction evaluateOnSerialized, + const DDS_ContentFilterWriterEvaluateFunction writerEvaluateOnSerialized, + const DDS_ContentFilterQueryFunction query, + DDS_Boolean isBuiltin); +} +#endif // RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + +#endif // RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index f342e4b3..3a8d2b3d 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -65,6 +65,11 @@ rmw_connextdds_initialize_participant_qos_impl( rmw_context_impl_t * const ctx, DDS_DomainParticipantQos * const dp_qos); +rmw_ret_t +rmw_connextdds_configure_participant( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const participant); + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -72,6 +77,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out); rmw_ret_t @@ -258,4 +264,16 @@ rmw_connextdds_enable_security( DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level" #endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */ +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const cft_expression, + const rcutils_string_array_t * const cft_expression_parameters); + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * const options); + #endif // RMW_CONNEXTDDS__DDS_API_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp index a40b5b30..1b0305c1 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp @@ -19,18 +19,6 @@ #include "rcutils/types.h" -DDS_SEQUENCE(RMW_Connext_Uint8ArrayPtrSeq, rcutils_uint8_array_t *); - -typedef RMW_Connext_Uint8ArrayPtrSeq RMW_Connext_UntypedSampleSeq; - -#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER - -#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \ - *RMW_Connext_Uint8ArrayPtrSeq_get_reference(seq_, i_) - -#define DDS_UntypedSampleSeq_get_length(seq_) \ - RMW_Connext_Uint8ArrayPtrSeq_get_length(seq_) - #if RMW_CONNEXT_DDS_API_PRO_LEGACY #ifndef RTIXCdrLong_MAX #define RTIXCdrLong_MAX 2147483647 diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index cd0ddc14..a03ce76e 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -483,6 +483,18 @@ rmw_api_connextdds_subscription_get_actual_qos( const rmw_subscription_t * subscription, rmw_qos_profile_t * qos); +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_set_content_filter( + rmw_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options); + +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_get_content_filter( + const rmw_subscription_t * subscription, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * options); RMW_CONNEXTDDS_PUBLIC rmw_ret_t diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index bd79a4a1..5bf93f0c 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber rmw_message_info_t * const message_info, bool * const taken); + rmw_ret_t + set_content_filter( + const rmw_subscription_content_filter_options_t * const options); + + rmw_ret_t + get_content_filter( + rcutils_allocator_t * allocator, + rmw_subscription_content_filter_options_t * const options); + bool has_data() { @@ -470,6 +479,17 @@ class RMW_Connext_Subscriber return this->dds_topic; } + static std::string get_atomic_id() + { + static std::atomic_uint64_t id; + return std::to_string(id++); + } + + bool is_cft_enabled() + { + return !this->cft_expression.empty(); + } + const bool internal; const bool ignore_local; @@ -478,6 +498,7 @@ class RMW_Connext_Subscriber DDS_DataReader * dds_reader; DDS_Topic * dds_topic; DDS_TopicDescription * dds_topic_cft; + std::string cft_expression; RMW_Connext_MessageTypeSupport * type_support; rmw_gid_t ros_gid; const bool created_topic; @@ -496,6 +517,7 @@ class RMW_Connext_Subscriber const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, + const char * const cft_expression, const bool internal); friend class RMW_Connext_SubscriberStatusCondition; diff --git a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp index b5f1022f..120e5790 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp @@ -15,6 +15,13 @@ #ifndef RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ #define RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ +/****************************************************************************** + * Debug flags + ******************************************************************************/ +#ifndef RMW_CONNEXT_DEBUG +#define RMW_CONNEXT_DEBUG 0 +#endif // RMW_CONNEXT_DEBUG + /****************************************************************************** * Default User Configuration ******************************************************************************/ @@ -311,6 +318,14 @@ #define RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE 1 #endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */ +/****************************************************************************** + * On windows, the custom SQL filter cannot be registered as "built-in", so we + * must enable some additional code to register it as a user plugin. + ******************************************************************************/ +#ifndef RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +#define RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE 0 +#endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */ + #include "resource_limits.hpp" #endif // RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp index 41d3bd8e..284148de 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp @@ -167,11 +167,21 @@ class RMW_Connext_MessageTypeSupport struct RMW_Connext_Message { - const void * user_data; - bool serialized; - RMW_Connext_MessageTypeSupport * type_support; + const void * user_data{nullptr}; + bool serialized{false}; + RMW_Connext_MessageTypeSupport * type_support{nullptr}; + rcutils_uint8_array_t data_buffer; }; +rmw_ret_t +RMW_Connext_Message_initialize( + RMW_Connext_Message * const self, + RMW_Connext_MessageTypeSupport * const type_support, + const size_t data_buffer_size); + +void +RMW_Connext_Message_finalize(RMW_Connext_Message * const self); + class RMW_Connext_ServiceTypeSupportWrapper { public: @@ -276,5 +286,19 @@ class RMW_Connext_ServiceTypeSupportWrapper const rosidl_service_type_support_t * const type_supports); }; +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO +DDS_SEQUENCE(RMW_Connext_MessagePtrSeq, RMW_Connext_Message *); + +typedef RMW_Connext_MessagePtrSeq RMW_Connext_UntypedSampleSeq; + +#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER + +#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \ + *RMW_Connext_MessagePtrSeq_get_reference(seq_, i_) + +#define DDS_UntypedSampleSeq_get_length(seq_) \ + RMW_Connext_MessagePtrSeq_get_length(seq_) + +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO #endif // RMW_CONNEXTDDS__TYPE_SUPPORT_HPP_ diff --git a/rmw_connextdds_common/src/common/rmw_context.cpp b/rmw_connextdds_common/src/common/rmw_context.cpp index 54102045..4ca175f7 100644 --- a/rmw_connextdds_common/src/common/rmw_context.cpp +++ b/rmw_connextdds_common/src/common/rmw_context.cpp @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only) return RMW_RET_ERROR; } + rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant); + if (RMW_RET_OK != cfg_rc) { + RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant") + return cfg_rc; + } + /* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers created to support RMW publishers/subscriptions. */ @@ -332,7 +338,21 @@ rmw_ret_t rmw_context_impl_t::finalize_participant() { RMW_CONNEXT_LOG_DEBUG("finalizing DDS DomainParticipant") - +#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + // If we are building in Debug mode, an issue in Connext may prevent the + // participant from being able to delete any content-filtered topic if + // the participant has not been enabled. + // For this reason, make sure to enable the participant before trying to + // finalize it. + // TODO(asorbini) reconsider the need for this code in Connext > 6.1.0 + if (DDS_RETCODE_OK != + DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant))) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to enable DomainParticipant before deletion") + return RMW_RET_ERROR; + } +#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO if (RMW_RET_OK != rmw_connextdds_graph_finalize(this)) { RMW_CONNEXT_LOG_ERROR("failed to finalize graph cache") return RMW_RET_ERROR; @@ -386,7 +406,7 @@ rmw_context_impl_t::finalize_participant() if (nullptr != this->participant) { // If we are cleaning up after some RMW failure, it is possible for some // DataWriter to not have been deleted. - // Call DDS_Publisher_delete_contained_entities() to make sure we can + // Call DDS_DomainParticipant_delete_contained_entities() to make sure we can // dispose the publisher. if (DDS_RETCODE_OK != DDS_DomainParticipant_delete_contained_entities(this->participant)) @@ -402,6 +422,7 @@ rmw_context_impl_t::finalize_participant() RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant") return RMW_RET_ERROR; } + this->participant = nullptr; } diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index bd7e41ad..2b074e9b 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -29,6 +29,7 @@ const char * const ROS_TOPIC_PREFIX = "rt"; const char * const ROS_SERVICE_REQUESTER_PREFIX = ROS_SERVICE_REQUESTER_PREFIX_STR; const char * const ROS_SERVICE_RESPONSE_PREFIX = ROS_SERVICE_RESPONSE_PREFIX_STR; +const char * const ROS_CFT_TOPIC_NAME_INFIX = "_ContentFilterTopic"; std::string rmw_connextdds_create_topic_name( @@ -903,9 +904,11 @@ RMW_Connext_Publisher::write( int64_t * const sn_out) { RMW_Connext_Message user_msg; + if (RMW_RET_OK != RMW_Connext_Message_initialize(&user_msg, this->type_support, 0)) { + return RMW_RET_ERROR; + } user_msg.user_data = ros_message; user_msg.serialized = serialized; - user_msg.type_support = this->type_support; return rmw_connextdds_write_message(this, &user_msg, sn_out); } @@ -947,6 +950,11 @@ rmw_ret_t RMW_Connext_Publisher::wait_for_all_acked(rmw_time_t wait_timeout) { DDS_Duration_t timeout = rmw_connextdds_duration_from_ros_time(&wait_timeout); +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_MICRO + // Avoid warnings for unused variable in micro, since wait_for_ack() is + // mapped to an empty call. + UNUSED_ARG(timeout); +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_MICRO const DDS_ReturnCode_t dds_rc = DDS_DataWriter_wait_for_acknowledgments(this->dds_writer, &timeout); @@ -1129,6 +1137,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, + const char * const cft_expression, const bool internal) : internal(internal), ignore_local(ignore_local), @@ -1136,6 +1145,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( dds_reader(dds_reader), dds_topic(dds_topic), dds_topic_cft(dds_topic_cft), + cft_expression(cft_expression), type_support(type_support), created_topic(created_topic), status_condition(dds_reader, ignore_local, internal) @@ -1236,7 +1246,7 @@ RMW_Connext_Subscriber::create( } auto scope_exit_topic_delete = rcpputils::make_scope_exit( - [ctx, topic_created, dp, topic, cft_topic]() + [ctx, &topic_created, dp, &topic, &cft_topic]() { if (nullptr != cft_topic) { if (RMW_RET_OK != @@ -1256,19 +1266,40 @@ RMW_Connext_Subscriber::create( }); DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic); + std::string sub_cft_name; + const char * sub_cft_expr = ""; + const rcutils_string_array_t * sub_cft_params = nullptr; if (nullptr != cft_name) { - rmw_ret_t cft_rc = - rmw_connextdds_create_contentfilteredtopic( - ctx, dp, topic, cft_name, cft_filter, &cft_topic); + sub_cft_name = cft_name; + sub_cft_expr = cft_filter; + } else { + sub_cft_name = + fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id(); + if (nullptr != subscriber_options->content_filter_options) { + sub_cft_expr = + subscriber_options->content_filter_options->filter_expression; + sub_cft_params = + &subscriber_options->content_filter_options->expression_parameters; + } + } - if (RMW_RET_OK != cft_rc) { - if (RMW_RET_UNSUPPORTED != cft_rc) { - return nullptr; - } - } else { - sub_topic = cft_topic; + rmw_ret_t cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, + dp, + topic, + sub_cft_name.c_str(), + sub_cft_expr, + sub_cft_params, + &cft_topic); + + if (RMW_RET_OK != cft_rc) { + if (RMW_RET_UNSUPPORTED != cft_rc) { + return nullptr; } + } else { + sub_topic = cft_topic; } // The following initialization generates warnings when built @@ -1337,15 +1368,16 @@ RMW_Connext_Subscriber::create( subscriber_options->ignore_local_publications, topic_created, cft_topic, + sub_cft_expr, internal); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") return nullptr; } - scope_exit_type_unregister.cancel(); - scope_exit_topic_delete.cancel(); scope_exit_dds_reader_delete.cancel(); + scope_exit_topic_delete.cancel(); + scope_exit_type_unregister.cancel(); return rmw_sub_impl; } @@ -1529,6 +1561,30 @@ RMW_Connext_Subscriber::take_serialized( return rc; } + +rmw_ret_t +RMW_Connext_Subscriber::set_content_filter( + const rmw_subscription_content_filter_options_t * const options) +{ + if (RMW_RET_OK != + rmw_connextdds_set_cft_filter_expression( + this->dds_topic_cft, options->filter_expression, &options->expression_parameters)) + { + return RMW_RET_ERROR; + } + + this->cft_expression = options->filter_expression; + return RMW_RET_OK; +} + +rmw_ret_t +RMW_Connext_Subscriber::get_content_filter( + rcutils_allocator_t * allocator, + rmw_subscription_content_filter_options_t * const options) +{ + return rmw_connextdds_get_cft_filter_expression(this->dds_topic_cft, allocator, options); +} + rmw_ret_t RMW_Connext_Subscriber::loan_messages(const bool update_condition) { @@ -1634,8 +1690,8 @@ RMW_Connext_Subscriber::take_next( RMW_RET_OK == rc_exit; this->loan_next++) { - rcutils_uint8_array_t * data_buffer = - reinterpret_cast( + RMW_Connext_Message * msg = + reinterpret_cast( DDS_UntypedSampleSeq_get_reference( &this->loan_data, static_cast(this->loan_next))); DDS_SampleInfo * info = @@ -1656,7 +1712,7 @@ RMW_Connext_Subscriber::take_next( if (RMW_RET_OK != this->type_support->deserialize( - ros_message, data_buffer, deserialized_size, true /* header_only */)) + ros_message, &msg->data_buffer, deserialized_size, true /* header_only */)) { RMW_CONNEXT_LOG_ERROR_SET("failed to deserialize taken sample") rc_exit = RMW_RET_ERROR; @@ -1695,7 +1751,7 @@ RMW_Connext_Subscriber::take_next( if (RCUTILS_RET_OK != rcutils_uint8_array_copy( reinterpret_cast(ros_message), - data_buffer)) + &msg->data_buffer)) { RMW_CONNEXT_LOG_ERROR_SET("failed to copy uint8 array") rc_exit = RMW_RET_ERROR; @@ -1706,7 +1762,7 @@ RMW_Connext_Subscriber::take_next( if (RMW_RET_OK != this->type_support->deserialize( - ros_message, data_buffer, deserialized_size)) + ros_message, &msg->data_buffer, deserialized_size)) { RMW_CONNEXT_LOG_ERROR_SET( "failed to deserialize taken sample") @@ -1771,7 +1827,6 @@ rmw_connextdds_create_subscriber( "failed to allocate RMW_Connext_Subscriber") return nullptr; } - auto scope_exit_rmw_reader_impl_delete = rcpputils::make_scope_exit( [rmw_sub_impl]() @@ -1782,6 +1837,25 @@ rmw_connextdds_create_subscriber( } delete rmw_sub_impl; }); +#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + auto scope_exit_enable_participant_on_error = + rcpputils::make_scope_exit( + [ctx]() + { + // If we are building in Debug mode, an issue in Connext may prevent the + // participant from being able to delete any content-filtered topic if + // the participant has not been enabled. + // For this reason, make sure to enable the participant before trying to + // finalize it. + // TODO(asorbini) reconsider the need for this code in Connext > 6.1.0 + if (DDS_RETCODE_OK != + DDS_Entity_enable(DDS_DomainParticipant_as_entity(ctx->participant))) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to enable DomainParticipant on subscriber creation error") + } + }); +#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO rmw_subscription_t * rmw_subscriber = rmw_subscription_allocate(); if (nullptr == rmw_subscriber) { @@ -1815,7 +1889,7 @@ rmw_connextdds_create_subscriber( topic_name_len + 1); rmw_subscriber->options = *subscriber_options; rmw_subscriber->can_loan_messages = false; - rmw_subscriber->is_cft_enabled = false; + rmw_subscriber->is_cft_enabled = rmw_sub_impl->is_cft_enabled(); if (!internal) { if (RMW_RET_OK != rmw_sub_impl->enable()) { @@ -1832,6 +1906,9 @@ rmw_connextdds_create_subscriber( } } +#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + scope_exit_enable_participant_on_error.cancel(); +#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO scope_exit_rmw_reader_impl_delete.cancel(); scope_exit_rmw_reader_delete.cancel(); return rmw_subscriber; diff --git a/rmw_connextdds_common/src/common/rmw_subscription.cpp b/rmw_connextdds_common/src/common/rmw_subscription.cpp index 97f6a779..698ccb3e 100644 --- a/rmw_connextdds_common/src/common/rmw_subscription.cpp +++ b/rmw_connextdds_common/src/common/rmw_subscription.cpp @@ -166,6 +166,54 @@ rmw_api_connextdds_subscription_get_actual_qos( return sub_impl->qos(qos); } +rmw_ret_t +rmw_api_connextdds_subscription_set_content_filter( + rmw_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + rmw_ret_t rc = sub_impl->set_content_filter(options); + subscription->is_cft_enabled = sub_impl->is_cft_enabled(); + + return rc; +} + + +rmw_ret_t +rmw_api_connextdds_subscription_get_content_filter( + const rmw_subscription_t * subscription, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * options) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + if (!sub_impl->is_cft_enabled()) { + RMW_CONNEXT_LOG_ERROR_SET("no content-filter associated with subscription") + return RMW_RET_ERROR; + } + + return sub_impl->get_content_filter(allocator, options); +} rmw_ret_t rmw_api_connextdds_destroy_subscription( diff --git a/rmw_connextdds_common/src/common/rmw_type_support.cpp b/rmw_connextdds_common/src/common/rmw_type_support.cpp index 9209dc68..8055398b 100644 --- a/rmw_connextdds_common/src/common/rmw_type_support.cpp +++ b/rmw_connextdds_common/src/common/rmw_type_support.cpp @@ -813,3 +813,36 @@ RMW_Connext_ServiceTypeSupportWrapper::get_response_type_name( return rmw_connextdds_create_type_name_response(svc_callbacks); } + + +rmw_ret_t +RMW_Connext_Message_initialize( + RMW_Connext_Message * const self, + RMW_Connext_MessageTypeSupport * const type_support, + const size_t data_buffer_size) +{ + const rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + self->user_data = nullptr; + self->serialized = false; + self->type_support = type_support; + + if (RCUTILS_RET_OK != + rcutils_uint8_array_init(&self->data_buffer, data_buffer_size, &allocator)) + { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to initialize message buffer: size=%lu", + data_buffer_size) + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +void +RMW_Connext_Message_finalize(RMW_Connext_Message * const self) +{ + if (RCUTILS_RET_OK != rcutils_uint8_array_fini(&self->data_buffer)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize uint8 array") + } +} diff --git a/rmw_connextdds_common/src/ndds/custom_sql_filter.cpp b/rmw_connextdds_common/src/ndds/custom_sql_filter.cpp new file mode 100644 index 00000000..3050d44c --- /dev/null +++ b/rmw_connextdds_common/src/ndds/custom_sql_filter.cpp @@ -0,0 +1,801 @@ +// Copyright 2021 Real-Time Innovations, Inc. (RTI) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" + +#include "rmw_connextdds/custom_sql_filter.hpp" +#include "rmw_connextdds/type_support.hpp" + +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + +struct RTI_CustomSqlFilterProgram +{ + void * base{nullptr}; + std::string expression; + std::string type_name; + DDS_StringSeq expression_params DDS_SEQUENCE_INITIALIZER; + ~RTI_CustomSqlFilterProgram() + { + DDS_StringSeq_finalize(&this->expression_params); + } +}; + +struct RTI_CustomSqlFilterWriterData +{ + void * base{nullptr}; + size_t filtered_readers_count{0}; + size_t unfiltered_readers_count{0}; + std::vector unfiltered_readers_buffer; + std::vector matched_readers_buffer; + DDS_CookieSeq matched_readers DDS_SEQUENCE_INITIALIZER; + REDASkiplistDescription readers_desc; + REDASkiplist readers; + std::vector serialize_buffer; +}; + +struct RTI_CustomSqlFilterReaderData +{ + DDS_Cookie_t cookie; + REDAWeakReference wr; + std::string expression; + + explicit RTI_CustomSqlFilterReaderData(REDAWeakReference * const wr) + : cookie(DDS_COOKIE_DEFAULT), + wr(*wr) + { + DDS_Cookie_t_initialize(&this->cookie); + DDS_OctetSeq_loan_contiguous( + &this->cookie.value, + reinterpret_cast(&this->wr), + sizeof(struct REDAWeakReference), + sizeof(struct REDAWeakReference)); + } +}; + +using RTI_CustomSqlFilterData = rti_connext_dds_custom_sql_filter::CustomSqlFilterData; + +RTI_CustomSqlFilterData::CustomSqlFilterData() + : base(DDS_SQLFILTER_QOS_DEFAULT) +{ +} + +DDS_ReturnCode_t +RTI_CustomSqlFilterData::set_memory_management_property( + const DDS_DomainParticipantQos & dp_qos) +{ + static const DDS_SqlFilterMemoryManagementQos DEFAULT = + DDS_SqlFilterMemoryManagementQos_INITIALIZER; + this->base.memory_management = DEFAULT; + + auto properties = const_cast(&dp_qos.property); + + const DDS_Property_t * property = DDS_PropertyQosPolicyHelper_lookup_property( + properties, + DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_MIN_BUFFER_SIZE_PROPERTY_NAME); + + if (nullptr != property) { + this->base.memory_management.buffer_min_size = strtol(property->value, nullptr, 0); + } + + property = DDS_PropertyQosPolicyHelper_lookup_property( + properties, + DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_TRIM_TO_SIZE_PROPERTY_NAME); + + if (nullptr != property) { + if (!REDAString_iCompare(property->value, "1") || + !REDAString_iCompare(property->value, "true") || + !REDAString_iCompare(property->value, "yes")) + { + this->base.memory_management.trim_buffer = DDS_BOOLEAN_TRUE; + } + } + + return DDS_RETCODE_OK; +} + +static +DDS_ReturnCode_t +RTI_CustomSqlFilter_compile( + void * filter_data, + void ** newHandle, + const char * expression, + const struct DDS_StringSeq * param_seq, + const struct DDS_TypeCode * typeCode, + const char * typeClassName, + void * oldhandle) +{ + *newHandle = nullptr; + + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + + RTI_CustomSqlFilterProgram * program = + new (std::nothrow) RTI_CustomSqlFilterProgram(); + if (nullptr == program) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + auto scope_exit_program = rcpputils::make_scope_exit( + [program]() + { + delete program; + }); + + RTI_CustomSqlFilterProgram * old_program = + static_cast(oldhandle); + void * const old_program_base = + (nullptr != old_program) ? old_program->base : nullptr; + + program->type_name = typeClassName; + program->expression = expression; + if (!DDS_StringSeq_copy(&program->expression_params, param_seq)) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + if (!program->expression.empty()) { + const DDS_ReturnCode_t retcode = + DDS_SqlFilter_compile( + &cft_data->base, + &program->base, + expression, + param_seq, + typeCode, + typeClassName, + old_program_base); + if (DDS_RETCODE_OK != retcode) { + // TODO(asorbini) log error + return retcode; + } + } + + // Delete old_program if we didn't actually compile anything. + // In the other case, Connext doesn't delete the old program to + // avoid a possible race condition, so we do the same here and + // leave it alone. TODO(asorbini) This is likely a memory leak. + if (nullptr != old_program && nullptr == old_program->base) { + delete old_program; + } + + *newHandle = program; + + scope_exit_program.cancel(); + return DDS_RETCODE_OK; +} + +#if !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +static +int +RTI_CustomSqlFilter_compare_reader_data( + const void * left, + const void * right) +{ + RTI_CustomSqlFilterReaderData * const l = + const_cast( + static_cast(left)); + RTI_CustomSqlFilterReaderData * const r = + const_cast( + static_cast(right)); + + return RTIOsapiMemory_compare( + DDS_OctetSeq_get_contiguous_buffer(&l->cookie.value), + DDS_OctetSeq_get_contiguous_buffer(&r->cookie.value), + sizeof(struct REDAWeakReference)); +} + +static +DDS_ReturnCode_t +RTI_CustomSqlFilter_writer_attach( + void * filter_data, + void ** writer_filter_data, + void * reserved) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * writer_data = + new (std::nothrow) RTI_CustomSqlFilterWriterData(); + if (nullptr == writer_data) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + auto scope_exit_data = rcpputils::make_scope_exit( + [writer_data]() + { + delete writer_data; + }); + + if (!REDASkiplist_newDefaultAllocator( + &writer_data->readers_desc, + REDASkiplist_getOptimumMaximumLevel(REDA_FAST_BUFFER_POOL_UNLIMITED), + 1)) + { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + REDASkiplist_init( + &writer_data->readers, + &writer_data->readers_desc, + RTI_CustomSqlFilter_compare_reader_data, + nullptr, + 0, + 0); + + DDS_ReturnCode_t rc = + DDS_SqlFilter_writerAttach(&cft_data->base, &writer_data->base, reserved); + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return rc; + } + *writer_filter_data = writer_data; + scope_exit_data.cancel(); + return DDS_RETCODE_OK; +} + +void +RTI_CustomSqlFilter_writer_detach( + void * filter_data, + void * writer_filter_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + DDS_SqlFilter_writerDetach(&cft_data->base, writer_data->base); + + REDASkiplistNode * node = nullptr; + REDASkiplist_gotoTopNode(&writer_data->readers, &node); + while (REDASkiplist_gotoNextNode(&writer_data->readers, &node)) { + RTI_CustomSqlFilterReaderData * const rdata = + static_cast(REDASkiplistNode_getUserData(node)); + delete rdata; + } + + REDASkiplist_deleteDefaultAllocator(&writer_data->readers_desc); + delete writer_data; +} + +static +void +RTI_CustomSqlFilter_update_unfiltered_list( + RTI_CustomSqlFilterWriterData * const writer_data) +{ + writer_data->unfiltered_readers_buffer.clear(); + writer_data->unfiltered_readers_buffer.reserve(writer_data->unfiltered_readers_count); + + REDASkiplistNode * node = nullptr; + REDASkiplist_gotoTopNode(&writer_data->readers, &node); + while (REDASkiplist_gotoNextNode(&writer_data->readers, &node)) { + RTI_CustomSqlFilterReaderData * const rdata = + static_cast(REDASkiplistNode_getUserData(node)); + if (rdata->expression.empty()) { + writer_data->unfiltered_readers_buffer.push_back(&rdata->cookie); + } + } +} + +static +DDS_ReturnCode_t +RTI_CustomSqlFilter_writer_compile( + void * filter_data, + void * writer_filter_data, + struct DDS_ExpressionProperty * prop, + const char * expression, + const struct DDS_StringSeq * parameters, + const struct DDS_TypeCode * type_code, + const char * type_class_name, + const struct DDS_Cookie_t * cookie) +{ + const bool empty_expr = expression[0] == '\0'; + + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + bool regen_unfiltered = false; + bool new_reader = false; + + RTI_CustomSqlFilterReaderData * reader_data = nullptr; + bool reader_data_added = false; + bool reader_data_compiled = false; + auto scope_exit_reader_data = rcpputils::make_scope_exit( + [cft_data, writer_data, cookie, + &reader_data_added, &reader_data_compiled, &reader_data]() + { + if (reader_data_compiled) { + DDS_SqlFilter_writerFinalize( + &cft_data->base, writer_data->base, cookie); + } + if (reader_data_added) { + REDASkiplist_removeNodeEA(&writer_data->readers, reader_data); + } + if (nullptr != reader_data) { + delete reader_data; + } + }); + REDAWeakReference cookie_wr; + RTIOsapiMemory_copy( + &cookie_wr, + DDS_OctetSeq_get_contiguous_buffer(&cookie->value), + sizeof(struct REDAWeakReference)); + RTI_CustomSqlFilterReaderData lookup_data(&cookie_wr); + + REDASkiplistNode * node = nullptr; + RTIBool precise_match = RTI_FALSE; + REDASkiplist_findNode( + &writer_data->readers, + const_cast(&node), + &precise_match, + static_cast(&lookup_data)); + if (!precise_match) { + node = nullptr; + } + if (nullptr != node) { + reader_data = + static_cast(REDASkiplistNode_getUserData(node)); + reader_data_added = true; + + if (!reader_data->expression.empty() && empty_expr) { + DDS_SqlFilter_writerFinalize( + &cft_data->base, writer_data->base, cookie); + writer_data->unfiltered_readers_count += 1; + writer_data->filtered_readers_count -= 1; + } else if (reader_data->expression.empty() && !empty_expr) { + // Regenerate the list of unfiltered readers + writer_data->unfiltered_readers_count -= 1; + writer_data->filtered_readers_count += 1; + regen_unfiltered = true; + } + + } else { + new_reader = true; + reader_data = new (std::nothrow) RTI_CustomSqlFilterReaderData(&cookie_wr); + if (nullptr == reader_data) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + if (REDASkiplist_assertNodeEA( + &writer_data->readers, + nullptr, + reader_data, + 0, + 0) == nullptr) + { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + reader_data_added = true; + + if (empty_expr) { + writer_data->unfiltered_readers_count += 1; + } else { + writer_data->filtered_readers_count += 1; + } + } + + reader_data->expression = expression; + + if (empty_expr) { + prop->key_only_filter = DDS_BOOLEAN_FALSE; + prop->writer_side_filter_optimization = DDS_BOOLEAN_FALSE; + } else { + DDS_ReturnCode_t rc = DDS_SqlFilter_writerCompile( + &cft_data->base, + writer_data->base, + prop, + expression, + parameters, + type_code, + type_class_name, + cookie); + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return rc; + } + reader_data_compiled = true; + } + + const size_t readers_size = writer_data->matched_readers_buffer.size(); + if (new_reader) { + writer_data->matched_readers_buffer.reserve(readers_size + 1); + } + + if (regen_unfiltered) { + RTI_CustomSqlFilter_update_unfiltered_list(writer_data); + } else if (new_reader && empty_expr) { + writer_data->unfiltered_readers_buffer.push_back(&reader_data->cookie); + } + + scope_exit_reader_data.cancel(); + return DDS_RETCODE_OK; +} + +DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluated_result( + RTI_CustomSqlFilterWriterData * const writer_data, + DDS_CookieSeq * const sql_matched) +{ + if (DDS_CookieSeq_get_maximum(&writer_data->matched_readers) > 0) { + DDS_CookieSeq_unloan(&writer_data->matched_readers); + } + + const size_t sql_matched_len = + (nullptr != sql_matched) ? DDS_CookieSeq_get_length(sql_matched) : 0; + const size_t unfiltered_size = writer_data->unfiltered_readers_buffer.size(); + size_t readers_size = sql_matched_len + unfiltered_size; + size_t matched_readers_start = 0; + + writer_data->matched_readers_buffer.resize(readers_size); + + if (sql_matched_len > 0) { + DDS_Cookie_t ** const sql_matched_buffer = + DDS_CookieSeq_get_discontiguous_buffer(sql_matched); + memcpy( + &(writer_data->matched_readers_buffer[matched_readers_start]), + sql_matched_buffer, + sizeof(DDS_Cookie_t *) * sql_matched_len); + matched_readers_start += sql_matched_len; + } + + if (unfiltered_size > 0) { + memcpy( + &(writer_data->matched_readers_buffer[matched_readers_start]), + &(writer_data->unfiltered_readers_buffer[0]), + sizeof(DDS_Cookie_t *) * unfiltered_size); + } + + readers_size = writer_data->matched_readers_buffer.size(); + if (readers_size > 0) { + if (!DDS_CookieSeq_loan_discontiguous( + &writer_data->matched_readers, + &(writer_data->matched_readers_buffer[0]), + readers_size, + readers_size)) + { + // TODO(asorbini) log error + return nullptr; + } + } + + return &writer_data->matched_readers; +} + +static +struct DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluate_on_serialized( + void * filter_data, + void * writer_filter_data, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data); + +DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluate( + void * filter_data, + void * writer_filter_data, + const void * sample, + const DDS_FilterSampleInfo * meta_data) +{ + // RTI_CustomSqlFilterData * const cft_data = + // static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + if (writer_data->filtered_readers_count > 0) { + // `sample` is a ROS 2 message so we can't pass it to the SQL filter's + // evaluate() function because the filter expects a Connext memory layout. + // For this reason, we must first serialize the sample to a buffer, then + // we can then call evaluate_on_serialized(). + size_t serialized_size = 0; + const RMW_Connext_Message * const msg = + reinterpret_cast(sample); + const uint8_t * serialized_sample = nullptr; + + if (msg->serialized) { + const rcutils_uint8_array_t * const serialized_msg = + reinterpret_cast(msg->user_data); + serialized_size += static_cast(serialized_msg->buffer_length); + serialized_sample = serialized_msg->buffer; + } else { + RMW_CONNEXT_ASSERT(nullptr != msg->type_support) + serialized_size += + msg->type_support->serialized_size_max( + msg->user_data, true /* include_encapsulation */); + writer_data->serialize_buffer.resize(serialized_size); + + rcutils_uint8_array_t data_buffer; + data_buffer.allocator = rcutils_get_default_allocator(); + data_buffer.buffer = &writer_data->serialize_buffer[0]; + data_buffer.buffer_length = serialized_size; + data_buffer.buffer_capacity = serialized_size; + rmw_ret_t rc = msg->type_support->serialize(msg->user_data, &data_buffer); + if (RMW_RET_OK != rc) { + return nullptr; + } + serialized_sample = &writer_data->serialize_buffer[0]; + } + + RTICdrStream cdr_stream; + RTICdrStream_init(&cdr_stream); + RTICdrStream_set( + &cdr_stream, + // "Cast the const away", but the buffer will not be written to. + reinterpret_cast( + const_cast(serialized_sample)), + serialized_size); + RTICdrStream_setCurrentPositionOffset( + &cdr_stream, + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE); + + return RTI_CustomSqlFilter_writer_evaluate_on_serialized( + filter_data, + writer_filter_data, + &cdr_stream, + meta_data); + } else { + return RTI_CustomSqlFilter_writer_evaluated_result(writer_data, nullptr); + } +} + +void +RTI_CustomSqlFilter_writer_finalize( + void * filter_data, + void * writer_filter_data, + const struct DDS_Cookie_t * cookie) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + REDAWeakReference cookie_wr; + RTIOsapiMemory_copy( + &cookie_wr, + DDS_OctetSeq_get_contiguous_buffer(&cookie->value), + sizeof(struct REDAWeakReference)); + RTI_CustomSqlFilterReaderData lookup_data(&cookie_wr); + + REDASkiplistNode * node = + const_cast( + REDASkiplist_removeNodeEA(&writer_data->readers, &lookup_data)); + if (nullptr != node) { + RTI_CustomSqlFilterReaderData * reader_data = + static_cast(REDASkiplistNode_getUserData(node)); + if (reader_data->expression.empty()) { + writer_data->unfiltered_readers_count -= 1; + RTI_CustomSqlFilter_update_unfiltered_list(writer_data); + } else { + writer_data->filtered_readers_count -= 1; + DDS_SqlFilter_writerFinalize(&cft_data->base, writer_data->base, cookie); + } + delete reader_data; + REDASkiplist_deleteNode(&writer_data->readers, node); + } +} + +void +RTI_CustomSqlFilter_writer_return_loan( + void * filter_data, + void * writer_filter_data, + struct DDS_CookieSeq * cookies) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + UNUSED_ARG(cft_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + UNUSED_ARG(writer_data); + DDS_CookieSeq_set_length(cookies, 0); +} + + +static +struct DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluate_on_serialized( + void * filter_data, + void * writer_filter_data, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + DDS_CookieSeq * sql_matched = nullptr; + + if (writer_data->filtered_readers_count > 0) { + sql_matched = DDS_SqlFilter_writerEvaluateOnSerialized( + &cft_data->base, writer_data->base, sample, meta_data); + if (nullptr == sql_matched) { + // TODO(asorbini) log error + return nullptr; + } + } + return RTI_CustomSqlFilter_writer_evaluated_result(writer_data, sql_matched); +} + +DDS_Boolean +RTI_CustomSqlFilter_evaluate_on_serialized( + void * filter_data, + void * handle, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + DDS_Boolean accepted = DDS_BOOLEAN_TRUE; + if (nullptr != program->base) { + accepted = DDS_SqlFilter_evaluateOnSerialized( + &cft_data->base, program->base, sample, meta_data); + } + return accepted; +} + +DDS_Long +RTI_CustomSqlFilter_query(void * filter_data, void * handle) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + (void)cft_data; + + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + + if (nullptr == program->base) { + return 0; + } + + return DDS_SqlFilter_query(filter_data, program->base); +} + +#endif // !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + +// This function is only called when RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +// is enabled. In that flag is set, the filter will be registered as a user +// filter instead of built-in one, so it won't be able to do filtering on +// serialized samples. When this function is used, we don't perform +// writer-side optimizations and we rely only on reader-side filtering to +// avoid having to serialize the sample unnecessarily. +DDS_Boolean +RTI_CustomSqlFilter_evaluate( + void * filter_data, + void * handle, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + + if (nullptr == program->base) { + return DDS_BOOLEAN_TRUE; + } + + const RMW_Connext_Message * const msg = + static_cast(sample); + + if (nullptr != msg->user_data) { + // On the writer side we don't actually perform any filtering, because that + // could require to perform a (possibly useless) additional serialization + // into a buffer, so we always return true. + return DDS_BOOLEAN_TRUE; + } + + RTICdrStream cdr_stream; + RTICdrStream_init(&cdr_stream); + RTICdrStream_set( + &cdr_stream, + reinterpret_cast(msg->data_buffer.buffer), + msg->data_buffer.buffer_length); + RTICdrStream_setCurrentPositionOffset( + &cdr_stream, + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE); + + return DDS_SqlFilter_evaluateOnSerialized( + &cft_data->base, program->base, &cdr_stream, meta_data); +} + +void +RTI_CustomSqlFilter_finalize(void * filter_data, void * handle) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + + if (nullptr != program->base) { + DDS_SqlFilter_finalize(&cft_data->base, program->base); + } + + delete program; +} + + +DDS_ReturnCode_t +rti_connext_dds_custom_sql_filter::register_content_filter( + DDS_DomainParticipant * const participant, + rti_connext_dds_custom_sql_filter::CustomSqlFilterData * const filter_data) +{ + DDS_DomainParticipantQos dp_qos = DDS_DomainParticipantQos_INITIALIZER; + auto scope_exit_qos = rcpputils::make_scope_exit( + [&dp_qos]() + { + if (DDS_RETCODE_OK != DDS_DomainParticipantQos_finalize(&dp_qos)) { + // TODO(asorbini) log error + } + }); + + + if (DDS_RETCODE_OK != DDS_DomainParticipant_get_qos(participant, &dp_qos)) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + DDS_ReturnCode_t rc = filter_data->set_memory_management_property(dp_qos); + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return rc; + } + + DDS_ContentFilter filter = DDS_ContentFilter_INITIALIZER; + filter.compile = RTI_CustomSqlFilter_compile; + filter.evaluate = RTI_CustomSqlFilter_evaluate; + filter.finalize = RTI_CustomSqlFilter_finalize; + filter.filter_data = filter_data; + +#if RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + rc = DDS_DomainParticipant_register_contentfilter( + participant, + PLUGIN_NAME, + &filter); +#else + filter.writer_attach = RTI_CustomSqlFilter_writer_attach; + filter.writer_compile = RTI_CustomSqlFilter_writer_compile; + filter.writer_detach = RTI_CustomSqlFilter_writer_detach; + filter.writer_evaluate = RTI_CustomSqlFilter_writer_evaluate; + filter.writer_finalize = RTI_CustomSqlFilter_writer_finalize; + filter.writer_return_loan = RTI_CustomSqlFilter_writer_return_loan; + + rc = DDS_ContentFilter_register_filter( + participant, + PLUGIN_NAME, + &filter, + RTI_CustomSqlFilter_evaluate_on_serialized, + RTI_CustomSqlFilter_writer_evaluate_on_serialized, + RTI_CustomSqlFilter_query, + DDS_BOOLEAN_TRUE); +#endif // RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + return DDS_RETCODE_OK; +} + +const char * const +rti_connext_dds_custom_sql_filter::PLUGIN_NAME = "RTI_CONNEXTDDS_CUSTOM_SQL_FILTER"; + +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 546daea1..8cc2c123 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -17,6 +17,7 @@ #include #include "rmw/impl/cpp/key_value.hpp" +#include "rmw_connextdds/custom_sql_filter.hpp" #include "rmw_connextdds/type_support.hpp" #include "rmw_connextdds/rmw_impl.hpp" @@ -25,6 +26,13 @@ const char * const RMW_CONNEXTDDS_ID = "rmw_connextdds"; const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT = "cdr"; +struct rmw_connextdds_api_pro +{ + rti_connext_dds_custom_sql_filter::CustomSqlFilterData custom_filter_data; +}; + +rmw_connextdds_api_pro * RMW_Connext_fv_FactoryContext = nullptr; + rmw_ret_t rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity) { @@ -69,9 +77,28 @@ rmw_ret_t rmw_connextdds_initialize_participant_factory_context( rmw_context_impl_t * const ctx) { + RMW_CONNEXT_ASSERT(RMW_Connext_fv_FactoryContext == nullptr) + // RMW_Connext_gv_DomainParticipantFactory is initialized by + // rmw_api_connextdds_init(). RMW_CONNEXT_ASSERT(RMW_Connext_gv_DomainParticipantFactory == nullptr) UNUSED_ARG(ctx); - // Nothing to do + + rmw_connextdds_api_pro * ctx_api = nullptr; + auto scope_exit_api_delete = rcpputils::make_scope_exit( + [ctx_api]() + { + if (nullptr != ctx_api) { + delete ctx_api; + } + }); + + ctx_api = new (std::nothrow) rmw_connextdds_api_pro(); + if (nullptr == ctx_api) { + return RMW_RET_ERROR; + } + + scope_exit_api_delete.cancel(); + RMW_Connext_fv_FactoryContext = ctx_api; return RMW_RET_OK; } @@ -79,6 +106,71 @@ rmw_ret_t rmw_connextdds_finalize_participant_factory_context( rmw_context_impl_t * const ctx) { + RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_fv_FactoryContext) + rmw_connextdds_api_pro * const ctx_api = RMW_Connext_fv_FactoryContext; + RMW_Connext_fv_FactoryContext = nullptr; + + delete ctx_api; + + if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { + // Nothing else to do if we didn't even initialize the factory; + return RMW_RET_OK; + } + + // There might be some DomainParticipants left-over from a ("failed context + // initialization" + "failed participant finalization"), so let's try to + // clean them up. + DDS_DomainParticipantSeq participants = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_seq = rcpputils::make_scope_exit( + [&participants]() + { + DDS_DomainParticipantSeq_finalize(&participants); + }); + + if (DDS_RETCODE_OK != + DDS_DomainParticipantFactory_get_participants( + RMW_Connext_gv_DomainParticipantFactory, &participants)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to list existing participants") + return RMW_RET_ERROR; + } + + const DDS_Long pending = DDS_DomainParticipantSeq_get_length(&participants); + for (DDS_Long i = 0; i < pending; i++) { + DDS_DomainParticipant * const participant = + *DDS_DomainParticipantSeq_get_reference(&participants, i); +#if RMW_CONNEXT_DEBUG + // If we are building in Debug mode, an issue in Connext may prevent the + // participant from being able to delete any content-filtered topic if + // the participant has not been enabled. + // For this reason, make sure to enable the participant before trying to + // finalize it. + // TODO(asorbini) reconsider the need for this code in Connext > 6.1.0 + if (DDS_RETCODE_OK != + DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant))) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to enable pending DomainParticipant before deletion") + return RMW_RET_ERROR; + } +#endif // RMW_CONNEXT_DEBUG + if (DDS_RETCODE_OK != + DDS_DomainParticipant_delete_contained_entities(participant)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to delete pending DomainParticipant's entities") + return RMW_RET_ERROR; + } + + if (DDS_RETCODE_OK != + DDS_DomainParticipantFactory_delete_participant( + RMW_Connext_gv_DomainParticipantFactory, participant)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to delete pending DomainParticipant") + return RMW_RET_ERROR; + } + } + UNUSED_ARG(ctx); return RMW_RET_OK; } @@ -229,6 +321,48 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_configure_participant( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const participant) +{ + UNUSED_ARG(ctx); + + if (DDS_RETCODE_OK != + rti_connext_dds_custom_sql_filter::register_content_filter( + participant, &RMW_Connext_fv_FactoryContext->custom_filter_data)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to register custom SQL filter") + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +static +rmw_ret_t +rmw_connextdds_initialize_cft_parameters( + struct DDS_StringSeq * const cft_parameters, + const rcutils_string_array_t * const cft_expression_parameters) +{ + // Cache value locally to avoid conversion warnings on Windows. + const DDS_Long params_len = static_cast(cft_expression_parameters->size); + + if (!DDS_StringSeq_ensure_length(cft_parameters, params_len, params_len)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to ensure length for cft parameters sequence") + return RMW_RET_ERROR; + } + if (!DDS_StringSeq_from_array( + cft_parameters, + const_cast(cft_expression_parameters->data), + params_len)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy data for cft parameters sequence") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -236,6 +370,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -243,13 +378,28 @@ rmw_connextdds_create_contentfilteredtopic( RMW_CONNEXT_ASSERT(nullptr != cft_filter) struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; - DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + auto scope_exit_cft_params = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (cft_expression_parameters) { + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") + return RMW_RET_ERROR; + } + } *cft_out = nullptr; DDS_ContentFilteredTopic * cft_topic = - DDS_DomainParticipant_create_contentfilteredtopic( - dp, cft_name, base_topic, cft_filter, &cft_parameters); + DDS_DomainParticipant_create_contentfilteredtopic_with_filter( + dp, cft_name, base_topic, cft_filter, &cft_parameters, + rti_connext_dds_custom_sql_filter::PLUGIN_NAME); + if (nullptr == cft_topic) { RMW_CONNEXT_LOG_ERROR_A_SET( "failed to create content-filtered topic: " @@ -570,7 +720,6 @@ rmw_connextdds_write_message( // enable WriteParams::replace_auto to retrieve SN of published message write_params.replace_auto = DDS_BOOLEAN_TRUE; } - if (DDS_RETCODE_OK != DDS_DataWriter_write_w_params_untypedI( pub->writer(), message, &write_params)) @@ -640,9 +789,9 @@ rmw_connextdds_take_samples( RMW_CONNEXT_LOG_ERROR_SET("failed to take data from DDS reader") return RMW_RET_ERROR; } - RMW_CONNEXT_ASSERT(data_len > 0)(void) RMW_Connext_Uint8ArrayPtrSeq_loan_contiguous( + RMW_CONNEXT_ASSERT(data_len > 0)(void) RMW_Connext_MessagePtrSeq_loan_contiguous( sub->data_seq(), - reinterpret_cast(data_buffer), + reinterpret_cast(data_buffer), data_len, data_len); @@ -654,11 +803,11 @@ rmw_connextdds_return_samples( RMW_Connext_Subscriber * const sub) { void ** data_buffer = reinterpret_cast( - RMW_Connext_Uint8ArrayPtrSeq_get_contiguous_buffer(sub->data_seq())); + RMW_Connext_MessagePtrSeq_get_contiguous_buffer(sub->data_seq())); const DDS_Long data_len = - RMW_Connext_Uint8ArrayPtrSeq_get_length(sub->data_seq()); + RMW_Connext_MessagePtrSeq_get_length(sub->data_seq()); - if (!RMW_Connext_Uint8ArrayPtrSeq_unloan(sub->data_seq())) { + if (!RMW_Connext_MessagePtrSeq_unloan(sub->data_seq())) { RMW_CONNEXT_LOG_ERROR_SET("failed to unloan sample sequence") return RMW_RET_ERROR; } @@ -1243,3 +1392,98 @@ rmw_connextdds_enable_security( return RMW_RET_OK; } + + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const cft_expression, + const rcutils_string_array_t * const cft_expression_parameters) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_cft_parameters = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (nullptr != cft_expression_parameters) { + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") + return RMW_RET_ERROR; + } + } + + DDS_ReturnCode_t ret = + DDS_ContentFilteredTopic_set_expression(cft_topic, cft_expression, &cft_parameters); + if (DDS_RETCODE_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to set content-filtered topic") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * const options) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + // get filter_expression + const char * filter_expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + if (!filter_expression) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get filter expression") + return RMW_RET_ERROR; + } + + // get parameters + struct DDS_StringSeq parameters = DDS_SEQUENCE_INITIALIZER; + DDS_ReturnCode_t status = + DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); + if (DDS_RETCODE_OK != status) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get expression parameters") + return RMW_RET_ERROR; + } + auto scope_exit_parameters_delete = + rcpputils::make_scope_exit( + [¶meters]() + { + DDS_StringSeq_finalize(¶meters); + }); + + const DDS_Long parameters_len = DDS_StringSeq_get_length(¶meters); + std::vector expression_parameters; + expression_parameters.reserve(parameters_len); + + for (DDS_Long i = 0; i < parameters_len; ++i) { + const char * ref = *DDS_StringSeq_get_reference(¶meters, i); + if (!ref) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to get a reference for parameters with index %d", i) + return RMW_RET_ERROR; + } + + expression_parameters.push_back(ref); + } + + if (RMW_RET_OK != rmw_subscription_content_filter_options_init( + filter_expression, + expression_parameters.size(), + expression_parameters.data(), + allocator, + options)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_subscription_content_filter_options_init") + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} diff --git a/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp b/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp index dc4e33f6..ffbb6862 100644 --- a/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp @@ -88,15 +88,15 @@ struct RMW_Connext_NddsTypePluginI REDAFastBufferPool_delete(this->pool_samples); } - rcutils_uint8_array_t * + RMW_Connext_Message * allocate_sample() { - return reinterpret_cast( + return reinterpret_cast( REDAFastBufferPool_getBuffer(this->pool_samples)); } void - return_sample(rcutils_uint8_array_t * const sample) + return_sample(RMW_Connext_Message * const sample) { REDAFastBufferPool_returnBuffer(this->pool_samples, sample); } @@ -126,8 +126,8 @@ struct RMW_Connext_NddsParticipantData static RTIBool -RMW_Connext_Uint8ArrayPtr_initialize_w_params( - rcutils_uint8_array_t ** self, +RMW_Connext_MessagePtr_initialize_w_params( + RMW_Connext_Message ** self, const struct DDS_TypeAllocationParams_t * allocParams) { UNUSED_ARG(allocParams); @@ -136,8 +136,8 @@ RMW_Connext_Uint8ArrayPtr_initialize_w_params( } static RTIBool -RMW_Connext_Uint8ArrayPtr_finalize_w_params( - rcutils_uint8_array_t ** self, +RMW_Connext_MessagePtr_finalize_w_params( + RMW_Connext_Message ** self, const struct DDS_TypeDeallocationParams_t * deallocParams) { UNUSED_ARG(deallocParams); @@ -146,19 +146,19 @@ RMW_Connext_Uint8ArrayPtr_finalize_w_params( } static RTIBool -RMW_Connext_Uint8ArrayPtr_copy( - rcutils_uint8_array_t ** dst, - const rcutils_uint8_array_t ** src) +RMW_Connext_MessagePtr_copy( + RMW_Connext_Message ** dst, + const RMW_Connext_Message ** src) { - *dst = const_cast(*src); + *dst = const_cast(*src); return RTI_TRUE; } -#define T rcutils_uint8_array_t * -#define TSeq RMW_Connext_Uint8ArrayPtrSeq -#define T_initialize_w_params RMW_Connext_Uint8ArrayPtr_initialize_w_params -#define T_finalize_w_params RMW_Connext_Uint8ArrayPtr_finalize_w_params -#define T_copy RMW_Connext_Uint8ArrayPtr_copy +#define T RMW_Connext_Message * +#define TSeq RMW_Connext_MessagePtrSeq +#define T_initialize_w_params RMW_Connext_MessagePtr_initialize_w_params +#define T_finalize_w_params RMW_Connext_MessagePtr_finalize_w_params +#define T_copy RMW_Connext_MessagePtr_copy #include "dds_c/generic/dds_c_sequence_TSeq.gen" static @@ -232,29 +232,22 @@ RMW_Connext_TypePlugin_create_data(void ** sample, void * user_data) RMW_Connext_MessageTypeSupport * const type_support = reinterpret_cast(user_data); - rcutils_uint8_array_t * data_buffer = - new (std::nothrow) rcutils_uint8_array_t(); - if (nullptr == data_buffer) { - return RTI_FALSE; - } - - const rcutils_allocator_t allocator = rcutils_get_default_allocator(); size_t buffer_size = 0; - if (type_support->unbounded()) { buffer_size = 0; } else { buffer_size = type_support->type_serialized_size_max(); } - if (RCUTILS_RET_OK != - rcutils_uint8_array_init(data_buffer, buffer_size, &allocator)) - { - delete data_buffer; + RMW_Connext_Message * msg = new (std::nothrow) RMW_Connext_Message(); + if (nullptr == msg) { return RTI_FALSE; } - - *sample = data_buffer; + if (RMW_RET_OK != RMW_Connext_Message_initialize(msg, type_support, buffer_size)) { + delete msg; + return RTI_FALSE; + } + *sample = msg; return RTI_TRUE; } @@ -268,14 +261,9 @@ RMW_Connext_TypePlugin_destroy_data(void ** sample, void * user_data) UNUSED_ARG(type_support); - rcutils_uint8_array_t * data_buffer = - reinterpret_cast(*sample); - - if (RCUTILS_RET_OK != rcutils_uint8_array_fini(data_buffer)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to finalize array") - } - - delete data_buffer; + RMW_Connext_Message * msg = reinterpret_cast(*sample); + RMW_Connext_Message_finalize(msg); + delete msg; } /* ---------------------------------------------------------------------------- @@ -418,11 +406,14 @@ RMW_Connext_TypePlugin_copy_sample( { UNUSED_ARG(endpoint_data); - const rcutils_uint8_array_t * src_buffer = - reinterpret_cast(src); - rcutils_uint8_array_t * dst_buffer = reinterpret_cast(dst); + const RMW_Connext_Message * src_msg = + reinterpret_cast(src); + RMW_Connext_Message * dst_msg = + reinterpret_cast(dst); - if (RCUTILS_RET_OK != rcutils_uint8_array_copy(dst_buffer, src_buffer)) { + if (RCUTILS_RET_OK != + rcutils_uint8_array_copy(&dst_msg->data_buffer, &src_msg->data_buffer)) + { return RTI_FALSE; } @@ -461,6 +452,14 @@ RMW_Connext_TypePlugin_serialize( const RMW_Connext_Message * const msg = reinterpret_cast(sample); + if (nullptr == msg->user_data) { + // Samples written by the application layer should always have a non-null + // user_data pointer. The only samples which do not use that pointers are + // the ones created internally in the DataReader queue, and it would be + // unexpected for them to be passed to this function. + return RTI_FALSE; + } + rcutils_uint8_array_t data_buffer; data_buffer.allocator = rcutils_get_default_allocator(); data_buffer.buffer = @@ -517,22 +516,22 @@ RMW_Connext_TypePlugin_deserialize( return RTI_FALSE; } - rcutils_uint8_array_t * const data_buffer = - reinterpret_cast(*sample); + RMW_Connext_Message * const msg = + reinterpret_cast(*sample); + const size_t deserialize_size = RTICdrStream_getRemainder(stream); void * const src_ptr = RTICdrStream_getCurrentPosition(stream); - if (data_buffer->buffer_capacity < deserialize_size) { + if (msg->data_buffer.buffer_capacity < deserialize_size) { if (RCUTILS_RET_OK != - rcutils_uint8_array_resize(data_buffer, deserialize_size)) + rcutils_uint8_array_resize(&msg->data_buffer, deserialize_size)) { return RTI_FALSE; } } - memcpy(data_buffer->buffer, src_ptr, deserialize_size); - - data_buffer->buffer_length = deserialize_size; + memcpy(msg->data_buffer.buffer, src_ptr, deserialize_size); + msg->data_buffer.buffer_length = deserialize_size; RTICdrStream_setCurrentPosition(stream, reinterpret_cast(src_ptr) + deserialize_size); @@ -606,6 +605,12 @@ RMW_Connext_TypePlugin_get_serialized_sample_size( const RMW_Connext_Message * const msg = reinterpret_cast(sample); + // Samples written by the application layer should always have a non-null + // user_data pointer. The only samples which do not use that pointers are + // the ones created internally in the DataReader queue, and it would be + // unexpected for them to be passed to this function. + RMW_CONNEXT_ASSERT(nullptr != msg->user_data) + if (msg->serialized) { const rcutils_uint8_array_t * const serialized_msg = reinterpret_cast(msg->user_data); @@ -796,7 +801,7 @@ rmw_connextdds_register_type_support( struct REDAFastBufferPool * const pool_samples = REDAFastBufferPool_new( - sizeof(rcutils_uint8_array_t), + sizeof(RMW_Connext_Message), RTIOsapiAlignment_getDefaultAlignment(), &pool_prop); diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 729acd42..bdc6d4ac 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -649,6 +649,8 @@ rmw_connextdds_finalize_participant_factory_context( RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_fv_FactoryContext) rmw_connextdds_api_micro * const ctx_api = RMW_Connext_fv_FactoryContext; + RMW_Connext_fv_FactoryContext = nullptr; + RT_Registry_T * registry = DDS_DomainParticipantFactory_get_registry( RMW_Connext_gv_DomainParticipantFactory); @@ -860,6 +862,17 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_configure_participant( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const participant) +{ + UNUSED_ARG(participant); + UNUSED_ARG(ctx); + + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -867,6 +880,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -874,6 +888,7 @@ rmw_connextdds_create_contentfilteredtopic( UNUSED_ARG(base_topic); UNUSED_ARG(cft_name); UNUSED_ARG(cft_filter); + UNUSED_ARG(cft_expression_parameters); UNUSED_ARG(cft_out); return RMW_RET_UNSUPPORTED; } @@ -1952,3 +1967,27 @@ rmw_connextdds_enable_security( return RMW_RET_ERROR; #endif /* RMW_CONNEXT_ENABLE_SECURITY */ } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const cft_expression, + const rcutils_string_array_t * const cft_expression_parameters) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(cft_expression); + UNUSED_ARG(cft_expression_parameters); + return RMW_RET_UNSUPPORTED; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * const options) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(allocator); + UNUSED_ARG(options); + return RMW_RET_UNSUPPORTED; +} diff --git a/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp b/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp index a0af9461..57086bb6 100644 --- a/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp @@ -62,29 +62,23 @@ RMW_Connext_MemoryPlugin_create_sample( { auto type_support = RMW_Connext_RtimeTypePluginI::type_support(tp); - rcutils_uint8_array_t * data_buffer = - new (std::nothrow) rcutils_uint8_array_t(); - if (nullptr == data_buffer) { - return RTI_FALSE; - } - - const rcutils_allocator_t allocator = rcutils_get_default_allocator(); size_t buffer_size = 0; - if (type_support->unbounded()) { buffer_size = 0; } else { buffer_size = type_support->type_serialized_size_max(); } - if (RCUTILS_RET_OK != - rcutils_uint8_array_init(data_buffer, buffer_size, &allocator)) - { - delete data_buffer; + RMW_Connext_Message * const msg = new (std::nothrow) RMW_Connext_Message(); + if (nullptr == msg) { + return RTI_FALSE; + } + if (RMW_RET_OK != RMW_Connext_Message_initialize(msg, type_support, buffer_size)) { + delete msg; return RTI_FALSE; } - *sample = data_buffer; + *sample = msg; return RTI_TRUE; } @@ -97,15 +91,10 @@ RMW_Connext_MemoryPlugin_delete_sample( { UNUSED_ARG(plugin); - rcutils_uint8_array_t * data_buffer = - reinterpret_cast(sample); - - if (RCUTILS_RET_OK != rcutils_uint8_array_fini(data_buffer)) { - delete data_buffer; - return RTI_FALSE; - } - - delete data_buffer; + RMW_Connext_Message * const msg = + reinterpret_cast(sample); + RMW_Connext_Message_finalize(msg); + delete msg; return RTI_TRUE; } @@ -117,14 +106,16 @@ RMW_Connext_MemoryPlugin_copy_sample( void * dst, const void * src) { - const rcutils_uint8_array_t * src_buffer = - reinterpret_cast(src); - rcutils_uint8_array_t * dst_buffer = - reinterpret_cast(dst); + const RMW_Connext_Message * src_msg = + reinterpret_cast(src); + RMW_Connext_Message * dst_msg = + reinterpret_cast(dst); UNUSED_ARG(plugin); - if (RCUTILS_RET_OK != rcutils_uint8_array_copy(dst_buffer, src_buffer)) { + if (RCUTILS_RET_OK != + rcutils_uint8_array_copy(&dst_msg->data_buffer, &src_msg->data_buffer)) + { return RTI_FALSE; } @@ -145,6 +136,14 @@ RMW_Connext_EncapsulationPlugin_serialize( UNUSED_ARG(plugin); UNUSED_ARG(destination); + if (nullptr == msg->user_data) { + // Samples written by the application layer should always have a non-null + // user_data pointer. The only samples which do not use that pointers are + // the ones created internally in the DataReader queue, and it would be + // unexpected for them to be passed to this function. + return RTI_FALSE; + } + DDS_TypePluginBuffer * const tbuf = reinterpret_cast(stream->real_buff); @@ -182,8 +181,7 @@ RMW_Connext_EncapsulationPlugin_serialize( if (type_support->unbounded()) { if (msg_buffer_unbound->buffer_capacity < serialized_size) { if (RCUTILS_RET_OK != - rcutils_uint8_array_resize( - msg_buffer_unbound, serialized_size)) + rcutils_uint8_array_resize(msg_buffer_unbound, serialized_size)) { return RTI_FALSE; } @@ -243,15 +241,15 @@ RMW_Connext_EncapsulationPlugin_deserialize( UNUSED_ARG(source); UNUSED_ARG(plugin); - rcutils_uint8_array_t * const data_buffer = - reinterpret_cast(void_sample); + RMW_Connext_Message * const msg = + reinterpret_cast(void_sample); const size_t deserialize_size = stream->length - CDR_Stream_get_current_position_offset(stream) + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; - if (data_buffer->buffer_capacity < deserialize_size) { + if (msg->data_buffer.buffer_capacity < deserialize_size) { if (RCUTILS_RET_OK != - rcutils_uint8_array_resize(data_buffer, deserialize_size)) + rcutils_uint8_array_resize(&msg->data_buffer, deserialize_size)) { return RTI_FALSE; } @@ -262,11 +260,11 @@ RMW_Connext_EncapsulationPlugin_deserialize( RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; memcpy( - data_buffer->buffer, + msg->data_buffer.buffer, src_ptr, deserialize_size); - data_buffer->buffer_length = deserialize_size; + msg->data_buffer.buffer_length = deserialize_size; return RTI_TRUE; } diff --git a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp index 88b60473..7d1f6cc8 100644 --- a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp +++ b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp @@ -716,16 +716,15 @@ rmw_subscription_get_actual_qos( return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos); } - rmw_ret_t rmw_subscription_set_content_filter( rmw_subscription_t * subscription, const rmw_subscription_content_filter_options_t * options) { - UNUSED_ARG(subscription); - UNUSED_ARG(options); - RMW_CONNEXT_LOG_NOT_IMPLEMENTED - return RMW_RET_UNSUPPORTED; + // Content filters are not supported by Micro and this call will + // eventually fail with RMW_RET_UNSUPPORTED. + return rmw_api_connextdds_subscription_set_content_filter( + subscription, options); } @@ -735,11 +734,10 @@ rmw_subscription_get_content_filter( rcutils_allocator_t * allocator, rmw_subscription_content_filter_options_t * options) { - UNUSED_ARG(subscription); - UNUSED_ARG(allocator); - UNUSED_ARG(options); - RMW_CONNEXT_LOG_NOT_IMPLEMENTED - return RMW_RET_UNSUPPORTED; + // Content filters are not supported by Micro and this call will + // eventually fail with RMW_RET_UNSUPPORTED. + return rmw_api_connextdds_subscription_get_content_filter( + subscription, allocator, options); }