diff --git a/.gitignore b/.gitignore index fa72dea2..ef1e7bd7 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ tarpaulin-report.html .idea/ .vscode/launch.json +.vscode/settings.json diff --git a/example-utils/usubscription-static-file/src/lib.rs b/example-utils/usubscription-static-file/src/lib.rs index b2a1e4da..0ea68db7 100644 --- a/example-utils/usubscription-static-file/src/lib.rs +++ b/example-utils/usubscription-static-file/src/lib.rs @@ -24,17 +24,14 @@ use up_rust::core::usubscription::{ FetchSubscriptionsResponse, NotificationsRequest, SubscriberInfo, Subscription, SubscriptionRequest, SubscriptionResponse, USubscription, UnsubscribeRequest, }; -use up_rust::{UStatus, UUri}; +use up_rust::{UCode, UStatus, UUri}; pub struct USubscriptionStaticFile { static_file: PathBuf, } impl USubscriptionStaticFile { - pub fn new(static_file: Option) -> Self { - // Set static_file to static_file if it has value, else set to "static-configs/testdata.json" - let static_file = - static_file.unwrap_or_else(|| PathBuf::from("static-configs/testdata.json")); + pub fn new(static_file: PathBuf) -> Self { USubscriptionStaticFile { static_file } } } @@ -43,7 +40,7 @@ impl USubscriptionStaticFile { impl USubscription for USubscriptionStaticFile { async fn subscribe( &self, - subscription_request: SubscriptionRequest, + _subscription_request: SubscriptionRequest, ) -> Result { todo!() } @@ -64,120 +61,118 @@ impl USubscription for USubscriptionStaticFile { let mut subscriptions_vec = Vec::new(); - let empty_fetch_response = FetchSubscriptionsResponse { - ..Default::default() - }; - println!("subscription_json_file: {:?}", subscription_json_file); - match canonicalize(subscription_json_file) { - Ok(subscription_json_file) => { - - match fs::read_to_string(&subscription_json_file) { - Ok(data) => match serde_json::from_str::(&data) { - Ok(res) => { - if let Some(obj) = res.as_object() { - for (key, value) in obj { - println!("key: {}, value: {}", key, value); - let mut subscriber_set: HashSet = HashSet::new(); - - if let Some(array) = value.as_array() { - for subscriber in array { - println!("subscriber: {}", subscriber); - - if let Some(subscriber_str) = subscriber.as_str() { - match UUri::from_str(subscriber_str) { - Ok(uri) => { - println!("All good for subscriber"); - subscriber_set.insert(uri); - } - Err(error) => { - println!("Error with Deserializing Subscriber: {}", error); - } - } - } else { - println!("Unable to parse subscriber"); - } - } - } - - println!("key: {}", key); - let topic = match UUri::from_str(&key.to_string()) { - Ok(mut uri) => { - println!("All good for key"); - uri.resource_id = 0x8001; - uri - } - Err(error) => { - println!("Error with Deserializing Key: {}", error); - continue; - } - }; - - let details_vec = Vec::new(); - for subscriber in subscriber_set { - let subscriber_info_tmp = SubscriberInfo { - uri: Some(subscriber).into(), - details: details_vec.clone(), - ..Default::default() - }; - let subscription_tmp = Subscription { - topic: Some(topic.clone()).into(), - subscriber: Some(subscriber_info_tmp).into(), - ..Default::default() - }; - subscriptions_vec.push(subscription_tmp); - } + let subscription_json_file = match canonicalize(subscription_json_file) { + Ok(path) => path, + Err(e) => { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Static subscription file not found: {e:?}"), + )) + } + }; + + let data = fs::read_to_string(subscription_json_file).map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Unable to read file: {e:?}"), + ) + })?; + + let res: Value = serde_json::from_str(&data).map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Unable to parse JSON: {e:?}"), + ) + })?; + + if let Some(obj) = res.as_object() { + for (key, value) in obj { + println!("key: {}, value: {}", key, value); + let mut subscriber_set: HashSet = HashSet::new(); + + if let Some(array) = value.as_array() { + for subscriber in array { + println!("subscriber: {}", subscriber); + + if let Some(subscriber_str) = subscriber.as_str() { + match UUri::from_str(subscriber_str) { + Ok(uri) => { + println!("All good for subscriber"); + subscriber_set.insert(uri); + } + Err(error) => { + println!("Error with Deserializing Subscriber: {}", error); } } - - println!("{}", res); - dbg!(&subscriptions_vec); - let fetch_response = FetchSubscriptionsResponse { - subscriptions: subscriptions_vec, - ..Default::default() - }; - Ok(fetch_response) + } else { + println!("Unable to parse subscriber"); } - Err(e) => { - eprintln!("Unable to parse JSON: {}", e); - Ok(empty_fetch_response) - } - }, - Err(e) => { - eprintln!("Unable to read file: {}", e); - Ok(empty_fetch_response) } } - } - Err(e) => { - eprintln!("Unable to canonicalize path: {}", e); - Ok(empty_fetch_response) + + println!("key: {}", key); + let topic = match UUri::from_str(&key.to_string()) { + Ok(mut uri) => { + println!("All good for key"); + uri.resource_id = 0x8001; + uri + } + Err(error) => { + println!("Error with Deserializing Key: {}", error); + continue; + } + }; + + let details_vec = Vec::new(); + for subscriber in subscriber_set { + let subscriber_info_tmp = SubscriberInfo { + uri: Some(subscriber).into(), + details: details_vec.clone(), + ..Default::default() + }; + let subscription_tmp = Subscription { + topic: Some(topic.clone()).into(), + subscriber: Some(subscriber_info_tmp).into(), + ..Default::default() + }; + subscriptions_vec.push(subscription_tmp); + } } } + + println!("{}", res); + dbg!(&subscriptions_vec); + let fetch_response = FetchSubscriptionsResponse { + subscriptions: subscriptions_vec, + ..Default::default() + }; + + Ok(fetch_response) } - async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> { + async fn unsubscribe(&self, _unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> { todo!() } async fn register_for_notifications( &self, - notifications_register_request: NotificationsRequest, + _notifications_register_request: NotificationsRequest, ) -> Result<(), UStatus> { - todo!() + Ok(()) } async fn unregister_for_notifications( &self, - notifications_unregister_request: NotificationsRequest, + _notifications_unregister_request: NotificationsRequest, ) -> Result<(), UStatus> { todo!() } async fn fetch_subscribers( &self, - fetch_subscribers_request: FetchSubscribersRequest, + _fetch_subscribers_request: FetchSubscribersRequest, ) -> Result { todo!(); } diff --git a/subscription-cache/src/lib.rs b/subscription-cache/src/lib.rs index 124b01f0..c83d1e45 100644 --- a/subscription-cache/src/lib.rs +++ b/subscription-cache/src/lib.rs @@ -12,7 +12,6 @@ ********************************************************************************/ use async_std::sync::Mutex; -use protobuf::MessageField; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use up_rust::core::usubscription::{ @@ -21,9 +20,11 @@ use up_rust::core::usubscription::{ }; use up_rust::UUri; -pub type SubscribersMap = Mutex>>; +pub type SubscribersMap = Mutex>>; +// Tracks subscription information inside the SubscriptionCache pub struct SubscriptionInformation { + pub topic: UUri, pub subscriber: SubscriberInfo, pub status: SubscriptionStatus, pub attributes: SubscribeAttributes, @@ -47,6 +48,7 @@ impl Hash for SubscriptionInformation { impl Clone for SubscriptionInformation { fn clone(&self) -> Self { Self { + topic: self.topic.clone(), subscriber: self.subscriber.clone(), status: self.status.clone(), attributes: self.attributes.clone(), @@ -66,31 +68,31 @@ impl SubscriptionCache { pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Self { let mut subscription_cache_hash_map = HashMap::new(); for subscription in subscription_cache_map.subscriptions { - let uri = if let Some(uri) = subscription.topic.into_option(){ - uri + let topic = if let Some(topic) = subscription.topic.into_option() { + topic } else { println!("Unable to parse URI from subscription, skipping..."); continue; }; - let subscriber = if let Some(subscriber) = subscription.subscriber.into_option(){ + let subscriber = if let Some(subscriber) = subscription.subscriber.into_option() { subscriber } else { println!("Unable to parse subscriber from subscription, skipping..."); continue; }; - let status = if let Some(status) = subscription.status.into_option(){ + let status = if let Some(status) = subscription.status.into_option() { status } else { println!("Unable to parse status from subscription, setting as default"); SubscriptionStatus::default() }; - let attributes = if let Some(attributes) = subscription.attributes.into_option(){ + let attributes = if let Some(attributes) = subscription.attributes.into_option() { attributes } else { println!("Unable to parse attributes from subscription, setting as default"); SubscribeAttributes::default() }; - let config = if let Some(config) = subscription.config.into_option(){ + let config = if let Some(config) = subscription.config.into_option() { config } else { println!("Unable to parse config from subscription, setting as default"); @@ -98,13 +100,21 @@ impl SubscriptionCache { }; // Create new hashset if the key does not exist and insert the subscription let subscription_information = SubscriptionInformation { + topic: topic.clone(), subscriber, status, attributes, - config + config, }; + let subscriber_authority_name = subscription_information + .subscriber + .uri + .as_ref() + .unwrap() + .authority_name + .clone(); subscription_cache_hash_map - .entry(uri) + .entry(subscriber_authority_name) .or_insert_with(HashSet::new) .insert(subscription_information); } @@ -113,18 +123,15 @@ impl SubscriptionCache { } } - pub async fn fetch_cache( + pub async fn fetch_cache_entry( &self, - ) -> HashMap> { - let cache_map = self.subscription_cache_map.lock().await; - let mut cloned_map = HashMap::new(); - - for (key, value) in cache_map.iter() { - let cloned_key = key.clone(); - let cloned_value = value.iter().cloned().collect::>(); - cloned_map.insert(cloned_key, cloned_value); - } + entry: String, + ) -> Option> { + let map = self.subscription_cache_map.lock().await; + map.get(&entry).cloned() + } - cloned_map + pub async fn fetch_cache(&self) -> HashMap> { + self.subscription_cache_map.lock().await.clone() } } diff --git a/up-linux-streamer/examples/mE_publisher.rs b/up-linux-streamer/examples/mE_publisher.rs index 5b29bd0d..b4f3d6b1 100644 --- a/up-linux-streamer/examples/mE_publisher.rs +++ b/up-linux-streamer/examples/mE_publisher.rs @@ -50,6 +50,7 @@ async fn main() -> Result<(), UStatus> { &REMOTE_AUTHORITY.to_string(), PUB_TOPIC_UE_ID, &vsomeip_config.unwrap(), + None, ) .unwrap(), ); diff --git a/up-linux-streamer/examples/mE_subscriber.rs b/up-linux-streamer/examples/mE_subscriber.rs index e8cbe2e8..80a36d9c 100644 --- a/up-linux-streamer/examples/mE_subscriber.rs +++ b/up-linux-streamer/examples/mE_subscriber.rs @@ -86,6 +86,7 @@ async fn main() -> Result<(), UStatus> { &REMOTE_AUTHORITY.to_string(), SERVICE_UE_ID, &vsomeip_config.unwrap(), + None, ) .unwrap(), ); diff --git a/up-linux-streamer/src/main.rs b/up-linux-streamer/src/main.rs index fe614d13..110790f2 100644 --- a/up-linux-streamer/src/main.rs +++ b/up-linux-streamer/src/main.rs @@ -3,22 +3,19 @@ mod config; use crate::config::{Config, HostTransport}; use clap::Parser; use log::trace; -use std::path::PathBuf; use std::fs::File; use std::io::Read; -use std::sync::Arc; +use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; use std::{env, thread}; use up_rust::{UCode, UStatus, UTransport}; - -use std::str::FromStr; use up_streamer::{Endpoint, UStreamer}; use up_transport_vsomeip::UPTransportVsomeip; use up_transport_zenoh::UPClientZenoh; use usubscription_static_file::USubscriptionStaticFile; -use usubscription_static_file::USubscriptionStaticFile; use zenoh::config::Config as ZenohConfig; -use zenoh::config::Endpoint as ZenohEndpoint; +use zenoh::config::EndPoint as ZenohEndpoint; #[derive(Parser)] #[command()] @@ -31,28 +28,9 @@ struct StreamerArgs { async fn main() -> Result<(), UStatus> { env_logger::init(); - let args = StreamerArgs::parse(); - - let mut file = File::open(args.config) - .map_err(|e| UStatus::fail_with_code(UCode::NOT_FOUND, format!("File not found: {e:?}")))?; - let mut contents = String::new(); - file.read_to_string(&mut contents).map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to read config file: {e:?}"), - ) - })?; - - let config: Config = json5::from_str(&contents).map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to parse config file: {e:?}"), - ) - })?; - - let usubscription = Arc::new(USubscriptionStaticFile::new(Some(PathBuf::from( + let usubscription = Arc::new(USubscriptionStaticFile::new(PathBuf::from( "example-utils/usubscription-static-file/static-configs/testdata.json", - )))); + ))); let args = StreamerArgs::parse(); @@ -82,7 +60,7 @@ async fn main() -> Result<(), UStatus> { let mut zenoh_config = ZenohConfig::default(); // Specify the address to listen on using IPv4 - let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7447"); + let ipv4_endpoint = ZenohEndpoint::from_str("tcp/0.0.0.0:7447"); // Add the IPv4 endpoint to the Zenoh configuration zenoh_config diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index 70094dec..559f7406 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -22,12 +22,11 @@ use log::*; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::ops::Deref; -use std::path::PathBuf; use std::str; use std::thread; use subscription_cache::SubscriptionCache; use up_rust::core::usubscription::{ - FetchSubscriptionsRequest, FetchSubscriptionsResponse, SubscriberInfo, USubscription, + FetchSubscriptionsRequest, NotificationsRequest, SubscriberInfo, USubscription, }; use up_rust::{UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri}; @@ -157,53 +156,64 @@ impl ForwardingListeners { ) -> Option> { let in_comparable_transport = ComparableTransport::new(in_transport.clone()); let mut forwarding_listeners = self.listeners.lock().await; - let initial_subscriber_cache = - task::block_on(subscription_cache.lock().await.fetch_cache()); - let (active, forwarding_listener) = forwarding_listeners - .entry((in_comparable_transport.clone(), out_authority.to_string())) - .or_insert_with(|| { - let forwarding_listener = Arc::new(ForwardingListener::new(forwarding_id, out_sender)); + if let Some((active, forwarding_listener)) = forwarding_listeners + .get_mut(&(in_comparable_transport.clone(), out_authority.to_string())) + { + *active += 1; + if *active > 1 { + return None; + } else { + return Some(forwarding_listener.clone()); + } + } - let reg_res = task::block_on(in_transport - .register_listener(&any_uuri(), Some(&uauthority_to_uuri(out_authority)), forwarding_listener.clone())); + let forwarding_listener = + Arc::new(ForwardingListener::new(forwarding_id, out_sender.clone())); - if let Err(err) = reg_res { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}"); - } else { - debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener"); - } + // Perform async registration and fetching + if let Err(err) = in_transport + .register_listener( + &any_uuri(), + Some(&uauthority_to_uuri(out_authority)), + forwarding_listener.clone(), + ) + .await + { + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}"); + } else { + debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener"); + } - for (topic, subscribers) in initial_subscriber_cache { - let mut authority_name_hash_set: HashSet = HashSet::new(); - for subscriber in subscribers { - authority_name_hash_set.insert(subscriber.subscriber.uri.get_or_default().authority_name.clone()); - } - // TODO: Should we also check if topic's authority matches in_transport's authority, i.e., topic belongs to in_transport - // topic.authority_name == in_authority - if authority_name_hash_set.contains(out_authority) { - let pub_reg_res = task::block_on(in_transport - .register_listener(&topic, None, forwarding_listener.clone())); - if let Err(err) = pub_reg_res { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); - } else { - debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register listener"); - } + match subscription_cache + .lock() + .await + .fetch_cache_entry(out_authority.into()) + .await + { + Some(subscribers) => { + for subscriber in subscribers { + if let Err(err) = in_transport + .register_listener(&subscriber.topic, None, forwarding_listener.clone()) + .await + { + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); + } else { + debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register listener"); } } - - ( - 0, - forwarding_listener, - ) - }); - *active += 1; - - if *active > 1 { - None - } else { - Some(forwarding_listener.clone()) + } + None => { + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} no subscribers found for out_authority: {out_authority:?}"); + } } + + // Insert the new listener and update the active count + forwarding_listeners.insert( + (in_comparable_transport, out_authority.to_string()), + (1, forwarding_listener.clone()), + ); + Some(forwarding_listener) } pub async fn remove(&self, in_transport: Arc, out_authority: &str) { @@ -477,31 +487,35 @@ impl UStreamer { let details_vector = Vec::new(); let subscriber_info = SubscriberInfo { - uri: Some(uuri).into(), + uri: Some(uuri.clone()).into(), details: details_vector, ..Default::default() }; + let notifications_register_request = NotificationsRequest { + topic: Some(uuri).into(), + ..Default::default() + }; + let _ = task::block_on( + usubscription.register_for_notifications(notifications_register_request), + ) + .map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Failed to register for notifications: {e:?}"), + ) + }); + let mut fetch_request = FetchSubscriptionsRequest { request: None, offset: None, ..Default::default() }; fetch_request.set_subscriber(subscriber_info); - let subscriptions = task::block_on(usubscription.fetch_subscriptions(fetch_request)); - //let subscription_cache_foo = Arc::new(Mutex::new(SubscriptionCache::new(subscriptions.expect("Subscriptions is an Error")))); - - let subscription_cache_foo = match subscriptions { - Ok(subs) => Arc::new(Mutex::new(SubscriptionCache::new(subs))), - Err(_subs) => { - // Handle the error case here, such as returning a default value or creating a default SubscriptionCache - let empty_fetch_response = FetchSubscriptionsResponse { - ..Default::default() - }; - Arc::new(Mutex::new(SubscriptionCache::new(empty_fetch_response))) - } - }; + let subscriptions = task::block_on(usubscription.fetch_subscriptions(fetch_request)) + .expect("Failed to fetch subscriptions"); + let subscription_cache_foo = Arc::new(Mutex::new(SubscriptionCache::new(subscriptions))); Self { name: name.to_string(), registered_forwarding_rules: Mutex::new(HashSet::new()), @@ -544,6 +558,7 @@ impl UStreamer { /// * [`UMessageType::UMESSAGE_TYPE_NOTIFICATION`][up_rust::UMessageType::UMESSAGE_TYPE_NOTIFICATION] /// * [`UMessageType::UMESSAGE_TYPE_REQUEST`][up_rust::UMessageType::UMESSAGE_TYPE_REQUEST] /// * [`UMessageType::UMESSAGE_TYPE_RESPONSE`][up_rust::UMessageType::UMESSAGE_TYPE_RESPONSE] + /// /// As well as a [`UMessageType::UMESSAGE_TYPE_PUBLISH`][up_rust::UMessageType::UMESSAGE_TYPE_PUBLISH] /// type message which only has a source / topic contained in its attributes. /// @@ -810,7 +825,7 @@ impl UListener for ForwardingListener { mod tests { use crate::{Endpoint, UStreamer}; use async_trait::async_trait; - use std::sync::Arc; + use std::{path::PathBuf, sync::Arc}; use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; use usubscription_static_file::USubscriptionStaticFile; @@ -917,7 +932,8 @@ mod tests { remote_transport.clone(), ); - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 100, usubscription); // Add forwarding rules to endpoint local<->remote assert!(ustreamer @@ -990,7 +1006,8 @@ mod tests { remote_transport_b.clone(), ); - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 100, usubscription); // Add forwarding rules to endpoint local<->remote_a @@ -1051,7 +1068,8 @@ mod tests { remote_transport.clone(), ); - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 100, usubscription); // Add forwarding rules to endpoint local<->remote_a diff --git a/up-streamer/tests/single_local_single_remote.rs b/up-streamer/tests/single_local_single_remote.rs index b25c275a..4b73f26d 100644 --- a/up-streamer/tests/single_local_single_remote.rs +++ b/up-streamer/tests/single_local_single_remote.rs @@ -26,6 +26,7 @@ use integration_test_utils::{ ClientMessages, LocalClientListener, RemoteClientListener, UPClientFoo, }; use log::debug; +use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -48,7 +49,8 @@ async fn single_local_single_remote() { Arc::new(UPClientFoo::new("upclient_bar", rx_2.clone(), tx_2.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols diff --git a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs index 612f1972..a3d2a9cd 100644 --- a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs +++ b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs @@ -27,6 +27,7 @@ use integration_test_utils::{ RemoteClientListener, UPClientFoo, }; use log::debug; +use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -52,7 +53,8 @@ async fn single_local_two_remote_add_remove_rules() { Arc::new(UPClientFoo::new("upclient_bar_2", rx_3.clone(), tx_3.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols diff --git a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs index e09c74c0..38f01adf 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs @@ -27,6 +27,7 @@ use integration_test_utils::{ RemoteClientListener, UPClientFoo, }; use log::debug; +use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -52,7 +53,8 @@ async fn single_local_two_remote_authorities_different_remote_transport() { Arc::new(UPClientFoo::new("upclient_bar_2", rx_3.clone(), tx_3.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols diff --git a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs index 455a69d3..3020f349 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs @@ -27,6 +27,7 @@ use integration_test_utils::{ RemoteClientListener, UPClientFoo, }; use log::debug; +use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -49,7 +50,8 @@ async fn single_local_two_remote_authorities_same_remote_transport() { Arc::new(UPClientFoo::new("upclient_bar", rx_2.clone(), tx_2.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let subscription_path = PathBuf::from("static-configs/testdata.json"); + let usubscription = Arc::new(USubscriptionStaticFile::new(subscription_path)); let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols