From cae2a0348802ff6d106bacf6faa11d861fd1d6ba Mon Sep 17 00:00:00 2001 From: Matthew D'Alonzo Date: Thu, 8 Aug 2024 14:28:48 -0400 Subject: [PATCH] Address further comments from Pete * Add support for error handling in case topic registration fails * Remove some duplicate comments * Clean up some if-else blocks * Switch async mutex to sync mutex in the subscription cache * Remove fetch_cache method * Remove a naked unwrap * Add some places where explicit errors are returned --- Cargo.lock | 1 - Cargo.toml | 3 +- subscription-cache/Cargo.toml | 1 - subscription-cache/src/lib.rs | 75 ++++++----- up-linux-streamer/DEFAULT_CONFIG.json5 | 8 ++ up-linux-streamer/examples/mE_publisher.rs | 4 +- up-linux-streamer/examples/mE_subscriber.rs | 14 +- up-linux-streamer/examples/uE_client.rs | 28 ++-- up-linux-streamer/examples/uE_publisher.rs | 27 +++- up-linux-streamer/examples/uE_service.rs | 32 +++-- up-linux-streamer/examples/uE_subscriber.rs | 43 ++++--- up-linux-streamer/src/config.rs | 14 ++ up-linux-streamer/src/main.rs | 8 +- up-streamer/src/ustreamer.rs | 136 ++++++++++++++------ 14 files changed, 263 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b5caa4f..43b9bfdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3493,7 +3493,6 @@ dependencies = [ "env_logger 0.10.2", "futures", "log", - "prost", "protobuf", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 9886e61c..abac373d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ members = [ "up-streamer", "subscription-cache", "utils/usubscription-static-file"] [workspace.package] -rust-version = "1.72.1" +rust-version = "1.76.0" version = "0.1.5-dev" # uProtocol version repository = "https://github.com/eclipse-uprotocol/up-streamer-rust" homepage = "https://github.com/eclipse-uprotocol" @@ -34,7 +34,6 @@ env_logger = { version = "0.10.1" } futures = { version = "0.3.30" } log = { version = "0.4.20" } json5 = { version = "0.4.1" } -prost = { version = "0.12" } serde = { version = "1.0.154", features = ["derive"] } serde_json = { version = "1.0.94" } uuid = { version = "1.7.0" } diff --git a/subscription-cache/Cargo.toml b/subscription-cache/Cargo.toml index 2edc28f8..62acd8be 100644 --- a/subscription-cache/Cargo.toml +++ b/subscription-cache/Cargo.toml @@ -16,7 +16,6 @@ async-trait = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } log = { workspace = true } -prost = { workspace = true } uuid = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } diff --git a/subscription-cache/src/lib.rs b/subscription-cache/src/lib.rs index c83d1e45..945c8c21 100644 --- a/subscription-cache/src/lib.rs +++ b/subscription-cache/src/lib.rs @@ -11,14 +11,15 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_std::sync::Mutex; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; +use std::sync::Mutex; use up_rust::core::usubscription::{ EventDeliveryConfig, FetchSubscriptionsResponse, SubscribeAttributes, SubscriberInfo, SubscriptionStatus, }; use up_rust::UUri; +use up_rust::{UCode, UStatus}; pub type SubscribersMap = Mutex>>; @@ -61,25 +62,33 @@ pub struct SubscriptionCache { subscription_cache_map: SubscribersMap, } +impl Default for SubscriptionCache { + fn default() -> Self { + Self { + subscription_cache_map: Mutex::new(HashMap::new()), + } + } +} + /// A [`SubscriptionCache`] is used to store and manage subscriptions to /// topics. It is kept local to the streamer. The streamer will receive updates /// from the subscription service, and update the SubscriptionCache accordingly. impl SubscriptionCache { - pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Self { + pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Result { let mut subscription_cache_hash_map = HashMap::new(); for subscription in subscription_cache_map.subscriptions { - let topic = if let Some(topic) = subscription.topic.into_option() { - topic - } else { - println!("Unable to parse URI from subscription, skipping..."); - continue; - }; - let subscriber = if let Some(subscriber) = subscription.subscriber.into_option() { - subscriber - } else { - println!("Unable to parse subscriber from subscription, skipping..."); - continue; - }; + let topic = subscription.topic.into_option().ok_or_else(|| { + return UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Unable to retrieve topic"), + ); + }); + let subscriber = subscription.subscriber.into_option().ok_or_else(|| { + return UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Unable to retrieve topic"), + ); + }); let status = if let Some(status) = subscription.status.into_option() { status } else { @@ -100,38 +109,36 @@ impl SubscriptionCache { }; // Create new hashset if the key does not exist and insert the subscription let subscription_information = SubscriptionInformation { - topic: topic.clone(), - subscriber, + topic: topic.clone()?, + subscriber: subscriber.clone()?, status, attributes, config, }; - let subscriber_authority_name = subscription_information - .subscriber - .uri - .as_ref() - .unwrap() - .authority_name - .clone(); + let subscriber_authority_name = match subscription_information.subscriber.uri.as_ref() { + Some(uri) => uri.authority_name.clone(), + None => { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Unable to retrieve authority name", + )) + } + }; subscription_cache_hash_map .entry(subscriber_authority_name) .or_insert_with(HashSet::new) .insert(subscription_information); } - Self { + Ok(Self { subscription_cache_map: Mutex::new(subscription_cache_hash_map), - } + }) } - pub async fn fetch_cache_entry( - &self, - entry: String, - ) -> Option> { - let map = self.subscription_cache_map.lock().await; + pub fn fetch_cache_entry(&self, entry: String) -> Option> { + let map = match self.subscription_cache_map.lock() { + Ok(map) => map, + Err(_) => return None, + }; map.get(&entry).cloned() } - - pub async fn fetch_cache(&self) -> HashMap> { - self.subscription_cache_map.lock().await.clone() - } } diff --git a/up-linux-streamer/DEFAULT_CONFIG.json5 b/up-linux-streamer/DEFAULT_CONFIG.json5 index f5ad0cce..bab1daac 100644 --- a/up-linux-streamer/DEFAULT_CONFIG.json5 +++ b/up-linux-streamer/DEFAULT_CONFIG.json5 @@ -16,6 +16,14 @@ // Used when initializing host transport authority: "linux" }, + usubscription_config: { + // Lists the path to the subscription file when using static file + file_path: "static-configs/testdata.json" + } + zenoh_transport_config: { + // Endpoint to be used when starting Zenoh session + endpoint: "tcp/0.0.0.0:7447" + }, someip_config: { // Determines the authority_name of the mechatronics network // Used when initializing SOME/IP transport diff --git a/up-linux-streamer/examples/mE_publisher.rs b/up-linux-streamer/examples/mE_publisher.rs index b4f3d6b1..08c9be08 100644 --- a/up-linux-streamer/examples/mE_publisher.rs +++ b/up-linux-streamer/examples/mE_publisher.rs @@ -44,7 +44,7 @@ async fn main() -> Result<(), UStatus> { // There will be a single vsomeip_transport, as there is a connection into device and a streamer // TODO: Add error handling if we fail to create a UPTransportVsomeip - let client: Arc = Arc::new( + let publisher: Arc = Arc::new( UPTransportVsomeip::new_with_config( &PUB_TOPIC_AUTHORITY.to_string(), &REMOTE_AUTHORITY.to_string(), @@ -86,6 +86,6 @@ async fn main() -> Result<(), UStatus> { .unwrap(); println!("Sending Publish message:\n{publish_msg:?}"); - client.send(publish_msg).await?; + publisher.send(publish_msg).await?; } } diff --git a/up-linux-streamer/examples/mE_subscriber.rs b/up-linux-streamer/examples/mE_subscriber.rs index 80a36d9c..f02d9888 100644 --- a/up-linux-streamer/examples/mE_subscriber.rs +++ b/up-linux-streamer/examples/mE_subscriber.rs @@ -33,12 +33,10 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; #[allow(dead_code)] -struct PublishReceiver { - client: Arc, -} +struct PublishReceiver {} impl PublishReceiver { - pub fn new(client: Arc) -> Self { - Self { client } + pub fn new() -> Self { + Self {} } } #[async_trait] @@ -80,7 +78,7 @@ async fn main() -> Result<(), UStatus> { // There will be a single vsomeip_transport, as there is a connection into device and a streamer // TODO: Add error handling if we fail to create a UPTransportVsomeip - let service: Arc = Arc::new( + let subscriber: Arc = Arc::new( UPTransportVsomeip::new_with_config( &SERVICE_AUTHORITY.to_string(), &REMOTE_AUTHORITY.to_string(), @@ -99,9 +97,9 @@ async fn main() -> Result<(), UStatus> { ..Default::default() }; - let publish_receiver: Arc = Arc::new(PublishReceiver::new(service.clone())); + let publish_receiver: Arc = Arc::new(PublishReceiver::new()); // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases - service + subscriber .register_listener(&source_filter, None, publish_receiver.clone()) .await?; diff --git a/up-linux-streamer/examples/uE_client.rs b/up-linux-streamer/examples/uE_client.rs index e5e4aa20..994061b6 100644 --- a/up-linux-streamer/examples/uE_client.rs +++ b/up-linux-streamer/examples/uE_client.rs @@ -33,6 +33,14 @@ const CLIENT_RESOURCE_ID: u16 = 0; const REQUEST_TTL: u32 = 1000; +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// The endpoint for Zenoh client to connect to + #[arg(short, long)] + endpoint: String, +} + struct ServiceResponseListener; #[async_trait] @@ -60,21 +68,25 @@ impl UListener for ServiceResponseListener { async fn main() -> Result<(), UStatus> { env_logger::init(); + let args = Args::parse(); + println!("uE_client"); // TODO: Probably make somewhat configurable? // Create a configuration object let mut zenoh_config = Config::default(); - // Specify the address to listen on using IPv4 - let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + if !args.endpoint.is_empty() { + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str()); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); + } - // Add the IPv4 endpoint to the Zenoh configuration - zenoh_config - .listen - .endpoints - .push(ipv4_endpoint.expect("FAIL")); - // TODO: Add error handling if we fail to create a UPClientZenoh let client: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) .await diff --git a/up-linux-streamer/examples/uE_publisher.rs b/up-linux-streamer/examples/uE_publisher.rs index ab2eed58..ca51767a 100644 --- a/up-linux-streamer/examples/uE_publisher.rs +++ b/up-linux-streamer/examples/uE_publisher.rs @@ -27,24 +27,37 @@ const PUB_TOPIC_UE_ID: u16 = 0x3039; const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// The endpoint for Zenoh client to connect to + #[arg(short, long)] + endpoint: String, +} + #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); + let args = Args::parse(); + println!("uE_publisher"); // TODO: Probably make somewhat configurable? // Create a configuration object let mut zenoh_config = Config::default(); - // Specify the address to listen on using IPv4 - let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + if !args.endpoint.is_empty() { + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str()); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); + } - // Add the IPv4 endpoint to the Zenoh configuration - zenoh_config - .listen - .endpoints - .push(ipv4_endpoint.expect("FAIL")); // TODO: Add error handling if we fail to create a UPClientZenoh let client: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) diff --git a/up-linux-streamer/examples/uE_service.rs b/up-linux-streamer/examples/uE_service.rs index 96f63130..c41b59aa 100644 --- a/up-linux-streamer/examples/uE_service.rs +++ b/up-linux-streamer/examples/uE_service.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use clap::Parser; use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; use log::error; use protobuf::Message; @@ -22,6 +23,15 @@ impl ServiceRequestResponder { Self { client } } } + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// The endpoint for Zenoh client to connect to + #[arg(short, long)] + endpoint: String, +} + #[async_trait] impl UListener for ServiceRequestResponder { async fn on_receive(&self, msg: UMessage) { @@ -61,18 +71,24 @@ impl UListener for ServiceRequestResponder { async fn main() -> Result<(), UStatus> { env_logger::init(); + let args = Args::parse(); + println!("uE_service"); // TODO: Probably make somewhat configurable? let mut zenoh_config = Config::default(); - // Specify the address to listen on using IPv4 - let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); - - // Add the IPv4 endpoint to the Zenoh configuration - zenoh_config - .listen - .endpoints - .push(ipv4_endpoint.expect("FAIL")); + + if !args.endpoint.is_empty() { + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str()); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); + } + // TODO: Add error handling if we fail to create a UPClientZenoh let service: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) diff --git a/up-linux-streamer/examples/uE_subscriber.rs b/up-linux-streamer/examples/uE_subscriber.rs index 1e612794..9f9a9c44 100644 --- a/up-linux-streamer/examples/uE_subscriber.rs +++ b/up-linux-streamer/examples/uE_subscriber.rs @@ -28,14 +28,21 @@ const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; #[allow(dead_code)] -struct PublishReceiver { - client: Arc, -} +struct PublishReceiver {} impl PublishReceiver { - pub fn new(client: Arc) -> Self { - Self { client } + pub fn new() -> Self { + Self {} } } + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// The endpoint for Zenoh client to connect to + #[arg(short, long)] + endpoint: String, +} + #[async_trait] impl UListener for PublishReceiver { async fn on_receive(&self, msg: UMessage) { @@ -44,14 +51,12 @@ impl UListener for PublishReceiver { let Some(payload_bytes) = msg.payload else { panic!("No bytes available"); }; - let _ = match Timer::parse_from_bytes(&payload_bytes) { + match Timer::parse_from_bytes(&payload_bytes) { Ok(timer_message) => { println!("timer: {timer_message:?}"); - timer_message } Err(err) => { error!("Unable to parse Timer Message: {err:?}"); - return; } }; } @@ -65,21 +70,25 @@ impl UListener for PublishReceiver { async fn main() -> Result<(), UStatus> { env_logger::init(); + let args = Args::parse(); + println!("uE_subscriber"); // TODO: Probably make somewhat configurable? // Create a configuration object let mut zenoh_config = Config::default(); - // Specify the address to listen on using IPv4 - let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + if !args.endpoint.is_empty() { + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str(args.endpoint.as_str()); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); + } - // Add the IPv4 endpoint to the Zenoh configuration - zenoh_config - .listen - .endpoints - .push(ipv4_endpoint.expect("FAIL")); - // TODO: Add error handling if we fail to create a UPClientZenoh // TODO: Add error handling if we fail to create a UPClientZenoh let service: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) @@ -95,7 +104,7 @@ async fn main() -> Result<(), UStatus> { ..Default::default() }; - let publish_receiver: Arc = Arc::new(PublishReceiver::new(service.clone())); + let publish_receiver: Arc = Arc::new(PublishReceiver::new()); // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases service .register_listener(&source_filter, None, publish_receiver.clone()) diff --git a/up-linux-streamer/src/config.rs b/up-linux-streamer/src/config.rs index 58e90aad..81f5d614 100644 --- a/up-linux-streamer/src/config.rs +++ b/up-linux-streamer/src/config.rs @@ -6,6 +6,8 @@ use std::path::PathBuf; pub struct Config { pub(crate) up_streamer_config: UpStreamerConfig, pub(crate) host_config: HostConfig, + pub(crate) usubscription_config: USubscriptionConfig, + pub(crate) zenoh_transport_config: ZenohTransportConfig, pub(crate) someip_config: SomeipConfig, } @@ -22,6 +24,18 @@ pub struct HostConfig { pub(crate) authority: String, } +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct USubscriptionConfig { + pub(crate) file_path: String, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct ZenohTransportConfig { + pub(crate) endpoint: String, +} + #[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct SomeipConfig { diff --git a/up-linux-streamer/src/main.rs b/up-linux-streamer/src/main.rs index 6eb607dc..1d32bbd4 100644 --- a/up-linux-streamer/src/main.rs +++ b/up-linux-streamer/src/main.rs @@ -27,9 +27,6 @@ struct StreamerArgs { async fn main() -> Result<(), UStatus> { env_logger::init(); - let subscription_path = "static-configs/testdata.json".to_string(); - let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); - let args = StreamerArgs::parse(); let mut file = File::open(args.config) @@ -49,6 +46,9 @@ async fn main() -> Result<(), UStatus> { ) })?; + let subscription_path = config.usubscription_config.file_path; + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); + let mut streamer = UStreamer::new( "up-linux-streamer", config.up_streamer_config.message_queue_size, @@ -58,7 +58,7 @@ async fn main() -> Result<(), UStatus> { let mut zenoh_config = ZenohConfig::default(); // Specify the address to listen on using IPv4 - let ipv4_endpoint = ZenohEndpoint::from_str("tcp/0.0.0.0:7447"); + let ipv4_endpoint = ZenohEndpoint::from_str(config.zenoh_transport_config.endpoint.as_str()); // Add the IPv4 endpoint to the Zenoh configuration zenoh_config diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index 47c1a69b..b48799a9 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -18,6 +18,9 @@ use async_std::channel::{Receiver, Sender}; use async_std::sync::{Arc, Mutex}; use async_std::{channel, task}; use async_trait::async_trait; +use std::error::Error; +use std::fmt; +use std::fmt::{Debug, Display, Formatter}; use log::*; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; @@ -25,9 +28,7 @@ use std::ops::Deref; use std::str; use std::thread; use subscription_cache::SubscriptionCache; -use up_rust::core::usubscription::{ - FetchSubscriptionsRequest, NotificationsRequest, SubscriberInfo, USubscription, -}; +use up_rust::core::usubscription::{FetchSubscriptionsRequest, SubscriberInfo, USubscription}; use up_rust::{UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri}; const USTREAMER_TAG: &str = "UStreamer:"; @@ -55,6 +56,40 @@ fn any_uuri() -> UUri { } } +// Used to track any errors in creating forwarding listeners +pub enum ForwardingListenerError { + FailToRegisterNotificationRequestResponseListener, + FailToRegisterPublishListener(UUri), // we can embed the subscriber.topic here which failed +} + +impl Debug for ForwardingListenerError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ForwardingListenerError::FailToRegisterNotificationRequestResponseListener => { + write!(f, "FailToRegisterNotificationRequestResponseListener") + } + ForwardingListenerError::FailToRegisterPublishListener(uri) => { + write!(f, "FailToRegisterPublishListener({:?})", uri) + } + } + } +} + +impl Display for ForwardingListenerError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ForwardingListenerError::FailToRegisterNotificationRequestResponseListener => { + write!(f, "Failed to register notification request/response listener") + } + ForwardingListenerError::FailToRegisterPublishListener(uri) => { + write!(f, "Failed to register publish listener for URI: {}", uri) + } + } + } +} + +impl Error for ForwardingListenerError {} + // the 'gatekeeper' which will prevent us from erroneously being able to add duplicate // forwarding rules or delete those rules which don't exist type ForwardingRules = Mutex>; @@ -153,7 +188,7 @@ impl ForwardingListeners { forwarding_id: &str, out_sender: Sender>, subscription_cache: Arc>, - ) -> Option> { + ) -> Result>, ForwardingListenerError> { let in_comparable_transport = ComparableTransport::new(in_transport.clone()); let mut forwarding_listeners = self.listeners.lock().await; @@ -162,9 +197,9 @@ impl ForwardingListeners { { *active += 1; if *active > 1 { - return None; + return Ok(None); } else { - return Some(forwarding_listener.clone()); + return Ok(Some(forwarding_listener.clone())); } } @@ -181,30 +216,50 @@ impl ForwardingListeners { .await { warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}"); + return Err(ForwardingListenerError::FailToRegisterNotificationRequestResponseListener); } else { debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener"); } - match subscription_cache + let subscribers = match subscription_cache .lock() .await .fetch_cache_entry(out_authority.into()) - .await { - Some(subscribers) => { - for subscriber in subscribers { - if let Err(err) = in_transport - .register_listener(&subscriber.topic, None, forwarding_listener.clone()) - .await - { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); - } else { - debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register listener"); - } - } - } + Some(subscribers) => subscribers, None => { warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} no subscribers found for out_authority: {out_authority:?}"); + HashSet::new() + } + }; + + for subscriber in subscribers { + if let Err(err) = in_transport + .register_listener(&subscriber.topic, None, forwarding_listener.clone()) + .await + { + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); + // Perform async unregister_listener + if let Err(err) = in_transport + .unregister_listener( + &any_uuri(), + Some(&uauthority_to_uuri(out_authority)), + forwarding_listener.clone(), + ) + .await + { + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to unregister request listener, error: {err}"); + return Err(ForwardingListenerError::FailToRegisterPublishListener( + uauthority_to_uuri(out_authority), + )); + } else { + debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to unregister request listener"); + return Err(ForwardingListenerError::FailToRegisterPublishListener( + uauthority_to_uuri(out_authority), + )); + } + } else { + debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register listener"); } } @@ -213,7 +268,7 @@ impl ForwardingListeners { (in_comparable_transport, out_authority.to_string()), (1, forwarding_listener.clone()), ); - Some(forwarding_listener) + Ok(Some(forwarding_listener)) } pub async fn remove(&self, in_transport: Arc, out_authority: &str) { @@ -453,9 +508,6 @@ pub struct UStreamer { transport_forwarders: TransportForwarders, forwarding_listeners: ForwardingListeners, subscription_cache: Arc>, - // TODO: Use this when USubsription is implemented - #[allow(dead_code)] - usubscription: Arc, } impl UStreamer { @@ -494,20 +546,9 @@ impl UStreamer { ..Default::default() }; - let notifications_register_request = NotificationsRequest { - topic: Some(uuri).into(), - ..Default::default() - }; - let _ = task::block_on( - usubscription.register_for_notifications(notifications_register_request), - ) - .map_err(|e| { - UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - format!("Failed to register for notifications: {e:?}"), - ) - }); + // TODO: Create a NotificationsRequest and send over host transport + // TODO: We need to form a FetchSubscriptionsRequest and send over host transport let mut fetch_request = FetchSubscriptionsRequest { request: None, offset: None, @@ -517,14 +558,31 @@ impl UStreamer { let subscriptions = task::block_on(usubscription.fetch_subscriptions(fetch_request)) .expect("Failed to fetch subscriptions"); - let subscription_cache_foo = Arc::new(Mutex::new(SubscriptionCache::new(subscriptions))); + let subscription_cache_result = SubscriptionCache::new(subscriptions); + + let subscription_cache_foo = match subscription_cache_result { + Ok(cache) => { + debug!( + "{}:{}:{} SubscriptionCache created", + name, USTREAMER_TAG, USTREAMER_FN_NEW_TAG + ); + Arc::new(Mutex::new(cache)) + } + Err(e) => { + error!( + "{}:{}:{} Unable to create SubscriptionCache: {:?}", + name, USTREAMER_TAG, USTREAMER_FN_NEW_TAG, e + ); + Arc::new(Mutex::new(SubscriptionCache::default())) + } + }; + Self { name: name.to_string(), registered_forwarding_rules: Mutex::new(HashSet::new()), transport_forwarders: TransportForwarders::new(message_queue_size as usize), forwarding_listeners: ForwardingListeners::new(), subscription_cache: subscription_cache_foo.clone(), - usubscription, } }