Skip to content

Commit

Permalink
Passing zenoh configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Ricardicus committed Aug 14, 2024
1 parent 7fefb36 commit d2e7988
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
13 changes: 10 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ struct Cli {
#[clap(long)]
#[arg(default_value = "false")]
test_sender: bool,

#[clap(short, long)]
#[arg(default_value = "zenoh/config.json5")]
zenoh_config: String,
}

#[tokio::main]
Expand All @@ -66,6 +70,7 @@ async fn main() {
let sub = cli.sub;
let test_sender = cli.test_sender;
let test_receiver = cli.test_receiver;
let zenoh_config = cli.zenoh_config;

if gpgkey.len() > 0 {
let mut cert = None;
Expand Down Expand Up @@ -100,7 +105,9 @@ async fn main() {
let pgp_handler = PGPEnDeCrypt::new(&cert, &passphrase);
let pub_key_fingerprint = pgp_handler.get_public_key_fingerprint();

let mut session = Session::new(pgp_handler);
let mut session = Session::new(pgp_handler, zenoh_config.clone());

let zenoh_config = Config::from_file(zenoh_config).unwrap();

if test_receiver {
println!("-- Testing initiailize session [receiver]");
Expand All @@ -116,7 +123,7 @@ async fn main() {
let timeout_discovery = Duration::from_secs(5);

let zenoh_session =
Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let handler = ZenohHandler::new(zenoh_session);
let mut cont = true;
let mut attempts = 0;
Expand Down Expand Up @@ -190,7 +197,7 @@ async fn main() {
let timeout_discovery = Duration::from_secs(5);

let zenoh_session =
Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let handler = ZenohHandler::new(zenoh_session);
let mut cont = true;
let mut attempts = 0;
Expand Down
28 changes: 19 additions & 9 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,19 @@ where
pub host_encro: Arc<Mutex<HostCrypto>>,
pub tx: mpsc::Sender<(String, String)>,
pub rx: mpsc::Receiver<(String, String)>,

pub middleware_config: String,
}

impl<'a> Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt<'a>> {
pub fn new(host_encro: PGPEnDeCrypt<'a>) -> Self {
pub fn new(host_encro: PGPEnDeCrypt<'a>, middleware_config: String) -> Self {
let (tx, mut rx) = mpsc::channel(100);
Session {
sessions: Arc::new(Mutex::new(HashMap::new())),
host_encro: Arc::new(Mutex::new(host_encro)),
tx,
rx,
middleware_config,
}
}

Expand Down Expand Up @@ -162,8 +165,9 @@ impl<'a> Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt<'a>> {
let mut topic = Topic::Initialize.as_str().to_string();
topic.push_str("/");
topic.push_str(&cert.fingerprint().to_string());

let zenoh_session = Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
let zc = self.middleware_config.clone();
let zenoh_config = Config::from_file(zc).unwrap();
let zenoh_session = Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let handler = ZenohHandler::new(zenoh_session);

let await_response_interval = Duration::from_secs(5);
Expand Down Expand Up @@ -276,10 +280,12 @@ impl<'a> Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt<'a>> {
for topic in topics {
let tx_clone = tx.clone();
let t = topic.clone();
let zc = self.middleware_config.clone();

let h = tokio::spawn(async move {
let zenoh_config = Config::from_file(zc).unwrap();
let zenoh_session =
Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let handler = ZenohHandler::new(zenoh_session);
let mut keep_alive = true;
while keep_alive {
Expand Down Expand Up @@ -316,8 +322,10 @@ impl<'a> Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt<'a>> {
self.serve_topics(topics_to_subscribe, &tx_clone, false)
.await;

let zc = self.middleware_config.clone();
let zenoh_config = Config::from_file(zc).unwrap();
let zenoh_session_responder =
Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let responder = ZenohHandler::new(zenoh_session_responder);

while let Some(received) = self.rx.recv().await {
Expand Down Expand Up @@ -408,9 +416,10 @@ impl<'a> Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt<'a>> {
let tx_clone = self.tx.clone();
self.serve_topics(topics_to_subscribe, &tx_clone, false)
.await;

let zc = self.middleware_config.clone();
let zenoh_config = Config::from_file(zc).unwrap();
let zenoh_session_responder =
Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let responder = ZenohHandler::new(zenoh_session_responder);

while let Some(received) = self.rx.recv().await {
Expand Down Expand Up @@ -481,9 +490,10 @@ impl<'a> Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt<'a>> {
let tx_clone = self.tx.clone();
self.serve_topics(topics_to_subscribe, &tx_clone, false)
.await;

let zc = self.middleware_config.clone();
let zenoh_config = Config::from_file(zc).unwrap();
let zenoh_session_responder =
Arc::new(Mutex::new(zenoh::open(config::peer()).res().await.unwrap()));
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let responder = ZenohHandler::new(zenoh_session_responder);

let mut msg_count = 0;
Expand Down

0 comments on commit d2e7988

Please sign in to comment.