Skip to content

Commit

Permalink
Address further comments
Browse files Browse the repository at this point in the history
* Add comment for future work
* Change scope of ?
* Change some names
* Clean up backpedalling method
  • Loading branch information
matthewd0123 committed Aug 9, 2024
1 parent 7aefa30 commit d6de3ef
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 43 deletions.
10 changes: 6 additions & 4 deletions subscription-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct SubscriptionInformation {
pub config: EventDeliveryConfig,
}

// Will be moving this to up-rust
// Issue: https://github.com/eclipse-uprotocol/up-rust/issues/178
impl Eq for SubscriptionInformation {}

impl PartialEq for SubscriptionInformation {
Expand Down Expand Up @@ -82,13 +84,13 @@ impl SubscriptionCache {
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
});
})?;
let subscriber = subscription.subscriber.into_option().ok_or_else(|| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to retrieve topic".to_string(),
)
});
})?;
let status = if let Some(status) = subscription.status.into_option() {
status
} else {
Expand All @@ -109,8 +111,8 @@ impl SubscriptionCache {
};
// Create new hashset if the key does not exist and insert the subscription
let subscription_information = SubscriptionInformation {
topic: topic.clone()?,
subscriber: subscriber.clone()?,
topic: topic.clone(),
subscriber: subscriber.clone(),
status,
attributes,
config,
Expand Down
12 changes: 4 additions & 8 deletions up-linux-streamer/examples/mE_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;

#[allow(dead_code)]
struct PublishReceiver {}
impl PublishReceiver {
pub fn new() -> Self {
Self {}
}
}
struct PublishReceiver;

#[async_trait]
impl UListener for PublishReceiver {
async fn on_receive(&self, msg: UMessage) {
Expand All @@ -47,7 +43,7 @@ impl UListener for PublishReceiver {
let Some(payload_bytes) = msg.payload else {
panic!("No bytes available");
};
let _ = match Timer::parse_from_bytes(&payload_bytes) {
match Timer::parse_from_bytes(&payload_bytes) {
Ok(timer_message) => {
println!("timer: {timer_message:?}");
timer_message
Expand Down Expand Up @@ -97,7 +93,7 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};

let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver);
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
subscriber
.register_listener(&source_filter, None, publish_receiver.clone())
Expand Down
9 changes: 2 additions & 7 deletions up-linux-streamer/examples/uE_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1;
const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001;

#[allow(dead_code)]
struct PublishReceiver {}
impl PublishReceiver {
pub fn new() -> Self {
Self {}
}
}
struct PublishReceiver;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -105,7 +100,7 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};

let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver::new());
let publish_receiver: Arc<dyn UListener> = Arc::new(PublishReceiver);
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
subscriber
.register_listener(&source_filter, None, publish_receiver.clone())
Expand Down
74 changes: 50 additions & 24 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

#![allow(clippy::mutable_key_type)]

use crate::endpoint::Endpoint;
use async_std::channel::{Receiver, Sender};
use async_std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -209,12 +207,13 @@ impl ForwardingListeners {
let forwarding_listener =
Arc::new(ForwardingListener::new(forwarding_id, out_sender.clone()));

type UUriPair = Vec<(UUri, Option<UUri>)>;
let mut uuri_to_backpedal: HashSet<UUriPair> = HashSet::new();
type SourceSinkFilterPair = (UUri, Option<UUri>);
#[allow(clippy::mutable_key_type)]
let mut uuris_to_backpedal: HashSet<SourceSinkFilterPair> = HashSet::new();

// Perform async registration and fetching

uuri_to_backpedal.insert(vec![(any_uuri(), Some(uauthority_to_uuri(out_authority)))]);
uuris_to_backpedal.insert((any_uuri(), Some(uauthority_to_uuri(out_authority))));
if let Err(err) = in_transport
.register_listener(
&any_uuri(),
Expand All @@ -223,47 +222,74 @@ impl ForwardingListeners {
)
.await
{
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}");
for uuri_pair in &uuri_to_backpedal {
for (uuri1, uuri2) in uuri_pair {
// Do something with uuri1 and uuri2
let _ = in_transport
.unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone())
.await;
}
warn!(
"{}:{} unable to register request listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
for uuri_pair in &uuris_to_backpedal {
if let Err(err) = in_transport
.unregister_listener(
&uuri_pair.0,
uuri_pair.1.as_ref(),
forwarding_listener.clone(),
)
.await
{
warn!(
"{}:{} unable to unregister listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
};
}
return Err(ForwardingListenerError::FailToRegisterNotificationRequestResponseListener);
} else {
debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener");
debug!(
"{}:{} able to register request listener",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG
);
}

#[allow(clippy::mutable_key_type)]
let subscribers = match subscription_cache
.lock()
.await
.fetch_cache_entry(out_authority.into())
{
Some(subscribers) => subscribers,
None => {
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} no subscribers found for out_authority: {out_authority:?}");
warn!(
"{}:{} no subscribers found for out_authority: {:?}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, out_authority
);
HashSet::new()
}
};

for subscriber in subscribers {
uuri_to_backpedal.insert(vec![(subscriber.topic.clone(), None)]);
uuris_to_backpedal.insert((subscriber.topic.clone(), None));
if let Err(err) = in_transport
.register_listener(&subscriber.topic, None, forwarding_listener.clone())
.await
{
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}");
warn!(
"{}:{} unable to register listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
// Perform async unregister_listener
for uuri_pair in &uuri_to_backpedal {
for (uuri1, uuri2) in uuri_pair {
// Do something with uuri1 and uuri2
let _ = in_transport
.unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone())
.await;
}
for uuri_pair in &uuris_to_backpedal {
if let Err(err) = in_transport
.unregister_listener(
&uuri_pair.0,
uuri_pair.1.as_ref(),
forwarding_listener.clone(),
)
.await
{
warn!(
"{}:{} unable to unregister listener, error: {}",
FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err
);
};
}
return Err(ForwardingListenerError::FailToRegisterPublishListener(
subscriber.topic,
Expand Down

0 comments on commit d6de3ef

Please sign in to comment.