Skip to content

Commit

Permalink
Fix missing Writer callbacks on DomainParticipantListener
Browse files Browse the repository at this point in the history
When setting a Listener on the DomainParticipant that needs to handle events originating at a DataWriter, the application ignores (or crashes in debug mode) the Writer event when it occurs.
This was caused by the DomainParticipant class not overriding the default Listener callback handler set in the Entity class that was meant as an abstract placeholder and caused an assertion to fail.
That is now resolved by properly overriding this placeholder with a callback handler that properly propagates the callback to the appropriate function in the registered DomainParticipantListener.
This should fix issue eclipse-cyclonedds#427
  • Loading branch information
e-hndrks authored and eboasson committed Sep 4, 2023
1 parent c3f656b commit 5316fff
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 52 deletions.
49 changes: 0 additions & 49 deletions src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,55 +532,6 @@ dds::sub::detail::DataReader<T>::init(ObjectDelegate::weak_ref_type weak_ref)
/* Add the datareader to the datareader set of the subscriber */
this->sub_.delegate()->add_datareader(*this);

// Because listeners are added after reader is created (which is in enabled state, because
// disabled state is not yet supported), events could have occured before listeners were
// registered. Therefore the event handlers for those events are called here.
if (this->listener_get()) {
dds::core::status::StatusMask readerStatus = status_changes();

if (listener_mask.to_ulong() & dds::core::status::StatusMask::data_available().to_ulong()
&& readerStatus.test(DDS_DATA_AVAILABLE_STATUS_ID))
{
on_data_available(this->ddsc_entity);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::liveliness_changed().to_ulong()
&& readerStatus.test(DDS_LIVELINESS_CHANGED_STATUS_ID))
{
dds::core::status::LivelinessChangedStatus status = liveliness_changed_status();
on_liveliness_changed(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::requested_deadline_missed().to_ulong()
&& readerStatus.test(DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID))
{
dds::core::status::RequestedDeadlineMissedStatus status = requested_deadline_missed_status();
on_requested_deadline_missed(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::requested_incompatible_qos().to_ulong()
&& readerStatus.test(DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID))
{
dds::core::status::RequestedIncompatibleQosStatus status = requested_incompatible_qos_status();
on_requested_incompatible_qos(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::sample_lost().to_ulong()
&& readerStatus.test(DDS_SAMPLE_LOST_STATUS_ID))
{
dds::core::status::SampleLostStatus status = sample_lost_status();
on_sample_lost(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::sample_rejected().to_ulong()
&& readerStatus.test(DDS_SAMPLE_REJECTED_STATUS_ID))
{
dds::core::status::SampleRejectedStatus status = sample_rejected_status();
on_sample_rejected(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::subscription_matched().to_ulong()
&& readerStatus.test(DDS_SUBSCRIPTION_MATCHED_STATUS_ID))
{
dds::core::status::SubscriptionMatchedStatus status = subscription_matched_status();
on_subscription_matched(this->ddsc_entity, status);
}
}

this->enable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,14 @@ class OMG_DDS_API org::eclipse::cyclonedds::domain::DomainParticipantDelegate :
org::eclipse::cyclonedds::core::SubscriptionMatchedStatusDelegate &sd);
void on_sample_lost(dds_entity_t reader,
org::eclipse::cyclonedds::core::SampleLostStatusDelegate &sd);

void on_offered_deadline_missed(dds_entity_t writer,
org::eclipse::cyclonedds::core::OfferedDeadlineMissedStatusDelegate &sd);
void on_offered_incompatible_qos(dds_entity_t writer,
org::eclipse::cyclonedds::core::OfferedIncompatibleQosStatusDelegate &sd);
void on_liveliness_lost(dds_entity_t writer,
org::eclipse::cyclonedds::core::LivelinessLostStatusDelegate &sd);
void on_publication_matched(dds_entity_t writer,
org::eclipse::cyclonedds::core::PublicationMatchedStatusDelegate &sd);

private:
static org::eclipse::cyclonedds::core::EntitySet participants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,3 +732,59 @@ void org::eclipse::cyclonedds::domain::DomainParticipantDelegate::on_sample_lost

this->listener()->on_sample_lost(adr, s);
}

void org::eclipse::cyclonedds::domain::DomainParticipantDelegate::on_offered_deadline_missed(dds_entity_t writer,
org::eclipse::cyclonedds::core::OfferedDeadlineMissedStatusDelegate &sd)
{
org::eclipse::cyclonedds::pub::AnyDataWriterDelegate::ref_type ref =
::std::dynamic_pointer_cast<org::eclipse::cyclonedds::pub::AnyDataWriterDelegate>(
this->extract_strong_ref(writer));
dds::pub::AnyDataWriter adw(ref);

dds::core::status::OfferedDeadlineMissedStatus s;
s.delegate() = sd;

this->listener()->on_offered_deadline_missed(adw, s);
}

void org::eclipse::cyclonedds::domain::DomainParticipantDelegate::on_offered_incompatible_qos(dds_entity_t writer,
org::eclipse::cyclonedds::core::OfferedIncompatibleQosStatusDelegate &sd)
{
org::eclipse::cyclonedds::pub::AnyDataWriterDelegate::ref_type ref =
::std::dynamic_pointer_cast<org::eclipse::cyclonedds::pub::AnyDataWriterDelegate>(
this->extract_strong_ref(writer));
dds::pub::AnyDataWriter adw(ref);

dds::core::status::OfferedIncompatibleQosStatus s;
s.delegate() = sd;

this->listener()->on_offered_incompatible_qos(adw, s);
}

void org::eclipse::cyclonedds::domain::DomainParticipantDelegate::on_liveliness_lost(dds_entity_t writer,
org::eclipse::cyclonedds::core::LivelinessLostStatusDelegate &sd)
{
org::eclipse::cyclonedds::pub::AnyDataWriterDelegate::ref_type ref =
::std::dynamic_pointer_cast<org::eclipse::cyclonedds::pub::AnyDataWriterDelegate>(
this->extract_strong_ref(writer));
dds::pub::AnyDataWriter adw(ref);

dds::core::status::LivelinessLostStatus s;
s.delegate() = sd;

this->listener()->on_liveliness_lost(adw, s);
}

void org::eclipse::cyclonedds::domain::DomainParticipantDelegate::on_publication_matched(dds_entity_t writer,
org::eclipse::cyclonedds::core::PublicationMatchedStatusDelegate &sd)
{
org::eclipse::cyclonedds::pub::AnyDataWriterDelegate::ref_type ref =
::std::dynamic_pointer_cast<org::eclipse::cyclonedds::pub::AnyDataWriterDelegate>(
this->extract_strong_ref(writer));
dds::pub::AnyDataWriter adw(ref);

dds::core::status::PublicationMatchedStatus s;
s.delegate() = sd;

this->listener()->on_publication_matched(adw, s);
}
64 changes: 62 additions & 2 deletions src/ddscxx/tests/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ class DomainParticipantListener : public virtual dds::domain::NoOpDomainParticip
{
public:
dds::sub::Subscriber data_on_readers_subscriber;

dds::sub::AnyDataReader data_available_reader;
dds::pub::AnyDataWriter publication_matched_writer;
dds::core::status::PublicationMatchedStatus publication_matched_status;

DomainParticipantListener() :
data_on_readers_subscriber(dds::core::null),
data_available_reader(dds::core::null) { }
data_available_reader(dds::core::null),
publication_matched_writer(dds::core::null) { }

protected:
virtual void on_data_on_readers(dds::sub::Subscriber& subs)
Expand All @@ -53,6 +55,18 @@ class DomainParticipantListener : public virtual dds::domain::NoOpDomainParticip
ddsrt_cond_broadcast(&g_cond);
ddsrt_mutex_unlock(&g_mutex);
}

virtual void on_publication_matched(dds::pub::AnyDataWriter& writer,
const ::dds::core::status::PublicationMatchedStatus& status)
{
ddsrt_mutex_lock(&g_mutex);
cb_called |= DDS_PUBLICATION_MATCHED_STATUS;
this->publication_matched_writer = writer;
this->publication_matched_status = status;
ddsrt_cond_broadcast(&g_cond);
ddsrt_mutex_unlock(&g_mutex);
}

};

class TopicListener : public virtual dds::topic::NoOpTopicListener<HelloWorldData::Msg>
Expand Down Expand Up @@ -1471,3 +1485,49 @@ TEST_F(Listener, propagation)
ASSERT_EQ(publisherListener.publication_matched_writer.delegate(), writer.delegate());
ASSERT_EQ(participantListener.data_on_readers_subscriber.delegate(), subscriber.delegate());
}

TEST_F(Listener, propagation2)
{
DomainParticipantListener participantListener;
SubscriberListener subscriberListener;
PublisherListener publisherListener;
dds::core::status::StatusMask participantMask =
dds::core::status::StatusMask() <<
dds::core::status::StatusMask::data_on_readers() <<
dds::core::status::StatusMask::publication_matched();
dds::core::status::StatusMask subscriberMask =
dds::core::status::StatusMask() <<
dds::core::status::StatusMask::subscription_matched();

// Set listener on participant
participant.listener(&participantListener, participantMask);

// Set listener on subscriber
subscriber.listener(&subscriberListener, subscriberMask);

// Create reader and writer without listener
dds::sub::DataReader<HelloWorldData::Msg> reader(
subscriber, topic);
ASSERT_NE(reader, dds::core::null);

dds::pub::DataWriter<HelloWorldData::Msg> writer(
publisher, topic);
ASSERT_NE(writer, dds::core::null);

// Publication and Subscription should be matched.
uint32_t triggered = waitfor_cb(DDS_PUBLICATION_MATCHED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS);
ASSERT_EQ(triggered & DDS_SUBSCRIPTION_MATCHED_STATUS, DDS_SUBSCRIPTION_MATCHED_STATUS);
ASSERT_EQ(triggered & DDS_PUBLICATION_MATCHED_STATUS, DDS_PUBLICATION_MATCHED_STATUS);
ASSERT_EQ(participantListener.publication_matched_writer.delegate(), writer.delegate());
ASSERT_EQ(subscriberListener.subscription_matched_reader.delegate(), reader.delegate());

// Write sample
HelloWorldData::Msg sample(1, "test");
writer << sample;

// Data on readers should be triggered with the right status.
triggered = waitfor_cb(DDS_DATA_ON_READERS_STATUS);
ASSERT_EQ(triggered & DDS_DATA_ON_READERS_STATUS, DDS_DATA_ON_READERS_STATUS);
ASSERT_EQ(participantListener.publication_matched_writer.delegate(), writer.delegate());
ASSERT_EQ(participantListener.data_on_readers_subscriber.delegate(), subscriber.delegate());
}

0 comments on commit 5316fff

Please sign in to comment.