From d2e79881ce4ffc2fa80c273d942b78c3fae7f029 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rickard=20Hallerb=C3=A4ck?= Date: Wed, 14 Aug 2024 18:50:47 +0200 Subject: [PATCH] Passing zenoh configuration --- src/client.rs | 13 ++++++++++--- src/session/mod.rs | 28 +++++++++++++++++++--------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/client.rs b/src/client.rs index c04bb24..1639435 100644 --- a/src/client.rs +++ b/src/client.rs @@ -54,6 +54,10 @@ struct Cli { #[clap(long)] #[arg(default_value = "false")] test_sender: bool, + + #[clap(short, long)] + #[arg(default_value = "zenoh/config.json5")] + zenoh_config: String, } #[tokio::main] @@ -66,6 +70,7 @@ async fn main() { let sub = cli.sub; let test_sender = cli.test_sender; let test_receiver = cli.test_receiver; + let zenoh_config = cli.zenoh_config; if gpgkey.len() > 0 { let mut cert = None; @@ -100,7 +105,9 @@ async fn main() { let pgp_handler = PGPEnDeCrypt::new(&cert, &passphrase); let pub_key_fingerprint = pgp_handler.get_public_key_fingerprint(); - let mut session = Session::new(pgp_handler); + let mut session = Session::new(pgp_handler, zenoh_config.clone()); + + let zenoh_config = Config::from_file(zenoh_config).unwrap(); if test_receiver { println!("-- Testing initiailize session [receiver]"); @@ -116,7 +123,7 @@ async fn main() { let timeout_discovery = Duration::from_secs(5); let zenoh_session = - Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let handler = ZenohHandler::new(zenoh_session); let mut cont = true; let mut attempts = 0; @@ -190,7 +197,7 @@ async fn main() { let timeout_discovery = Duration::from_secs(5); let zenoh_session = - Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let handler = ZenohHandler::new(zenoh_session); let mut cont = true; let mut attempts = 0; diff --git a/src/session/mod.rs b/src/session/mod.rs index 5aa1133..763930d 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -73,16 +73,19 @@ where pub host_encro: Arc>, pub tx: mpsc::Sender<(String, String)>, pub rx: mpsc::Receiver<(String, String)>, + + pub middleware_config: String, } impl<'a> Session> { - pub fn new(host_encro: PGPEnDeCrypt<'a>) -> Self { + pub fn new(host_encro: PGPEnDeCrypt<'a>, middleware_config: String) -> Self { let (tx, mut rx) = mpsc::channel(100); Session { sessions: Arc::new(Mutex::new(HashMap::new())), host_encro: Arc::new(Mutex::new(host_encro)), tx, rx, + middleware_config, } } @@ -162,8 +165,9 @@ impl<'a> Session> { let mut topic = Topic::Initialize.as_str().to_string(); topic.push_str("/"); topic.push_str(&cert.fingerprint().to_string()); - - let zenoh_session = Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + let zc = self.middleware_config.clone(); + let zenoh_config = Config::from_file(zc).unwrap(); + let zenoh_session = Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let handler = ZenohHandler::new(zenoh_session); let await_response_interval = Duration::from_secs(5); @@ -276,10 +280,12 @@ impl<'a> Session> { for topic in topics { let tx_clone = tx.clone(); let t = topic.clone(); + let zc = self.middleware_config.clone(); let h = tokio::spawn(async move { + let zenoh_config = Config::from_file(zc).unwrap(); let zenoh_session = - Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let handler = ZenohHandler::new(zenoh_session); let mut keep_alive = true; while keep_alive { @@ -316,8 +322,10 @@ impl<'a> Session> { self.serve_topics(topics_to_subscribe, &tx_clone, false) .await; + let zc = self.middleware_config.clone(); + let zenoh_config = Config::from_file(zc).unwrap(); let zenoh_session_responder = - Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let responder = ZenohHandler::new(zenoh_session_responder); while let Some(received) = self.rx.recv().await { @@ -408,9 +416,10 @@ impl<'a> Session> { let tx_clone = self.tx.clone(); self.serve_topics(topics_to_subscribe, &tx_clone, false) .await; - + let zc = self.middleware_config.clone(); + let zenoh_config = Config::from_file(zc).unwrap(); let zenoh_session_responder = - Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let responder = ZenohHandler::new(zenoh_session_responder); while let Some(received) = self.rx.recv().await { @@ -481,9 +490,10 @@ impl<'a> Session> { let tx_clone = self.tx.clone(); self.serve_topics(topics_to_subscribe, &tx_clone, false) .await; - + let zc = self.middleware_config.clone(); + let zenoh_config = Config::from_file(zc).unwrap(); let zenoh_session_responder = - Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap())); + Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let responder = ZenohHandler::new(zenoh_session_responder); let mut msg_count = 0;