From d6de3ef5c72063fb195d84b82cf0250b2bf0ea19 Mon Sep 17 00:00:00 2001 From: Matthew D'Alonzo Date: Fri, 9 Aug 2024 10:12:26 -0400 Subject: [PATCH] Address further comments * Add comment for future work * Change scope of ? * Change some names * Clean up backpedalling method --- subscription-cache/src/lib.rs | 10 +-- up-linux-streamer/examples/mE_subscriber.rs | 12 ++-- up-linux-streamer/examples/uE_subscriber.rs | 9 +-- up-streamer/src/ustreamer.rs | 74 ++++++++++++++------- 4 files changed, 62 insertions(+), 43 deletions(-) diff --git a/subscription-cache/src/lib.rs b/subscription-cache/src/lib.rs index 2723b721..a3d5dfa2 100644 --- a/subscription-cache/src/lib.rs +++ b/subscription-cache/src/lib.rs @@ -32,6 +32,8 @@ pub struct SubscriptionInformation { pub config: EventDeliveryConfig, } +// Will be moving this to up-rust +// Issue: https://github.com/eclipse-uprotocol/up-rust/issues/178 impl Eq for SubscriptionInformation {} impl PartialEq for SubscriptionInformation { @@ -82,13 +84,13 @@ impl SubscriptionCache { UCode::INVALID_ARGUMENT, "Unable to retrieve topic".to_string(), ) - }); + })?; let subscriber = subscription.subscriber.into_option().ok_or_else(|| { UStatus::fail_with_code( UCode::INVALID_ARGUMENT, "Unable to retrieve topic".to_string(), ) - }); + })?; let status = if let Some(status) = subscription.status.into_option() { status } else { @@ -109,8 +111,8 @@ impl SubscriptionCache { }; // Create new hashset if the key does not exist and insert the subscription let subscription_information = SubscriptionInformation { - topic: topic.clone()?, - subscriber: subscriber.clone()?, + topic: topic.clone(), + subscriber: subscriber.clone(), status, attributes, config, diff --git a/up-linux-streamer/examples/mE_subscriber.rs b/up-linux-streamer/examples/mE_subscriber.rs index f02d9888..2f4eefcd 100644 --- a/up-linux-streamer/examples/mE_subscriber.rs +++ b/up-linux-streamer/examples/mE_subscriber.rs @@ -33,12 +33,8 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; #[allow(dead_code)] -struct PublishReceiver {} -impl PublishReceiver { - pub fn new() -> Self { - Self {} - } -} +struct PublishReceiver; + #[async_trait] impl UListener for PublishReceiver { async fn on_receive(&self, msg: UMessage) { @@ -47,7 +43,7 @@ impl UListener for PublishReceiver { let Some(payload_bytes) = msg.payload else { panic!("No bytes available"); }; - let _ = match Timer::parse_from_bytes(&payload_bytes) { + match Timer::parse_from_bytes(&payload_bytes) { Ok(timer_message) => { println!("timer: {timer_message:?}"); timer_message @@ -97,7 +93,7 @@ async fn main() -> Result<(), UStatus> { ..Default::default() }; - let publish_receiver: Arc = Arc::new(PublishReceiver::new()); + let publish_receiver: Arc = Arc::new(PublishReceiver); // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases subscriber .register_listener(&source_filter, None, publish_receiver.clone()) diff --git a/up-linux-streamer/examples/uE_subscriber.rs b/up-linux-streamer/examples/uE_subscriber.rs index 4bdf6f0c..2222f968 100644 --- a/up-linux-streamer/examples/uE_subscriber.rs +++ b/up-linux-streamer/examples/uE_subscriber.rs @@ -29,12 +29,7 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; #[allow(dead_code)] -struct PublishReceiver {} -impl PublishReceiver { - pub fn new() -> Self { - Self {} - } -} +struct PublishReceiver; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -105,7 +100,7 @@ async fn main() -> Result<(), UStatus> { ..Default::default() }; - let publish_receiver: Arc = Arc::new(PublishReceiver::new()); + let publish_receiver: Arc = Arc::new(PublishReceiver); // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases subscriber .register_listener(&source_filter, None, publish_receiver.clone()) diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index ba3758f2..040316b3 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -11,8 +11,6 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -#![allow(clippy::mutable_key_type)] - use crate::endpoint::Endpoint; use async_std::channel::{Receiver, Sender}; use async_std::sync::{Arc, Mutex}; @@ -209,12 +207,13 @@ impl ForwardingListeners { let forwarding_listener = Arc::new(ForwardingListener::new(forwarding_id, out_sender.clone())); - type UUriPair = Vec<(UUri, Option)>; - let mut uuri_to_backpedal: HashSet = HashSet::new(); + type SourceSinkFilterPair = (UUri, Option); + #[allow(clippy::mutable_key_type)] + let mut uuris_to_backpedal: HashSet = HashSet::new(); // Perform async registration and fetching - uuri_to_backpedal.insert(vec![(any_uuri(), Some(uauthority_to_uuri(out_authority)))]); + uuris_to_backpedal.insert((any_uuri(), Some(uauthority_to_uuri(out_authority)))); if let Err(err) = in_transport .register_listener( &any_uuri(), @@ -223,20 +222,34 @@ impl ForwardingListeners { ) .await { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}"); - for uuri_pair in &uuri_to_backpedal { - for (uuri1, uuri2) in uuri_pair { - // Do something with uuri1 and uuri2 - let _ = in_transport - .unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone()) - .await; - } + warn!( + "{}:{} unable to register request listener, error: {}", + FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err + ); + for uuri_pair in &uuris_to_backpedal { + if let Err(err) = in_transport + .unregister_listener( + &uuri_pair.0, + uuri_pair.1.as_ref(), + forwarding_listener.clone(), + ) + .await + { + warn!( + "{}:{} unable to unregister listener, error: {}", + FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err + ); + }; } return Err(ForwardingListenerError::FailToRegisterNotificationRequestResponseListener); } else { - debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener"); + debug!( + "{}:{} able to register request listener", + FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG + ); } + #[allow(clippy::mutable_key_type)] let subscribers = match subscription_cache .lock() .await @@ -244,26 +257,39 @@ impl ForwardingListeners { { Some(subscribers) => subscribers, None => { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} no subscribers found for out_authority: {out_authority:?}"); + warn!( + "{}:{} no subscribers found for out_authority: {:?}", + FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, out_authority + ); HashSet::new() } }; for subscriber in subscribers { - uuri_to_backpedal.insert(vec![(subscriber.topic.clone(), None)]); + uuris_to_backpedal.insert((subscriber.topic.clone(), None)); if let Err(err) = in_transport .register_listener(&subscriber.topic, None, forwarding_listener.clone()) .await { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); + warn!( + "{}:{} unable to register listener, error: {}", + FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err + ); // Perform async unregister_listener - for uuri_pair in &uuri_to_backpedal { - for (uuri1, uuri2) in uuri_pair { - // Do something with uuri1 and uuri2 - let _ = in_transport - .unregister_listener(uuri1, uuri2.as_ref(), forwarding_listener.clone()) - .await; - } + for uuri_pair in &uuris_to_backpedal { + if let Err(err) = in_transport + .unregister_listener( + &uuri_pair.0, + uuri_pair.1.as_ref(), + forwarding_listener.clone(), + ) + .await + { + warn!( + "{}:{} unable to unregister listener, error: {}", + FORWARDING_LISTENERS_TAG, FORWARDING_LISTENERS_FN_INSERT_TAG, err + ); + }; } return Err(ForwardingListenerError::FailToRegisterPublishListener( subscriber.topic,