From 7e6d1a3f7246493009bfa61fd7199fe31782f5a0 Mon Sep 17 00:00:00 2001 From: Matthew D'Alonzo Date: Thu, 27 Jun 2024 13:29:45 -0400 Subject: [PATCH] Add publish to up-streamer rust * Add support for tracking subscriptions and forwarding published messages in ustreamer.rs * Add usubscription-static-file to stub out subscriptions before usubscription is written * Add tests for mE (SOME/IP) subscribe/publish and uE (zenoh) subscribe/publish NOTE: This PR also contains changes that @PLeVasseur has done for up-streamer-rust. His will be merged first and then I will rebase based on that merge. --- .gitignore | 1 + Cargo.lock | 34 ++++ Cargo.toml | 6 +- .../usubscription-static-file/Cargo.toml | 19 ++ .../usubscription-static-file/src/lib.rs | 183 ++++++++++++++++++ .../static-configs/testdata.json | 6 + subscription-cache/Cargo.toml | 24 +++ subscription-cache/src/lib.rs | 101 ++++++++++ up-linux-streamer/Cargo.toml | 2 + up-linux-streamer/examples/mE_publisher.rs | 87 +++++++++ up-linux-streamer/examples/mE_subscriber.rs | 106 ++++++++++ up-linux-streamer/examples/uE_client.rs | 28 ++- up-linux-streamer/examples/uE_publisher.rs | 88 +++++++++ up-linux-streamer/examples/uE_service.rs | 13 +- up-linux-streamer/examples/uE_subscriber.rs | 106 ++++++++++ up-linux-streamer/src/main.rs | 22 ++- .../vsomeip-configs/mE_publisher.json | 9 + .../vsomeip-configs/mE_subscriber.json | 9 + up-streamer/Cargo.toml | 5 +- up-streamer/src/ustreamer.rs | 100 +++++++++- .../tests/single_local_single_remote.rs | 4 +- ...ingle_local_two_remote_add_remove_rules.rs | 4 +- ..._authorities_different_remote_transport.rs | 4 +- ...emote_authorities_same_remote_transport.rs | 4 +- 24 files changed, 941 insertions(+), 24 deletions(-) create mode 100644 example-utils/usubscription-static-file/Cargo.toml create mode 100644 example-utils/usubscription-static-file/src/lib.rs create mode 100644 example-utils/usubscription-static-file/static-configs/testdata.json create mode 100644 subscription-cache/Cargo.toml create mode 100644 subscription-cache/src/lib.rs create mode 100644 up-linux-streamer/examples/mE_publisher.rs create mode 100644 up-linux-streamer/examples/mE_subscriber.rs create mode 100644 up-linux-streamer/examples/uE_publisher.rs create mode 100644 up-linux-streamer/examples/uE_subscriber.rs create mode 100644 up-linux-streamer/vsomeip-configs/mE_publisher.json create mode 100644 up-linux-streamer/vsomeip-configs/mE_subscriber.json diff --git a/.gitignore b/.gitignore index f0a059a6..fa72dea2 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ lcov.info tarpaulin-report.html .idea/ +.vscode/launch.json diff --git a/Cargo.lock b/Cargo.lock index c9828053..4dc36576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3817,6 +3817,23 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "subscription-cache" +version = "0.1.5-dev" +dependencies = [ + "async-std", + "async-trait", + "env_logger 0.10.2", + "futures", + "log", + "prost", + "protobuf", + "serde", + "serde_json", + "up-rust", + "uuid", +] + [[package]] name = "subtle" version = "2.5.0" @@ -4390,6 +4407,7 @@ name = "up-linux-streamer" version = "0.1.5-dev" dependencies = [ "async-trait", + "chrono", "env_logger 0.10.2", "hello-world-protos", "log", @@ -4399,6 +4417,7 @@ dependencies = [ "up-streamer", "up-transport-vsomeip", "up-transport-zenoh", + "usubscription-static-file", "zenoh", ] @@ -4435,8 +4454,11 @@ dependencies = [ "integration-test-utils", "log", "prost", + "protobuf", "serde_json", + "subscription-cache", "up-rust", + "usubscription-static-file", "uuid", ] @@ -4508,6 +4530,18 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "usubscription-static-file" +version = "0.1.5-dev" +dependencies = [ + "async-std", + "async-trait", + "serde_json", + "subscription-cache", + "up-rust", + "uriparse", +] + [[package]] name = "utf-8" version = "0.7.6" diff --git a/Cargo.toml b/Cargo.toml index 21d5deb5..3fdf74b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,11 +14,10 @@ resolver = "2" members = [ "example-utils/hello-world-protos", "up-linux-streamer", - "up-streamer", -] + "up-streamer", "subscription-cache", "example-utils/usubscription-static-file"] [workspace.package] -rust-version = "1.76.0" +rust-version = "1.72.1" version = "0.1.5-dev" # uProtocol version repository = "https://github.com/eclipse-uprotocol/up-streamer-rust" homepage = "https://github.com/eclipse-uprotocol" @@ -34,6 +33,7 @@ futures = { version = "0.3.30" } log = { version = "0.4.20" } prost = { version = "0.12" } prost-types = { version = "0.12" } +serde = { version = "1.0.111" } serde_json = { version = "1.0.111" } uuid = { version = "1.7.0" } up-rust = { default-features = false, git = "https://github.com/eclipse-uprotocol/up-rust", rev = "3a50104421a801d52e1d9c68979db54c013ce43d" } diff --git a/example-utils/usubscription-static-file/Cargo.toml b/example-utils/usubscription-static-file/Cargo.toml new file mode 100644 index 00000000..ab7bc2ab --- /dev/null +++ b/example-utils/usubscription-static-file/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "usubscription-static-file" +rust-version.workspace = true +version.workspace = true +repository.workspace = true +homepage.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { workspace = true, features = ["unstable"] } +async-trait = "0.1.80" +serde_json = { version = "1.0.111" } +subscription-cache = {path="../../subscription-cache"} +up-rust = { workspace = true, features = ["usubscription"] } +uriparse = { version = "0.6" } \ No newline at end of file diff --git a/example-utils/usubscription-static-file/src/lib.rs b/example-utils/usubscription-static-file/src/lib.rs new file mode 100644 index 00000000..f8752bb7 --- /dev/null +++ b/example-utils/usubscription-static-file/src/lib.rs @@ -0,0 +1,183 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#![allow(clippy::mutable_key_type)] + +use async_trait::async_trait; +use serde_json::Value; +use std::collections::HashSet; +use std::fs::{self, canonicalize}; +use std::path::PathBuf; +use std::str::FromStr; +use up_rust::core::usubscription::{ + FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest, + FetchSubscriptionsResponse, NotificationsRequest, SubscriberInfo, Subscription, + SubscriptionRequest, SubscriptionResponse, USubscription, UnsubscribeRequest, +}; +use up_rust::{UStatus, UUri}; + +pub struct USubscriptionStaticFile { + static_file: PathBuf, +} + +impl USubscriptionStaticFile { + pub fn new(static_file: Option) -> Self { + // Set static_file to static_file if it has value, else set to "static-configs/testdata.json" + let static_file = + static_file.unwrap_or_else(|| PathBuf::from("static-configs/testdata.json")); + USubscriptionStaticFile { static_file } + } +} + +#[async_trait] +impl USubscription for USubscriptionStaticFile { + async fn subscribe( + &self, + subscription_request: SubscriptionRequest, + ) -> Result { + todo!() + } + + async fn fetch_subscriptions( + &self, + fetch_subscriptions_request: FetchSubscriptionsRequest, + ) -> Result { + // Reads in a file and builds it into a subscription_cache data type + // This is a static file, so we will just return the same set of subscribers + // for all URIs + println!( + "fetch_subscriptions for topic: {}", + fetch_subscriptions_request.subscriber() + ); + + let subscription_json_file = self.static_file.clone(); + + let mut subscriptions_vec = Vec::new(); + + let empty_fetch_response = FetchSubscriptionsResponse { + ..Default::default() + }; + + match canonicalize(subscription_json_file) { + Ok(subscription_json_file) => { + println!("subscription_json_file: {:?}", subscription_json_file); + + match fs::read_to_string(&subscription_json_file) { + Ok(data) => match serde_json::from_str::(&data) { + Ok(res) => { + if let Some(obj) = res.as_object() { + for (key, value) in obj { + println!("key: {}, value: {}", key, value); + let mut subscriber_set: HashSet = HashSet::new(); + + if let Some(array) = value.as_array() { + for subscriber in array { + println!("subscriber: {}", subscriber); + + if let Some(subscriber_str) = subscriber.as_str() { + match UUri::from_str(subscriber_str) { + Ok(uri) => { + println!("All good for subscriber"); + subscriber_set.insert(uri); + } + Err(error) => { + println!("Error with Deserializing Subscriber: {}", error); + } + } + } else { + println!("Unable to parse subscriber"); + } + } + } + + println!("key: {}", key); + let topic = match UUri::from_str(&key.to_string()) { + Ok(mut uri) => { + println!("All good for key"); + uri.resource_id = 0x8001; + uri + } + Err(error) => { + println!("Error with Deserializing Key: {}", error); + continue; + } + }; + + let details_vec = Vec::new(); + for subscriber in subscriber_set { + let subscriber_info_tmp = SubscriberInfo { + uri: Some(subscriber).into(), + details: details_vec.clone(), + ..Default::default() + }; + let subscription_tmp = Subscription { + topic: Some(topic.clone()).into(), + subscriber: Some(subscriber_info_tmp).into(), + ..Default::default() + }; + subscriptions_vec.push(subscription_tmp); + } + } + } + + println!("{}", res); + dbg!(&subscriptions_vec); + let fetch_response = FetchSubscriptionsResponse { + subscriptions: subscriptions_vec, + ..Default::default() + }; + Ok(fetch_response) + } + Err(e) => { + eprintln!("Unable to parse JSON: {}", e); + Ok(empty_fetch_response) + } + }, + Err(e) => { + eprintln!("Unable to read file: {}", e); + Ok(empty_fetch_response) + } + } + } + Err(e) => { + eprintln!("Unable to canonicalize path: {}", e); + Ok(empty_fetch_response) + } + } + } + + async fn unsubscribe(&self, unsubscribe_request: UnsubscribeRequest) -> Result<(), UStatus> { + todo!() + } + + async fn register_for_notifications( + &self, + notifications_register_request: NotificationsRequest, + ) -> Result<(), UStatus> { + todo!() + } + + async fn unregister_for_notifications( + &self, + notifications_unregister_request: NotificationsRequest, + ) -> Result<(), UStatus> { + todo!() + } + + async fn fetch_subscribers( + &self, + fetch_subscribers_request: FetchSubscribersRequest, + ) -> Result { + todo!(); + } +} diff --git a/example-utils/usubscription-static-file/static-configs/testdata.json b/example-utils/usubscription-static-file/static-configs/testdata.json new file mode 100644 index 00000000..22d4eecf --- /dev/null +++ b/example-utils/usubscription-static-file/static-configs/testdata.json @@ -0,0 +1,6 @@ +{ + "//pub_topic/1236/1/327": ["//me_authority/17185/1/1057", "//VIN.phones/8000/2/160", "//VIN.apps/8000/2/170"], + "//linux/1237/1/327": ["//linux/17185/1/1057", "//VIN.phones/8000/2/160", "//VIN.apps/8000/2/170"], + "//VIN.topic/1000/1/3": ["//VIN.vehicles/8000/2/150"], + "//VIN.topic/1000/1/4": ["//VIN.phones/8000/2/160"] +} diff --git a/subscription-cache/Cargo.toml b/subscription-cache/Cargo.toml new file mode 100644 index 00000000..2edc28f8 --- /dev/null +++ b/subscription-cache/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "subscription-cache" +rust-version.workspace = true +version.workspace = true +repository.workspace = true +homepage.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { workspace = true, features = ["unstable"] } +async-trait = { workspace = true } +env_logger = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +prost = { workspace = true } +uuid = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true } +up-rust = { workspace = true, features = ["usubscription"] } +protobuf = { version = "3.3", features = ["with-bytes"] } \ No newline at end of file diff --git a/subscription-cache/src/lib.rs b/subscription-cache/src/lib.rs new file mode 100644 index 00000000..ce789f97 --- /dev/null +++ b/subscription-cache/src/lib.rs @@ -0,0 +1,101 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use async_std::sync::Mutex; +use protobuf::MessageField; +use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; +use up_rust::core::usubscription::{ + EventDeliveryConfig, FetchSubscriptionsResponse, SubscribeAttributes, SubscriberInfo, + SubscriptionStatus, +}; +use up_rust::UUri; + +pub type SubscribersMap = Mutex, HashSet>>; + +pub struct SubscriptionInformation { + pub subscriber: MessageField, + pub status: MessageField, + pub attributes: MessageField, + pub config: MessageField, +} + +impl Eq for SubscriptionInformation {} + +impl PartialEq for SubscriptionInformation { + fn eq(&self, other: &Self) -> bool { + self.subscriber == other.subscriber + } +} + +impl Hash for SubscriptionInformation { + fn hash(&self, state: &mut H) { + self.subscriber.hash(state); + } +} + +impl Clone for SubscriptionInformation { + fn clone(&self) -> Self { + Self { + subscriber: self.subscriber.clone(), + status: self.status.clone(), + attributes: self.attributes.clone(), + config: self.config.clone(), + } + } +} + +pub struct SubscriptionCache { + subscription_cache_map: SubscribersMap, +} + +/// A [`SubscriptionCache`] is used to store and manage subscriptions to +/// topics. It is kept local to the streamer. The streamer will receive updates +/// from the subscription service, and update the SubscriptionCache accordingly. +impl SubscriptionCache { + pub fn new(subscription_cache_map: FetchSubscriptionsResponse) -> Self { + let mut subscription_cache_hash_map = HashMap::new(); + for subscription in subscription_cache_map.subscriptions { + let uri = subscription.topic; + // Create new hashset if the key does not exist and insert the subscription + let subscription_information = SubscriptionInformation { + subscriber: subscription.subscriber, + status: subscription.status, + attributes: subscription.attributes, + config: subscription.config, + }; + subscription_cache_hash_map + .entry(uri) + .or_insert_with(HashSet::new) + .insert(subscription_information); + } + Self { + subscription_cache_map: Mutex::new(subscription_cache_hash_map), + } + } + + pub async fn fetch_cache( + &self, + ) -> HashMap, HashSet> { + let cache_map = self.subscription_cache_map.lock().await; + let mut cloned_map = HashMap::new(); + + for (key, value) in cache_map.iter() { + let cloned_key = key.clone(); + let cloned_value = value.iter().cloned().collect::>(); + cloned_map.insert(cloned_key, cloned_value); + } + + cloned_map + } +} diff --git a/up-linux-streamer/Cargo.toml b/up-linux-streamer/Cargo.toml index 48494859..e0001210 100644 --- a/up-linux-streamer/Cargo.toml +++ b/up-linux-streamer/Cargo.toml @@ -20,6 +20,8 @@ env_logger = "0.10.2" log = "0.4.21" async-trait = "0.1.80" protobuf = { version = "3.3", features = ["with-bytes"] } +usubscription-static-file = {path = "../example-utils/usubscription-static-file"} +chrono = "0.4" [dev-dependencies] hello-world-protos = { path = "../example-utils/hello-world-protos" } diff --git a/up-linux-streamer/examples/mE_publisher.rs b/up-linux-streamer/examples/mE_publisher.rs new file mode 100644 index 00000000..f2a99387 --- /dev/null +++ b/up-linux-streamer/examples/mE_publisher.rs @@ -0,0 +1,87 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use chrono::Local; +use chrono::Timelike; +use hello_world_protos::hello_world_topics::Timer; +use hello_world_protos::timeofday::TimeOfDay; +use log::trace; +use std::fs::canonicalize; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use up_rust::{UMessageBuilder, UStatus, UTransport, UUri}; +use up_transport_vsomeip::UPTransportVsomeip; + +const PUB_TOPIC_AUTHORITY: &str = "linux"; +const PUB_TOPIC_UE_ID: u16 = 0x1237; +const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; +const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + println!("mE_publisher"); + + let crate_dir = env!("CARGO_MANIFEST_DIR"); + // TODO: Make configurable to pass the path to the vsomeip config as a command line argument + let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/mE_publisher.json"); + let vsomeip_config = canonicalize(vsomeip_config).ok(); + trace!("vsomeip_config: {vsomeip_config:?}"); + + // There will be a single vsomeip_transport, as there is a connection into device and a streamer + // TODO: Add error handling if we fail to create a UPTransportVsomeip + let client: Arc = Arc::new( + UPTransportVsomeip::new_with_config( + &PUB_TOPIC_AUTHORITY.to_string(), + PUB_TOPIC_UE_ID, + &vsomeip_config.unwrap(), + ) + .unwrap(), + ); + + let source = UUri { + authority_name: PUB_TOPIC_AUTHORITY.to_string(), + ue_id: PUB_TOPIC_UE_ID as u32, + ue_version_major: PUB_TOPIC_UE_VERSION_MAJOR as u32, + resource_id: PUB_TOPIC_RESOURCE_ID as u32, + ..Default::default() + }; + + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let now = Local::now(); + + let time_of_day = TimeOfDay { + hours: now.hour() as i32, + minutes: now.minute() as i32, + seconds: now.second() as i32, + nanos: now.nanosecond() as i32, + ..Default::default() + }; + + let timer_message = Timer { + time: Some(time_of_day).into(), + ..Default::default() + }; + + let publish_msg = UMessageBuilder::publish(source.clone()) + .build_with_protobuf_payload(&timer_message) + .unwrap(); + println!("Sending Publish message:\n{publish_msg:?}"); + + client.send(publish_msg).await?; + } +} diff --git a/up-linux-streamer/examples/mE_subscriber.rs b/up-linux-streamer/examples/mE_subscriber.rs new file mode 100644 index 00000000..51060c3a --- /dev/null +++ b/up-linux-streamer/examples/mE_subscriber.rs @@ -0,0 +1,106 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use async_trait::async_trait; +use hello_world_protos::hello_world_topics::Timer; +use log::{error, trace}; +use protobuf::Message; +use std::fs::canonicalize; +use std::path::PathBuf; +use std::sync::Arc; +use std::thread; +use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; +use up_transport_vsomeip::UPTransportVsomeip; + +const SERVICE_AUTHORITY: &str = "me_authority"; +const SERVICE_UE_ID: u16 = 0x1236; + +const PUB_TOPIC_AUTHORITY: &str = "pub_topic"; +const PUB_TOPIC_UE_ID: u16 = 0x1236; +const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; +const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; + +#[allow(dead_code)] +struct PublishReceiver { + client: Arc, +} +impl PublishReceiver { + pub fn new(client: Arc) -> Self { + Self { client } + } +} +#[async_trait] +impl UListener for PublishReceiver { + async fn on_receive(&self, msg: UMessage) { + println!("PublishReceiver: Received a message: {msg:?}"); + + let Some(payload_bytes) = msg.payload else { + panic!("No bytes available"); + }; + let _ = match Timer::parse_from_bytes(&payload_bytes) { + Ok(timer_message) => { + println!("timer: {timer_message:?}"); + timer_message + } + Err(err) => { + error!("Unable to parse Timer Message: {err:?}"); + return; + } + }; + } + + async fn on_error(&self, err: UStatus) { + println!("ServiceRequestResponder: Encountered an error: {err:?}"); + } +} + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + println!("mE_subscriber"); + + let crate_dir = env!("CARGO_MANIFEST_DIR"); + // TODO: Make configurable to pass the path to the vsomeip config as a command line argument + let vsomeip_config = PathBuf::from(crate_dir).join("vsomeip-configs/mE_subscriber.json"); + let vsomeip_config = canonicalize(vsomeip_config).ok(); + trace!("vsomeip_config: {vsomeip_config:?}"); + + // There will be a single vsomeip_transport, as there is a connection into device and a streamer + // TODO: Add error handling if we fail to create a UPTransportVsomeip + let service: Arc = Arc::new( + UPTransportVsomeip::new_with_config( + &SERVICE_AUTHORITY.to_string(), + SERVICE_UE_ID, + &vsomeip_config.unwrap(), + ) + .unwrap(), + ); + + let source_filter = UUri { + authority_name: PUB_TOPIC_AUTHORITY.to_string(), + ue_id: PUB_TOPIC_UE_ID as u32, + ue_version_major: PUB_TOPIC_UE_VERSION_MAJOR as u32, + resource_id: PUB_TOPIC_RESOURCE_ID as u32, + ..Default::default() + }; + + let publish_receiver: Arc = Arc::new(PublishReceiver::new(service.clone())); + // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases + service + .register_listener(&source_filter, None, publish_receiver.clone()) + .await?; + + thread::park(); + Ok(()) +} diff --git a/up-linux-streamer/examples/uE_client.rs b/up-linux-streamer/examples/uE_client.rs index 3ed7177e..4fefd31c 100644 --- a/up-linux-streamer/examples/uE_client.rs +++ b/up-linux-streamer/examples/uE_client.rs @@ -1,11 +1,25 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + use async_trait::async_trait; use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; use protobuf::Message; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use up_rust::{UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri}; use up_transport_zenoh::UPClientZenoh; -use zenoh::config::Config; +use zenoh::config::{Config, EndPoint}; const SERVICE_AUTHORITY: &str = "me_authority"; const SERVICE_UE_ID: u16 = 0x4321; @@ -49,7 +63,17 @@ async fn main() -> Result<(), UStatus> { println!("uE_client"); // TODO: Probably make somewhat configurable? - let zenoh_config = Config::default(); + // Create a configuration object + let mut zenoh_config = Config::default(); + + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); // TODO: Add error handling if we fail to create a UPClientZenoh let client: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) diff --git a/up-linux-streamer/examples/uE_publisher.rs b/up-linux-streamer/examples/uE_publisher.rs new file mode 100644 index 00000000..a417964f --- /dev/null +++ b/up-linux-streamer/examples/uE_publisher.rs @@ -0,0 +1,88 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use chrono::Local; +use chrono::Timelike; +use hello_world_protos::hello_world_topics::Timer; +use hello_world_protos::timeofday::TimeOfDay; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use up_rust::{UMessageBuilder, UStatus, UTransport, UUri}; +use up_transport_zenoh::UPClientZenoh; +use zenoh::config::{Config, EndPoint}; + +const PUB_TOPIC_AUTHORITY: &str = "pub_topic"; +const PUB_TOPIC_UE_ID: u16 = 0x1236; +const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; +const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + println!("uE_publisher"); + + // TODO: Probably make somewhat configurable? + // Create a configuration object + let mut zenoh_config = Config::default(); + + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); + // TODO: Add error handling if we fail to create a UPClientZenoh + let client: Arc = Arc::new( + UPClientZenoh::new(zenoh_config, "linux".to_string()) + .await + .unwrap(), + ); + + let source = UUri { + authority_name: PUB_TOPIC_AUTHORITY.to_string(), + ue_id: PUB_TOPIC_UE_ID as u32, + ue_version_major: PUB_TOPIC_UE_VERSION_MAJOR as u32, + resource_id: PUB_TOPIC_RESOURCE_ID as u32, + ..Default::default() + }; + + loop { + tokio::time::sleep(Duration::from_millis(1000)).await; + + let now = Local::now(); + + let time_of_day = TimeOfDay { + hours: now.hour() as i32, + minutes: now.minute() as i32, + seconds: now.second() as i32, + nanos: now.nanosecond() as i32, + ..Default::default() + }; + + let timer_message = Timer { + time: Some(time_of_day).into(), + ..Default::default() + }; + + let publish_msg = UMessageBuilder::publish(source.clone()) + .build_with_protobuf_payload(&timer_message) + .unwrap(); + println!("Sending Publish message:\n{publish_msg:?}"); + + client.send(publish_msg).await?; + } +} diff --git a/up-linux-streamer/examples/uE_service.rs b/up-linux-streamer/examples/uE_service.rs index d16ba474..684b6671 100644 --- a/up-linux-streamer/examples/uE_service.rs +++ b/up-linux-streamer/examples/uE_service.rs @@ -2,12 +2,13 @@ use async_trait::async_trait; use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; use log::error; use protobuf::Message; +use std::str::FromStr; use std::sync::Arc; use std::thread; use std::time::Duration; use up_rust::{UListener, UMessage, UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri}; use up_transport_zenoh::UPClientZenoh; -use zenoh::config::Config; +use zenoh::config::{Config, EndPoint}; const SERVICE_AUTHORITY: &str = "linux"; const SERVICE_UE_ID: u16 = 0x1236; @@ -71,7 +72,15 @@ async fn main() -> Result<(), UStatus> { println!("uE_service"); // TODO: Probably make somewhat configurable? - let zenoh_config = Config::default(); + let mut zenoh_config = Config::default(); + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); // TODO: Add error handling if we fail to create a UPClientZenoh let service: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) diff --git a/up-linux-streamer/examples/uE_subscriber.rs b/up-linux-streamer/examples/uE_subscriber.rs new file mode 100644 index 00000000..0dd3f72d --- /dev/null +++ b/up-linux-streamer/examples/uE_subscriber.rs @@ -0,0 +1,106 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use async_trait::async_trait; +use hello_world_protos::hello_world_topics::Timer; +use log::error; +use protobuf::Message; +use std::str::FromStr; +use std::sync::Arc; +use std::thread; +use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; +use up_transport_zenoh::UPClientZenoh; +use zenoh::config::{Config, EndPoint}; + +const PUB_TOPIC_AUTHORITY: &str = "me_authority"; +const PUB_TOPIC_UE_ID: u16 = 0x1237; +const PUB_TOPIC_UE_VERSION_MAJOR: u8 = 1; +const PUB_TOPIC_RESOURCE_ID: u16 = 0x8001; + +#[allow(dead_code)] +struct PublishReceiver { + client: Arc, +} +impl PublishReceiver { + pub fn new(client: Arc) -> Self { + Self { client } + } +} +#[async_trait] +impl UListener for PublishReceiver { + async fn on_receive(&self, msg: UMessage) { + println!("PublishReceiver: Received a message: {msg:?}"); + + let Some(payload_bytes) = msg.payload else { + panic!("No bytes available"); + }; + let _ = match Timer::parse_from_bytes(&payload_bytes) { + Ok(timer_message) => { + println!("timer: {timer_message:?}"); + timer_message + } + Err(err) => { + error!("Unable to parse Timer Message: {err:?}"); + return; + } + }; + } + + async fn on_error(&self, err: UStatus) { + println!("ServiceRequestResponder: Encountered an error: {err:?}"); + } +} + +#[tokio::main] +async fn main() -> Result<(), UStatus> { + env_logger::init(); + + println!("uE_subscriber"); + + // TODO: Probably make somewhat configurable? + // Create a configuration object + let mut zenoh_config = Config::default(); + + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7445"); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); + // TODO: Add error handling if we fail to create a UPClientZenoh + // TODO: Add error handling if we fail to create a UPClientZenoh + let service: Arc = Arc::new( + UPClientZenoh::new(zenoh_config, "linux".to_string()) + .await + .unwrap(), + ); + + let source_filter = UUri { + authority_name: PUB_TOPIC_AUTHORITY.to_string(), + ue_id: PUB_TOPIC_UE_ID as u32, + ue_version_major: PUB_TOPIC_UE_VERSION_MAJOR as u32, + resource_id: PUB_TOPIC_RESOURCE_ID as u32, + ..Default::default() + }; + + let publish_receiver: Arc = Arc::new(PublishReceiver::new(service.clone())); + // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases + service + .register_listener(&source_filter, None, publish_receiver.clone()) + .await?; + + thread::park(); + Ok(()) +} diff --git a/up-linux-streamer/src/main.rs b/up-linux-streamer/src/main.rs index e1c11dc7..106df9e9 100644 --- a/up-linux-streamer/src/main.rs +++ b/up-linux-streamer/src/main.rs @@ -5,16 +5,22 @@ use std::sync::Arc; use std::thread; use up_rust::UStatus; use up_rust::UTransport; + +use std::str::FromStr; use up_streamer::{Endpoint, UStreamer}; use up_transport_vsomeip::UPTransportVsomeip; use up_transport_zenoh::UPClientZenoh; -use zenoh::config::Config; +use usubscription_static_file::USubscriptionStaticFile; +use zenoh::config::{Config, EndPoint}; #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); - let mut streamer = UStreamer::new("up-linux-streamer", 10000); + let usubscription = Arc::new(USubscriptionStaticFile::new(Some(PathBuf::from( + "example-utils/usubscription-static-file/static-configs/testdata.json", + )))); + let mut streamer = UStreamer::new("up-linux-streamer", 10000, usubscription); let crate_dir = env!("CARGO_MANIFEST_DIR"); // TODO: Make configurable to pass the path to the vsomeip config as a command line argument @@ -30,7 +36,17 @@ async fn main() -> Result<(), UStatus> { ); // TODO: Probably make somewhat configurable? - let zenoh_config = Config::default(); + // Create a configuration object + let mut zenoh_config = Config::default(); + + // Specify the address to listen on using IPv4 + let ipv4_endpoint = EndPoint::from_str("tcp/0.0.0.0:7447"); + + // Add the IPv4 endpoint to the Zenoh configuration + zenoh_config + .listen + .endpoints + .push(ipv4_endpoint.expect("FAIL")); // TODO: Add error handling if we fail to create a UPClientZenoh let zenoh_transport: Arc = Arc::new( UPClientZenoh::new(zenoh_config, "linux".to_string()) diff --git a/up-linux-streamer/vsomeip-configs/mE_publisher.json b/up-linux-streamer/vsomeip-configs/mE_publisher.json new file mode 100644 index 00000000..4ed3ddbe --- /dev/null +++ b/up-linux-streamer/vsomeip-configs/mE_publisher.json @@ -0,0 +1,9 @@ +{ + "applications" : + [ + { + "name" : "0x1237", + "id" : "0x1237" + } + ] +} diff --git a/up-linux-streamer/vsomeip-configs/mE_subscriber.json b/up-linux-streamer/vsomeip-configs/mE_subscriber.json new file mode 100644 index 00000000..8f840b13 --- /dev/null +++ b/up-linux-streamer/vsomeip-configs/mE_subscriber.json @@ -0,0 +1,9 @@ +{ + "applications" : + [ + { + "name" : "0x4321", + "id" : "0x4321" + } + ] +} diff --git a/up-streamer/Cargo.toml b/up-streamer/Cargo.toml index 60e9210e..8de2f3c8 100644 --- a/up-streamer/Cargo.toml +++ b/up-streamer/Cargo.toml @@ -30,7 +30,10 @@ log = { workspace = true } prost = { workspace = true } uuid = { workspace = true } serde_json = { workspace = true } -up-rust = { workspace = true } +up-rust = { workspace = true, features = ["usubscription"] } +protobuf = { version = "3.3", features = ["with-bytes"] } +subscription-cache = {path="../subscription-cache"} +usubscription-static-file = {path="../example-utils/usubscription-static-file"} [dev-dependencies] async-broadcast = { version = "0.7.0" } diff --git a/up-streamer/src/ustreamer.rs b/up-streamer/src/ustreamer.rs index 0655a33f..c36833de 100644 --- a/up-streamer/src/ustreamer.rs +++ b/up-streamer/src/ustreamer.rs @@ -11,6 +11,8 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ +#![allow(clippy::mutable_key_type)] + use crate::endpoint::Endpoint; use async_std::channel::{Receiver, Sender}; use async_std::sync::{Arc, Mutex}; @@ -20,7 +22,13 @@ use log::*; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::ops::Deref; +use std::path::PathBuf; +use std::str; use std::thread; +use subscription_cache::SubscriptionCache; +use up_rust::core::usubscription::{ + FetchSubscriptionsRequest, FetchSubscriptionsResponse, SubscriberInfo, USubscription, +}; use up_rust::{UCode, UListener, UMessage, UStatus, UTransport, UUIDBuilder, UUri}; const USTREAMER_TAG: &str = "UStreamer:"; @@ -145,10 +153,12 @@ impl ForwardingListeners { out_authority: &str, forwarding_id: &str, out_sender: Sender>, + subscription_cache: Arc>, ) -> Option> { let in_comparable_transport = ComparableTransport::new(in_transport.clone()); - let mut forwarding_listeners = self.listeners.lock().await; + let initial_subscriber_cache = + task::block_on(subscription_cache.lock().await.fetch_cache()); let (active, forwarding_listener) = forwarding_listeners .entry((in_comparable_transport.clone(), out_authority.to_string())) @@ -159,9 +169,27 @@ impl ForwardingListeners { .register_listener(&any_uuri(), Some(&uauthority_to_uuri(out_authority)), forwarding_listener.clone())); if let Err(err) = reg_res { - warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register request listener, error: {err}"); } else { - debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register listener"); + debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register request listener"); + } + + for (topic, subscribers) in initial_subscriber_cache { + let mut authority_name_hash_set: HashSet = HashSet::new(); + for subscriber in subscribers { + authority_name_hash_set.insert(subscriber.subscriber.get_or_default().uri.get_or_default().authority_name.clone()); + } + // TODO: Should we also check if topic's authority matches in_transport's authority, i.e., topic belongs to in_transport + // topic.authority_name == in_authority + if authority_name_hash_set.contains(out_authority) { + let pub_reg_res = task::block_on(in_transport + .register_listener(&topic, None, forwarding_listener.clone())); + if let Err(err) = pub_reg_res { + warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} unable to register listener, error: {err}"); + } else { + debug!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_INSERT_TAG} able to register listener"); + } + } } ( @@ -228,6 +256,7 @@ impl ForwardingListeners { /// /// ## Typical usage /// ``` +/// use usubscription_static_file::USubscriptionStaticFile; /// use std::sync::Arc; /// use async_std::sync::Mutex; /// use up_rust::{UListener, UTransport}; @@ -350,7 +379,8 @@ impl ForwardingListeners { /// let remote_authority = "remote"; /// let remote_endpoint = Endpoint::new("remote_endpoint", remote_authority, remote_transport); /// -/// let mut streamer = UStreamer::new("hoge", 100); +/// let usubscription = Arc::new(USubscriptionStaticFile::new(None)); +/// let mut streamer = UStreamer::new("hoge", 100, usubscription); /// /// // Add forwarding rules to endpoint local<->remote /// assert_eq!( @@ -410,6 +440,10 @@ pub struct UStreamer { registered_forwarding_rules: ForwardingRules, transport_forwarders: TransportForwarders, forwarding_listeners: ForwardingListeners, + subscription_cache: Arc>, + // TODO: Use this when USubsription is implemented + #[allow(dead_code)] + usubscription: Arc, } impl UStreamer { @@ -420,22 +454,61 @@ impl UStreamer { /// * name - Used to uniquely identify this UStreamer in logs /// * message_queue_size - Determines size of channel used to communicate between `ForwardingListener` /// and the worker tasks for each currently endpointd `UTransport` - pub fn new(name: &str, message_queue_size: u16) -> Self { + /// * usubscription - Subscription service which will be used to store subscription info for topics. + pub fn new(name: &str, message_queue_size: u16, usubscription: Arc) -> Self { let name = format!("{USTREAMER_TAG}:{name}:"); // Try to initiate logging. // Required in case of dynamic lib, otherwise no logs. // But cannot be done twice in case of static link. + std::env::set_var("RUST_LOG", "debug"); let _ = env_logger::try_init(); debug!( "{}:{}:{} UStreamer created", &name, USTREAMER_TAG, USTREAMER_FN_NEW_TAG ); + let uuri: UUri = UUri { + authority_name: "*".to_string(), + ue_id: 0x0000_FFFF, // any instance, any service + ue_version_major: 0xFF, // any + resource_id: 0xFFFF, // any + ..Default::default() + }; + + let details_vector = Vec::new(); + let subscriber_info = SubscriberInfo { + uri: Some(uuri).into(), + details: details_vector, + ..Default::default() + }; + + let mut fetch_request = FetchSubscriptionsRequest { + request: None, + offset: None, + ..Default::default() + }; + fetch_request.set_subscriber(subscriber_info); + let subscriptions = task::block_on(usubscription.fetch_subscriptions(fetch_request)); + //let subscription_cache_foo = Arc::new(Mutex::new(SubscriptionCache::new(subscriptions.expect("Subscriptions is an Error")))); + + let subscription_cache_foo = match subscriptions { + Ok(subs) => Arc::new(Mutex::new(SubscriptionCache::new(subs))), + Err(_subs) => { + // Handle the error case here, such as returning a default value or creating a default SubscriptionCache + let empty_fetch_response = FetchSubscriptionsResponse { + ..Default::default() + }; + Arc::new(Mutex::new(SubscriptionCache::new(empty_fetch_response))) + } + }; + Self { name: name.to_string(), registered_forwarding_rules: Mutex::new(HashSet::new()), transport_forwarders: TransportForwarders::new(message_queue_size as usize), forwarding_listeners: ForwardingListeners::new(), + subscription_cache: subscription_cache_foo.clone(), + usubscription, } } @@ -471,6 +544,8 @@ impl UStreamer { /// * [`UMessageType::UMESSAGE_TYPE_NOTIFICATION`][up_rust::UMessageType::UMESSAGE_TYPE_NOTIFICATION] /// * [`UMessageType::UMESSAGE_TYPE_REQUEST`][up_rust::UMessageType::UMESSAGE_TYPE_REQUEST] /// * [`UMessageType::UMESSAGE_TYPE_RESPONSE`][up_rust::UMessageType::UMESSAGE_TYPE_RESPONSE] + /// As well as a [`UMessageType::UMESSAGE_TYPE_PUBLISH`][up_rust::UMessageType::UMESSAGE_TYPE_PUBLISH] + /// type message which only has a source / topic contained in its attributes. /// /// # Parameters /// @@ -524,6 +599,7 @@ impl UStreamer { &out.authority, &Self::forwarding_id(&r#in, &out), out_sender, + self.subscription_cache.clone(), ) .await; Ok(()) @@ -547,6 +623,8 @@ impl UStreamer { /// * [`UMessageType::UMESSAGE_TYPE_NOTIFICATION`][up_rust::UMessageType::UMESSAGE_TYPE_NOTIFICATION] /// * [`UMessageType::UMESSAGE_TYPE_REQUEST`][up_rust::UMessageType::UMESSAGE_TYPE_REQUEST] /// * [`UMessageType::UMESSAGE_TYPE_RESPONSE`][up_rust::UMessageType::UMESSAGE_TYPE_RESPONSE] + /// As well as a [`UMessageType::UMESSAGE_TYPE_PUBLISH`][up_rust::UMessageType::UMESSAGE_TYPE_PUBLISH] + /// type message which only has a source / topic contained in its attributes. /// /// # Parameters /// @@ -639,6 +717,7 @@ impl TransportForwarder { fn new(out_transport: Arc, message_receiver: Receiver>) -> Self { let out_transport_clone = out_transport.clone(); let message_receiver_clone = message_receiver.clone(); + thread::spawn(|| { task::block_on(Self::message_forwarding_loop( UUIDBuilder::build().to_hyphenated_string(), @@ -663,7 +742,6 @@ impl TransportForwarder { TRANSPORT_FORWARDER_FN_MESSAGE_FORWARDING_LOOP_TAG, msg ); - let send_res = out_transport.send(msg.deref().clone()).await; if let Err(err) = send_res { warn!( @@ -734,6 +812,7 @@ mod tests { use async_trait::async_trait; use std::sync::Arc; use up_rust::{UListener, UMessage, UStatus, UTransport, UUri}; + use usubscription_static_file::USubscriptionStaticFile; pub struct UPClientFoo; @@ -838,7 +917,8 @@ mod tests { remote_transport.clone(), ); - let mut ustreamer = UStreamer::new("foo_bar_streamer", 100); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 100, usubscription); // Add forwarding rules to endpoint local<->remote assert!(ustreamer .add_forwarding_rule(local_endpoint.clone(), remote_endpoint.clone()) @@ -910,7 +990,8 @@ mod tests { remote_transport_b.clone(), ); - let mut ustreamer = UStreamer::new("foo_bar_streamer", 100); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 100, usubscription); // Add forwarding rules to endpoint local<->remote_a assert!(ustreamer @@ -970,7 +1051,8 @@ mod tests { remote_transport.clone(), ); - let mut ustreamer = UStreamer::new("foo_bar_streamer", 100); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 100, usubscription); // Add forwarding rules to endpoint local<->remote_a assert!(ustreamer diff --git a/up-streamer/tests/single_local_single_remote.rs b/up-streamer/tests/single_local_single_remote.rs index 5cd3af6f..b25c275a 100644 --- a/up-streamer/tests/single_local_single_remote.rs +++ b/up-streamer/tests/single_local_single_remote.rs @@ -31,6 +31,7 @@ use std::sync::Arc; use std::time::Duration; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; +use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; @@ -47,7 +48,8 @@ async fn single_local_single_remote() { Arc::new(UPClientFoo::new("upclient_bar", rx_2.clone(), tx_2.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); diff --git a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs index 8a04c703..612f1972 100644 --- a/up-streamer/tests/single_local_two_remote_add_remove_rules.rs +++ b/up-streamer/tests/single_local_two_remote_add_remove_rules.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::time::Duration; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; +use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 500; const SENT_MESSAGE_VEC_CAPACITY: usize = 20_000; @@ -51,7 +52,8 @@ async fn single_local_two_remote_add_remove_rules() { Arc::new(UPClientFoo::new("upclient_bar_2", rx_3.clone(), tx_3.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); diff --git a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs index 5a28abaa..e09c74c0 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_different_remote_transport.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::time::Duration; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; +use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; @@ -51,7 +52,8 @@ async fn single_local_two_remote_authorities_different_remote_transport() { Arc::new(UPClientFoo::new("upclient_bar_2", rx_3.clone(), tx_3.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo); diff --git a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs index 24acab59..455a69d3 100644 --- a/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs +++ b/up-streamer/tests/single_local_two_remote_authorities_same_remote_transport.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::time::Duration; use up_rust::{UListener, UTransport}; use up_streamer::{Endpoint, UStreamer}; +use usubscription_static_file::USubscriptionStaticFile; const DURATION_TO_RUN_CLIENTS: u128 = 1_000; const SENT_MESSAGE_VEC_CAPACITY: usize = 10_000; @@ -48,7 +49,8 @@ async fn single_local_two_remote_authorities_same_remote_transport() { Arc::new(UPClientFoo::new("upclient_bar", rx_2.clone(), tx_2.clone()).await); // setting up streamer to bridge between "foo" and "bar" - let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000); + let usubscription = Arc::new(USubscriptionStaticFile::new(None)); + let mut ustreamer = UStreamer::new("foo_bar_streamer", 3000, usubscription); // setting up endpoints between authorities and protocols let local_endpoint = Endpoint::new("local_endpoint", &local_authority(), utransport_foo);