diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index fb5d89fb..04e49dc2 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -20,4 +20,14 @@ install( DESTINATION "${CMAKE_INSTALL_EXAMPLESDIR}/helloworld" COMPONENT dev) +install( + FILES helloworld_shm/publisher_shm.cpp + helloworld_shm/subscriber_shm.cpp + helloworld_shm/HelloWorldDataShm.idl + helloworld_shm/CMakeLists.txt + helloworld_shm/readme.rst + DESTINATION "${CMAKE_INSTALL_EXAMPLESDIR}/helloworld_shm" + COMPONENT dev) + add_subdirectory(helloworld) +add_subdirectory(helloworld_shm) diff --git a/examples/helloworld_shm/CMakeLists.txt b/examples/helloworld_shm/CMakeLists.txt new file mode 100644 index 00000000..26a96076 --- /dev/null +++ b/examples/helloworld_shm/CMakeLists.txt @@ -0,0 +1,38 @@ +# +# Copyright(c) 2020 ADLINK Technology Limited and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +project(helloworld_shm LANGUAGES C CXX) +cmake_minimum_required(VERSION 3.5) + +if(NOT TARGET CycloneDDS-CXX::ddscxx) + find_package(CycloneDDS-CXX REQUIRED) +endif() + +idlcxx_generate(TARGET helloworlddatashm FILES HelloWorldDataShm.idl) + +add_executable(ddscxxHelloworldPublisherShm publisher_shm.cpp) +add_executable(ddscxxHelloworldSubscriberShm subscriber_shm.cpp) + +# Link both executables to idl data type library and ddscxx. +target_link_libraries(ddscxxHelloworldPublisherShm CycloneDDS-CXX::ddscxx helloworlddatashm) +target_link_libraries(ddscxxHelloworldSubscriberShm CycloneDDS-CXX::ddscxx helloworlddatashm) + +# Disable the static analyzer in GCC to avoid crashing the GNU C++ compiler +# on Azure Pipelines +if(DEFINED ENV{SYSTEM_TEAMFOUNDATIONSERVERURI}) + if(CMAKE_C_COMPILER_ID STREQUAL "GNU" AND ANALYZER STREQUAL "on") + target_compile_options(ddscxxHelloworldPublisherShm PRIVATE -fno-analyzer) + target_compile_options(ddscxxHelloworldSubscriberShm PRIVATE -fno-analyzer) + endif() +endif() + +set_property(TARGET ddscxxHelloworldPublisherShm PROPERTY CXX_STANDARD 17) +set_property(TARGET ddscxxHelloworldSubscriberShm PROPERTY CXX_STANDARD 17) diff --git a/examples/helloworld_shm/HelloWorldDataShm.idl b/examples/helloworld_shm/HelloWorldDataShm.idl new file mode 100644 index 00000000..2681d397 --- /dev/null +++ b/examples/helloworld_shm/HelloWorldDataShm.idl @@ -0,0 +1,19 @@ +/* + * Copyright(c) 2006 to 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +module HelloWorldDataShm +{ + struct Msg + { + long data; + }; + #pragma keylist Msg data +}; diff --git a/examples/helloworld_shm/publisher_shm.cpp b/examples/helloworld_shm/publisher_shm.cpp new file mode 100644 index 00000000..7844ca15 --- /dev/null +++ b/examples/helloworld_shm/publisher_shm.cpp @@ -0,0 +1,99 @@ +/* + * Copyright(c) 2006 to 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include +#include + +/* Include the C++ DDS API. */ +#include "dds/dds.hpp" + +/* Include data type and specific traits to be used with the C++ DDS API. */ +#include "HelloWorldDataShm.hpp" + +using namespace org::eclipse::cyclonedds; + +int main() { + try { + std::cout << "=== [Publisher] Create writer." << std::endl; + + /* First, a domain participant is needed. + * Create one on the default domain. */ + dds::domain::DomainParticipant participant(domain::default_id()); + + /* To publish something, a topic is needed. */ + dds::topic::Topic topic(participant, "HelloWorldData_Shm"); + + /* A writer also needs a publisher. */ + dds::pub::Publisher publisher(participant); + + /* Set DataWriter QoS + * SHM currently only supports reliable, volatile and keep last QoS on the writer side + */ + dds::pub::qos::DataWriterQos qos; + qos << dds::core::policy::Reliability::Reliable(); + qos << dds::core::policy::Durability::Volatile(); + qos << dds::core::policy::History::KeepLast(10U); + + /* Now, the writer can be created to publish a HelloWorld message. */ + dds::pub::DataWriter writer(publisher, topic, qos); + + /* For this example, we'd like to have a subscriber to actually read + * our message. This is not always necessary. Also, the way it is + * done here is just to illustrate the easiest way to do so. It isn't + * really recommended to do a wait in a polling loop, however. + * Please take a look at Listeners and WaitSets for much better + * solutions, albeit somewhat more elaborate ones. */ + std::cout << "=== [Publisher] Waiting for subscriber." << std::endl; + while (writer.publication_matched_status().current_count() == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + + /* Loan the memory for the message */ + HelloWorldDataShm::Msg &loaned_msg = writer->loan_sample(); + + /* Fill the message */ + loaned_msg.data(4321); + + /* Realize you do not intend to write this message and cancel the loan. */ + writer->return_loan(loaned_msg); + + /* Loan memory for a new message */ + loaned_msg = writer->loan_sample(); + + /* Fill the message */ + loaned_msg.data(1234); + + /* Write the message. */ + std::cout << "=== [Publisher] Write sample." << std::endl; + writer.write(loaned_msg); + + /* With a normal configuration (see dds::pub::qos::DataWriterQos + * for various different writer configurations), deleting a writer will + * dispose all its related message. + * Wait for the subscriber to have stopped to be sure it received the + * message. Again, not normally necessary and not recommended to do + * this in a polling loop. */ + std::cout << "=== [Publisher] Waiting for sample to be accepted." << std::endl; + while (writer.publication_matched_status().current_count() > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + catch (const dds::core::Exception& e) { + std::cerr << "=== [Publisher] Exception: " << e.what() << std::endl; + return EXIT_FAILURE; + } + + std::cout << "=== [Publisher] Done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/examples/helloworld_shm/readme.rst b/examples/helloworld_shm/readme.rst new file mode 100644 index 00000000..36eafdc5 --- /dev/null +++ b/examples/helloworld_shm/readme.rst @@ -0,0 +1,105 @@ +.. + Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + + This program and the accompanying materials are made available under the + terms of the Eclipse Public License v. 2.0 which is available at + http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + v. 1.0 which is available at + http://www.eclipse.org/org/documents/edl-v10.php. + + SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +HelloWorld SHM +========== + +Description +*********** + +The basic ddscxx HelloWorldShm example is used to illustrate the necessary steps to setup DCPS entities for using shared memory + +Design +****** + +It consists of 2 units: + +- ddscxxHelloworldPublisherShm: implements the publisher's main +- ddscxxHelloworldSubscriberShm: implements the subscriber's main + +Scenario +******** + +The publisher sends a single HelloWorld sample. The sample contains the following field: + +- a userID field (long type) + +When it receives the sample, the subscriber displays the userID field. + +Running the example +******************* + +Configuration +-------------- + +Cyclone DDS needs to be configured to use the shared memory exchange + +Below is an example of Cyclone DDS configuration file to enable shared memory exchange: + +.. code-block:: xml + + + + + + true + 256 + 16 + 16 + info + + + + +The above example configuration can be saved as *cyclonedds.xml* and can be passed to Cyclone DDS through the environment variable CYCLONEDDS_URI as below + +.. code-block:: bash + + export CYCLONEDDS_URI=file://cyclonedds.xml + +Run +-------------- + +It is recommended that you run the executables in separate terminals to avoid mixing the output. + +- In the first terminal start the RouDi by running iox-roudi + +.. code-block:: bash + + ~/iceoryx/build/iox-roudi + +- In the second terminal start the subscriber by running ddscxxHelloWorldSubscriberShm + +.. code-block:: bash + + export CYCLONEDDS_URI=file://cyclonedds.xml + ~/cyclonedds-cxx/build/bin/ddscxxHelloWorldSubscriberShm + +- In the third terminal start the publisher by running ddscxxHelloWorldPublisherShm + +.. code-block:: bash + + export CYCLONEDDS_URI=file://cyclonedds.xml + ~/cyclonedds-cxx/build/bin/ddscxxHelloWorldPublisherShm + +After establishing a successful communication, the output looks something like below: + +.. code-block:: bash + + === [Subscriber] Wait for message. + === [Subscriber] Message received: + data : 1234 + === [Subscriber] Done. + + + diff --git a/examples/helloworld_shm/subscriber_shm.cpp b/examples/helloworld_shm/subscriber_shm.cpp new file mode 100644 index 00000000..ef23a1f5 --- /dev/null +++ b/examples/helloworld_shm/subscriber_shm.cpp @@ -0,0 +1,103 @@ +/* + * Copyright(c) 2006 to 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include +#include + +/* Include the C++ DDS API. */ +#include "dds/dds.hpp" + +/* Include data type and specific traits to be used with the C++ DDS API. */ +#include "HelloWorldDataShm.hpp" + +using namespace org::eclipse::cyclonedds; + +int main() { + try { + std::cout << "=== [Subscriber] Create reader." << std::endl; + + /* First, a domain participant is needed. + * Create one on the default domain. */ + dds::domain::DomainParticipant participant(domain::default_id()); + + /* To subscribe to something, a topic is needed. */ + dds::topic::Topic topic(participant, "HelloWorldData_Shm"); + + /* A reader also needs a subscriber. */ + dds::sub::Subscriber subscriber(participant); + + /* Set DataReader QoS + * SHM currently only supports reliable/best-effort, volatile and keep last QoS on the reader side + */ + dds::sub::qos::DataReaderQos qos; + qos << dds::core::policy::Reliability::BestEffort(); + qos << dds::core::policy::Durability::Volatile(); + qos << dds::core::policy::History::KeepLast(10U); + + /* Now, the reader can be created to subscribe to a HelloWorld message. */ + dds::sub::DataReader reader(subscriber, topic, qos); + + /* Poll until a message has been read. + * It isn't really recommended to do this kind wait in a polling loop. + * It's done here just to illustrate the easiest way to get data. + * Please take a look at Listeners and WaitSets for much better + * solutions, albeit somewhat more elaborate ones. */ + std::cout << "=== [Subscriber] Wait for message." << std::endl; + bool poll = true; + while (poll) { + /* For this example, the reader will return a set of messages (aka + * Samples). There are other ways of getting samples from reader. + * See the various read() and take() functions that are present. */ + dds::sub::LoanedSamples samples; + + /* Try taking samples from the reader. */ + samples = reader.take(); + + /* Are samples read? */ + if (samples.length() > 0) { + /* Use an iterator to run over the set of samples. */ + dds::sub::LoanedSamples::const_iterator sample_iter; + for (sample_iter = samples.begin(); + sample_iter < samples.end(); + ++sample_iter) { + /* Get the message and sample information. */ + const HelloWorldDataShm::Msg& msg = sample_iter->data(); + const dds::sub::SampleInfo& info = sample_iter->info(); + + /* Sometimes a sample is read, only to indicate a data + * state change (which can be found in the info). If + * that's the case, only the key value of the sample + * is set. The other data parts are not. + * Check if this sample has valid data. */ + if (info.valid()) { + std::cout << "=== [Subscriber] Message received:" << std::endl; + std::cout << " data : " << msg.data() << std::endl; + + /* Only 1 message is expected in this example. */ + poll = false; + } + } + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + } + } + catch (const dds::core::Exception& e) { + std::cerr << "=== [Subscriber] Exception: " << e.what() << std::endl; + return EXIT_FAILURE; + } + + std::cout << "=== [Subscriber] Done." << std::endl; + + return EXIT_SUCCESS; +} diff --git a/src/ddscxx/include/dds/pub/detail/DataWriter.hpp b/src/ddscxx/include/dds/pub/detail/DataWriter.hpp index 2c2f1ed3..35d9a58e 100644 --- a/src/ddscxx/include/dds/pub/detail/DataWriter.hpp +++ b/src/ddscxx/include/dds/pub/detail/DataWriter.hpp @@ -69,6 +69,10 @@ class dds::pub::detail::DataWriter : public ::org::eclipse::cyclonedds::pub::Any void init(ObjectDelegate::weak_ref_type weak_ref); + T& loan_sample(); + + void return_loan(T& sample); + void write_cdr(const org::eclipse::cyclonedds::topic::CDRBlob& sample); void write_cdr(const org::eclipse::cyclonedds::topic::CDRBlob& sample, const dds::core::Time& timestamp); diff --git a/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp b/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp index 6f1177fa..0f72f3e2 100644 --- a/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp +++ b/src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp @@ -460,6 +460,24 @@ dds::pub::detail::DataWriter::init(ObjectDelegate::weak_ref_type weak_ref) } } +template +T& +dds::pub::detail::DataWriter::loan_sample() +{ + T *sample; + this->check(); + AnyDataWriterDelegate::loan_sample(static_cast(this->ddsc_entity), reinterpret_cast(&sample)); + return *sample; +} + +template +void +dds::pub::detail::DataWriter::return_loan(T& sample) +{ + this->check(); + AnyDataWriterDelegate::return_loan(static_cast(this->ddsc_entity), &sample); +} + template void dds::pub::detail::DataWriter::write_cdr(const org::eclipse::cyclonedds::topic::CDRBlob& sample) diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.hpp index fad339b4..ec3d3476 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.hpp @@ -131,6 +131,14 @@ class OMG_DDS_API AnyDataWriterDelegate : public org::eclipse::cyclonedds::core: const dds::core::InstanceHandle& handle, const dds::core::Time& timestamp); + void + loan_sample(dds_entity_t writer, + void **sample); + + void + return_loan(dds_entity_t writer, + void *sample); + void write(dds_entity_t writer, const void *data, diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/topic/datatopic.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/topic/datatopic.hpp index 3b5d1877..6c02bb7d 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/topic/datatopic.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/topic/datatopic.hpp @@ -21,6 +21,7 @@ #include "dds/ddsrt/endian.h" #include "dds/ddsrt/md5.h" #include "dds/ddsi/q_radmin.h" +#include "dds/ddsi/q_xmsg.h" #include "dds/ddsi/ddsi_serdata.h" #include "org/eclipse/cyclonedds/core/cdr/basic_cdr_ser.hpp" #include "dds/ddsi/ddsi_keyhash.h" @@ -116,35 +117,39 @@ class ddscxx_serdata : public ddsi_serdata { T* getT() { - T *t = m_t.load(std::memory_order_acquire); - if (t == nullptr) { - t = new T(); - org::eclipse::cyclonedds::core::cdr::basic_cdr_stream str; - str.set_buffer(calc_offset(data(),4)); - switch (kind) - { - case SDK_KEY: - key_read(str,*t); - break; - case SDK_DATA: - read(str,*t); - break; - case SDK_EMPTY: - assert(0); - } - - if (str.abort_status()) { - delete t; - t = nullptr; - } - - T* exp = nullptr; - if (!m_t.compare_exchange_strong(exp, t, std::memory_order_seq_cst)) { - delete t; - t = exp; + // if iox chunk is available, dont deserialize the sample, return the chunk directly + if (iox_chunk != nullptr && data() == nullptr) { + return static_cast(SHIFT_PAST_ICEORYX_HEADER(this->iox_chunk)); + } else { + T *t = m_t.load(std::memory_order_acquire); + if (t == nullptr) { + t = new T(); + org::eclipse::cyclonedds::core::cdr::basic_cdr_stream str; + str.set_buffer(calc_offset(data(), 4)); + switch (kind) { + case SDK_KEY: + key_read(str, *t); + break; + case SDK_DATA: + read(str, *t); + break; + case SDK_EMPTY: + assert(0); + } + + if (str.abort_status()) { + delete t; + t = nullptr; + } + + T *exp = nullptr; + if (!m_t.compare_exchange_strong(exp, t, std::memory_order_seq_cst)) { + delete t; + t = exp; + } } + return t; } - return t; } }; @@ -500,6 +505,33 @@ void serdata_get_keyhash( } } +#ifdef DDS_HAS_SHM +template +uint32_t serdata_from_iox_size(const struct ddsi_serdata* d) +{ + assert(sizeof(T) == d->type->iox_size); + return d->type->iox_size; +} + +template +ddsi_serdata * serdata_from_iox( + const struct ddsi_sertype * typecmn, enum ddsi_serdata_kind kind, + void * sub, void * iox_buffer) +{ + try { + auto d = new ddscxx_serdata(typecmn, kind); + d->iox_chunk = iox_buffer; + d->iox_subscriber = sub; + // TODO(Sumanth), how to handle key hash? + + return d; + } + catch (std::exception&) { + return nullptr; + } +} +#endif + template const ddsi_serdata_ops ddscxx_serdata::ddscxx_serdata_ops = { &serdata_eqkey, @@ -517,6 +549,10 @@ const ddsi_serdata_ops ddscxx_serdata::ddscxx_serdata_ops = { &serdata_free, &serdata_print, &serdata_get_keyhash, +#ifdef DDS_HAS_SHM + &serdata_from_iox_size, + &serdata_from_iox, +#endif }; template @@ -531,12 +567,27 @@ template ddscxx_sertype::ddscxx_sertype() : ddsi_sertype{} { +#ifdef DDS_HAS_SHM + uint32_t flags = (org::eclipse::cyclonedds::topic::TopicTraits::isKeyless() ? + DDSI_SERTYPE_FLAG_TOPICKIND_NO_KEY : 0); + // TODO(Sumanth), fix this. Currently assuming the type is fixed always + flags |= DDSI_SERTYPE_FLAG_FIXED_SIZE; + ddsi_sertype_init_flags( + static_cast(this), + org::eclipse::cyclonedds::topic::TopicTraits::getTypeName(), + &ddscxx_sertype::ddscxx_sertype_ops, + &ddscxx_serdata::ddscxx_serdata_ops, + flags); + this->iox_size = + static_cast(org::eclipse::cyclonedds::topic::TopicTraits::getSampleSize()); +#else ddsi_sertype_init( static_cast(this), org::eclipse::cyclonedds::topic::TopicTraits::getTypeName(), &ddscxx_sertype::ddscxx_sertype_ops, &ddscxx_serdata::ddscxx_serdata_ops, org::eclipse::cyclonedds::topic::TopicTraits::isKeyless()); +#endif } template diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp index 7dbf3563..a2e00808 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp @@ -156,6 +156,28 @@ AnyDataWriterDelegate::unregister_instance_cdr( this->write_cdr(writer, data, handle, timestamp, NN_STATUSINFO_UNREGISTER); } +void +AnyDataWriterDelegate::loan_sample( + dds_entity_t writer, + void **sample) +{ + dds_return_t ret; + + ret = dds_loan_sample(writer, sample); + ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ret, "sample loan failed."); +} + +void +AnyDataWriterDelegate::return_loan( + dds_entity_t writer, + void *sample) +{ + dds_return_t ret; + + ret = dds_return_loan(writer, &sample, 1); + ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ret, "return of sample loan failed."); +} + void AnyDataWriterDelegate::write( dds_entity_t writer, diff --git a/src/ddscxx/tests/CMakeLists.txt b/src/ddscxx/tests/CMakeLists.txt index a36d8ba9..800295ce 100644 --- a/src/ddscxx/tests/CMakeLists.txt +++ b/src/ddscxx/tests/CMakeLists.txt @@ -16,6 +16,14 @@ idlcxx_generate(TARGET ddscxx_test_types FILES data/Space.idl data/HelloWorldDat configure_file( config_simple.xml.in config_simple.xml @ONLY) +if (DDS_HAS_SHM AND NOT WIN32) + find_package(iceoryx_posh QUIET) + find_package(iceoryx_posh_roudi_environment QUIET) + if(iceoryx_posh_roudi_environment_FOUND) + set(SHM_TESTS ${iceoryx_posh_roudi_environment_FOUND}) + endif() +endif() + set(sources Bounded.cpp EntityStatus.cpp @@ -39,9 +47,14 @@ set(sources Query.cpp WaitSet.cpp Qos.cpp - Condition.cpp) + Condition.cpp + Util.cpp) -add_executable(ddscxx_tests ${sources} Util.cpp) +if (SHM_TESTS) + list(APPEND sources SharedMemory.cpp) +endif() + +add_executable(ddscxx_tests ${sources}) # Disable the static analyzer in GCC to avoid crashing the GNU C++ compiler # on Azure Pipelines @@ -53,6 +66,11 @@ endif() set_property(TARGET ddscxx_tests PROPERTY CXX_STANDARD 17) target_link_libraries(ddscxx_tests CycloneDDS-CXX::ddscxx ddscxx_test_types GTest::GTest GTest::Main) +if (SHM_TESTS) + target_link_libraries(ddscxx_tests + iceoryx_posh::iceoryx_posh_roudi + iceoryx_posh_testing::iceoryx_posh_testing) +endif() gtest_add_tests(TARGET ddscxx_tests SOURCES ${sources} TEST_LIST tests) # Ensure shared libraries are found diff --git a/src/ddscxx/tests/SharedMemory.cpp b/src/ddscxx/tests/SharedMemory.cpp new file mode 100644 index 00000000..6c538ee0 --- /dev/null +++ b/src/ddscxx/tests/SharedMemory.cpp @@ -0,0 +1,356 @@ +/* + * Copyright(c) 2006 to 2021 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include + +#include "dds/dds.hpp" +#include "dds/ddsrt/environ.h" +#include "iceoryx_posh/testing/roudi_gtest.hpp" +#include "HelloWorldData.hpp" +#include "Space.hpp" + +namespace +{ +template +void make_sample_(T & sample, const int32_t cnt) +{ + throw std::runtime_error("make_sample_ called on unsupported type"); +} + +template<> +void make_sample_(Space::Type1 & sample, const int32_t cnt) +{ + sample.long_1(cnt); + sample.long_2(cnt + 1); + sample.long_3(cnt + 2); +} + +template<> +void make_sample_(HelloWorldData::Msg & sample, const int32_t cnt) +{ + sample.userID(cnt); + sample.message(std::to_string(cnt)); +} + +template +std::vector write_loaned_data_(dds::pub::DataWriter & writer, const int32_t cnt) +{ + std::vector samples; + for (int32_t i = 0; i < cnt; i++) { + // loan memory from the middleware + auto & loaned_sample = writer.delegate()->loan_sample(); + // make sample + make_sample_(loaned_sample, i); + // store sample for comparison later + samples.push_back(loaned_sample); + // write sample (which will also release the loan to the middleware) + writer.write(loaned_sample); + } + return samples; +} + +template<> +std::vector write_loaned_data_( + dds::pub::DataWriter & writer, const int32_t cnt) +{ + // loaning on a non-fixed type throws an error + try { + auto & loaned_sample = writer.delegate()->loan_sample(); + (void)loaned_sample; + EXPECT_FALSE(true); + } catch (const std::exception & e) { + EXPECT_STREQ(e.what(), "sample loan failed"); + } + + std::vector samples; + for (int32_t i = 0; i < cnt; i++) { + HelloWorldData::Msg sample; + // make sample + make_sample_(sample, i); + // store sample for comparison later + samples.push_back(sample); + // write sample (which will also release the loan to the middleware) + writer.write(sample); + } + return samples; +} +} +/** + * Fixture for the shared memory tests with RouDi + */ +template +class SharedMemoryTest : public RouDi_GTest +{ +public: + using TopicType = T; + + dds::domain::DomainParticipant participant; + dds::sub::Subscriber subscriber; + dds::pub::Publisher publisher; + dds::topic::Topic topic; + dds::sub::DataReader reader; + dds::pub::DataWriter writer; + dds::sub::cond::ReadCondition rc; + dds::core::cond::WaitSet waitset; + + SharedMemoryTest() + : participant(dds::core::null), + subscriber(dds::core::null), + publisher(dds::core::null), + topic(dds::core::null), + reader(dds::core::null), + writer(dds::core::null), + rc(dds::core::null) + { + } + + void SetUp() + { + } + + void CreateParticipant() + { + if (this->participant == dds::core::null) { + // configure cyclone to enable shared memory communication + ddsrt_setenv( + "CYCLONEDDS_URI", + "true2561616verbose"); + this->participant = dds::domain::DomainParticipant( + org::eclipse::cyclonedds::domain::default_id()); + ASSERT_NE(this->participant, dds::core::null); + } + } + + void CreateTopic() + { + if (this->topic == dds::core::null) { + this->CreateParticipant(); + this->topic = dds::topic::Topic(this->participant, "datareader_test_topic"); + ASSERT_NE(this->topic, dds::core::null); + } + } + + void CreateWriter(dds::pub::qos::DataWriterQos w_qos) + { + this->SetupWriter(); + if (this->writer == dds::core::null) { + this->writer = dds::pub::DataWriter(this->publisher, this->topic, w_qos); + ASSERT_NE(this->writer, dds::core::null); + } + } + + void CreateReader(dds::sub::qos::DataReaderQos r_qos) + { + this->SetupReader(); + if (this->reader == dds::core::null) { + this->reader = dds::sub::DataReader(this->subscriber, this->topic, r_qos); + ASSERT_NE(this->reader, dds::core::null); + } + } + + void SetupWriter() + { + this->CreateTopic(); + if (this->publisher == dds::core::null) { + this->publisher = dds::pub::Publisher(this->participant); + } + } + + void SetupReader() + { + this->CreateTopic(); + if (this->subscriber == dds::core::null) { + this->subscriber = dds::sub::Subscriber(this->participant); + } + } + + void SetupCommunication( + dds::sub::qos::DataReaderQos r_qos = dds::sub::qos::DataReaderQos{}, + dds::pub::qos::DataWriterQos w_qos = dds::pub::qos::DataWriterQos{}) + { + this->CreateWriter(w_qos); + this->CreateReader(r_qos); + rc = dds::sub::cond::ReadCondition(reader, dds::sub::status::DataState::any()); + waitset.attach_condition(rc); + } + + void WaitForData() + { + EXPECT_NO_THROW(waitset.wait(dds::core::Duration(10, 0))); + } + + std::vector WriteData(int32_t instances_cnt) + { + std::vector samples; + for (int32_t i = 0; i < instances_cnt; i++) { + // make sample + T sample; + make_sample_(sample, i); + // write sample + this->writer.write(sample); + // store sample + samples.push_back(sample); + } + return samples; + } + + std::vector WriteLoanedData(int32_t instances_cnt) + { + return write_loaned_data_(this->writer, instances_cnt); + } + + void + CheckData( + const dds::sub::LoanedSamples & samples, + const std::vector & test_data, + const dds::sub::status::DataState & test_state = + dds::sub::status::DataState(dds::sub::status::SampleState::not_read(), + dds::sub::status::ViewState::new_view(), + dds::sub::status::InstanceState::alive())) + { + unsigned long count = 0UL; + ASSERT_EQ(samples.length(), test_data.size()); + typename dds::sub::LoanedSamples::const_iterator it; + for (it = samples.begin(); it != samples.end(); ++it, ++count) { + const T & data = it->data(); + const dds::sub::SampleInfo & info = it->info(); + const dds::sub::status::DataState & state = info.state(); + ASSERT_EQ(data, test_data[count]); + ASSERT_EQ(state.view_state(), test_state.view_state()); + ASSERT_EQ(state.sample_state(), test_state.sample_state()); + ASSERT_EQ(state.instance_state(), test_state.instance_state()); + } + } + + void run_test( + const dds::sub::qos::DataReaderQos & r_qos, + const dds::pub::qos::DataWriterQos & w_qos, + const int32_t num_samples, + const bool use_loaning = false) + { + dds::sub::LoanedSamples samples; + std::vector test_samples; + + /* setup communication */ + this->SetupCommunication(r_qos, w_qos); + /* write data. */ + if (use_loaning) { + test_samples = this->WriteLoanedData(num_samples); + } else { + test_samples = this->WriteData(num_samples); + } + /* wait for data */ + this->WaitForData(); + /* Check result by taking. */ + samples = this->reader.take(); + this->CheckData(samples, test_samples); + } + + void TearDown() + { + this->writer = dds::core::null; + this->reader = dds::core::null; + this->topic = dds::core::null; + this->publisher = dds::core::null; + this->subscriber = dds::core::null; + this->participant = dds::core::null; + } +}; + +/** + * Tests + */ + +using TestTypes = testing::Types; +TYPED_TEST_CASE(SharedMemoryTest, TestTypes); + +TYPED_TEST(SharedMemoryTest, writer_reader_valid_shm_qos) +{ + dds::sub::qos::DataReaderQos r_qos; + r_qos << dds::core::policy::Reliability::BestEffort(); + r_qos << dds::core::policy::Durability::Volatile(); + r_qos << dds::core::policy::History::KeepLast(10U); + + dds::pub::qos::DataWriterQos w_qos; + w_qos << dds::core::policy::Reliability::Reliable(); + w_qos << dds::core::policy::Durability::Volatile(); + w_qos << dds::core::policy::History::KeepLast(10U); + + this->run_test(r_qos, w_qos, 10); +} + +TYPED_TEST(SharedMemoryTest, writer_reader_default_qos) +{ + dds::sub::qos::DataReaderQos r_qos{}; + dds::pub::qos::DataWriterQos w_qos{}; + + this->run_test(r_qos, w_qos, 1); +} + +TYPED_TEST(SharedMemoryTest, writer_valid_shm_qos) +{ + dds::sub::qos::DataReaderQos r_qos; + r_qos << dds::core::policy::Reliability::BestEffort(); + r_qos << dds::core::policy::Durability::Volatile(); + r_qos << dds::core::policy::History::KeepLast(10U); + + dds::pub::qos::DataWriterQos w_qos; + w_qos << dds::core::policy::Reliability::Reliable(); + w_qos << dds::core::policy::Durability::Volatile(); + w_qos << dds::core::policy::History::KeepLast(10U); + + this->run_test(r_qos, w_qos, 10); +} + +TYPED_TEST(SharedMemoryTest, reader_valid_shm_qos) +{ + dds::sub::qos::DataReaderQos r_qos; + r_qos << dds::core::policy::Reliability::BestEffort(); + r_qos << dds::core::policy::Durability::Volatile(); + r_qos << dds::core::policy::History::KeepLast(10U); + + dds::pub::qos::DataWriterQos w_qos; + w_qos << dds::core::policy::Reliability::Reliable(); + w_qos << dds::core::policy::Durability::TransientLocal(); + w_qos << dds::core::policy::History::KeepLast(10U); + + this->run_test(r_qos, w_qos, 10); +} + +TYPED_TEST(SharedMemoryTest, invalid_shm_qos) +{ + dds::sub::qos::DataReaderQos r_qos; + r_qos << dds::core::policy::Reliability::BestEffort(); + r_qos << dds::core::policy::Durability::TransientLocal(); + r_qos << dds::core::policy::History::KeepLast(10U); + + dds::pub::qos::DataWriterQos w_qos; + w_qos << dds::core::policy::Reliability::Reliable(); + w_qos << dds::core::policy::Durability::TransientLocal(); + w_qos << dds::core::policy::History::KeepLast(10U); + + this->run_test(r_qos, w_qos, 10); +} + +TYPED_TEST(SharedMemoryTest, loan_sample) +{ + dds::sub::qos::DataReaderQos r_qos; + r_qos << dds::core::policy::Reliability::BestEffort(); + r_qos << dds::core::policy::Durability::Volatile(); + r_qos << dds::core::policy::History::KeepLast(10U); + + dds::pub::qos::DataWriterQos w_qos; + w_qos << dds::core::policy::Reliability::Reliable(); + w_qos << dds::core::policy::Durability::Volatile(); + w_qos << dds::core::policy::History::KeepLast(10U); + + this->run_test(r_qos, w_qos, 10, true); +}