From 7cc5c4a059cdccb27132c7c3cad147c33b0d58e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rickard=20Hallerb=C3=A4ck?= Date: Wed, 11 Sep 2024 18:50:30 +0200 Subject: [PATCH] minor modifications here and there --- src/client.rs | 36 +++++----- src/pgp.rs | 4 +- src/session/middleware/mod.rs | 2 +- src/session/mod.rs | 129 +++++++++++++++++++--------------- src/terminal.rs | 4 +- 5 files changed, 94 insertions(+), 81 deletions(-) diff --git a/src/client.rs b/src/client.rs index 862768b..0479b14 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,7 @@ use session::protocol::*; use session::Session; use std::fs; use std::future::Future; -use std::io; + use std::pin::Pin; use std::process::exit; use std::sync::Arc; @@ -22,7 +22,7 @@ mod util; use util::get_current_datetime; mod pgp; -use pgp::pgp::{generate_new_key, get_public_key_as_base64, read_from_gpg, read_from_vec}; +use pgp::pgp::{generate_new_key, read_from_gpg, read_from_vec}; extern crate sequoia_openpgp as openpgp; use openpgp::cert::prelude::*; @@ -32,10 +32,10 @@ use ncurses::*; mod terminal; use terminal::{ - NewWindowCommand, PrintCommand, ReadCommand, WindowCommand, WindowManager, WindowPipe, + NewWindowCommand, PrintCommand, WindowCommand, WindowManager, WindowPipe, }; -use session::middleware::{ZMQHandler, ZenohHandler}; + #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -199,7 +199,7 @@ impl InputCommand { } async fn print_help() { - let mut help_text = String::new(); + let _help_text = String::new(); println_message_str(1, "Available commands:").await; println_message_str(1, "!list").await; println_message_str(1, "- List and enumerate all discovered peers.").await; @@ -271,8 +271,8 @@ async fn cb_chat_input( session_id: String, topic_out: String, ) -> Option<(String, String)> { - let prompt = ">> ".to_string(); - let mut input = read_chat_message(1).await; + let _prompt = ">> ".to_string(); + let input = read_chat_message(1).await; if input.is_err() { return None; } @@ -301,7 +301,7 @@ async fn cb_chat_input( return Some((topic.to_string(), msg.serialize().unwrap())); } -async fn cb_closed(public_key: String, session_id: String) { +async fn cb_closed(public_key: String, _session_id: String) { let pub_key_decoded = base64::decode(public_key); if pub_key_decoded.is_err() { return; @@ -336,7 +336,7 @@ async fn cb_discovered(public_key: String) -> bool { Ok(pub_key) => pub_key, }; match PGPEnCryptOwned::new_from_vec(&pub_key_decoded) { - Ok(pub_encro) => true, + Ok(_pub_encro) => true, _ => false, } } @@ -345,7 +345,7 @@ async fn cb_terminate() { println_message(1, format!("-- Terminating session ...")).await; } -async fn cb_init_declined(public_key: String, message: String) { +async fn cb_init_declined(public_key: String, _message: String) { let pub_key_decoded = match base64::decode(public_key) { Err(_) => { return; @@ -389,7 +389,7 @@ async fn cb_init_await(public_key: String) { } } -async fn cb_init_accepted(public_key: String) { +async fn cb_init_accepted(_public_key: String) { println_message( 1, format!("-- Peer accepted the connection. You can now chat!"), @@ -405,7 +405,7 @@ async fn cb_init_incoming(public_key: String) -> bool { Ok(pub_key) => pub_key, }; match PGPEnCryptOwned::new_from_vec(&pub_key_decoded) { - Ok(pub_encro) => true, + Ok(_pub_encro) => true, _ => { let _ = &format!("-- Chat was not initiailized"); false @@ -504,7 +504,7 @@ async fn terminal_program( } } } else { - let mut input; + let input; if print_prompt { input = read_message(1, ">> ", &upper_prompt, 1).await; } else { @@ -584,11 +584,11 @@ async fn terminal_program( ), ) .await; - let session_id = match session + let _session_id = match session .initialize_session_zenoh(peer.clone()) .await { - Ok(ok) => {} + Ok(_ok) => {} Err(not_ok) => { terminate(session_tx.clone()).await; println!("{}", not_ok); @@ -671,9 +671,9 @@ async fn launch_terminal_program( tokio::time::sleep(Duration::from_millis(100)).await; let pgp_handler = PGPEnDeCrypt::new_no_certpass(cert.clone()); - let pub_key_fingerprint = pgp_handler.get_public_key_fingerprint(); - let pub_key_userid = pgp_handler.get_userid(); - let pub_key_full = pgp_handler.get_public_key_as_base64(); + let _pub_key_fingerprint = pgp_handler.get_public_key_fingerprint(); + let _pub_key_userid = pgp_handler.get_userid(); + let _pub_key_full = pgp_handler.get_public_key_as_base64(); let (max_y, max_x) = WindowManager::get_max_yx(); let num_windows = 2; diff --git a/src/pgp.rs b/src/pgp.rs index 6c9e24f..829fe3b 100644 --- a/src/pgp.rs +++ b/src/pgp.rs @@ -78,7 +78,7 @@ pub mod pgp { ) -> openpgp::Result<()> { let p = &P::new(); // Get the keypair to do the signing from the Cert. - let key = tsk.primary_key().key().clone().parts_into_secret()?; + let _key = tsk.primary_key().key().clone().parts_into_secret()?; let mut keypair = None; if passphrase.len() == 0 { keypair = Some( @@ -135,7 +135,7 @@ pub mod pgp { let mut message = LiteralWriter::new(message).build()?; // Sign the data. - message.write_all(plaintext.clone().as_bytes())?; + message.write_all(plaintext.as_bytes())?; // Finalize the OpenPGP message to make sure that all data is // written. diff --git a/src/session/middleware/mod.rs b/src/session/middleware/mod.rs index 44b6c08..105056c 100644 --- a/src/session/middleware/mod.rs +++ b/src/session/middleware/mod.rs @@ -1,6 +1,6 @@ use crate::session::messages::{ MessageListener, Messageble, MessagebleTopicAsync, MessagebleTopicAsyncPublishReads, - MessagebleTopicAsyncReadTimeout, MessagingError, SessionErrorCodes, SessionErrorMsg, + MessagebleTopicAsyncReadTimeout, MessagingError, SessionMessage, }; use crate::session::Session; diff --git a/src/session/mod.rs b/src/session/mod.rs index 3dc390d..18cd6e5 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; -use std::fs::OpenOptions; + use std::pin::Pin; use std::sync::Arc; -use std::thread; + use std::time::SystemTime; use tokio::sync::{mpsc, Mutex}; -use std::io::Write; + use tokio::time::{timeout, Duration}; pub mod crypto; @@ -28,8 +28,7 @@ use messages::MessageData::{ use messages::MessagingError::*; use messages::SessionMessage as Message; use messages::{ - ChatMsg, EncryptedMsg, HeartbeatMsg, InitAwaitMsg, InitDeclineMsg, InitMsg, InitOkMsg, - InternalMsg, MessageData, MessageListener, Messageble, MessagebleTopicAsync, + ChatMsg, EncryptedMsg, InitMsg, MessageData, MessageListener, Messageble, MessagebleTopicAsync, MessagebleTopicAsyncPublishReads, MessagebleTopicAsyncReadTimeout, MessagingError, SessionErrorCodes, SessionErrorMsg, }; @@ -148,6 +147,7 @@ where pub middleware_config: String, discovery_interval_seconds: u64, + heartbeat_interval_seconds: u64, running: Arc>, } @@ -177,6 +177,7 @@ impl Session { middleware_config, discovery_interval_seconds: 60, + heartbeat_interval_seconds: 10, running: Arc::new(Mutex::new(true)), } } @@ -206,6 +207,7 @@ impl Session { middleware_config: self.middleware_config.clone(), discovery_interval_seconds: self.discovery_interval_seconds, + heartbeat_interval_seconds: self.heartbeat_interval_seconds, running: self.running.clone(), } } @@ -540,7 +542,7 @@ impl Session { let zenoh_session = Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); let handler = ZenohHandler::new(zenoh_session); - let await_response_interval = Duration::from_secs(60); + let _await_response_interval = Duration::from_secs(60); { let mut requests = self.requests_outgoing_initialization.lock().await; @@ -561,11 +563,11 @@ impl Session { for topic in topics { let tx_clone = tx.clone(); - let t = topic.clone(); + let _t = topic.clone(); let zc = self.middleware_config.clone(); let terminate_callbacks = self.callbacks_terminate.clone(); - let mut running = self.running.clone(); + let running = self.running.clone(); let h = tokio::spawn(async move { let zenoh_config = Config::from_file(zc).unwrap(); let zenoh_session = zenoh::open(zenoh_config.clone()).res().await; @@ -581,7 +583,7 @@ impl Session { let handler = ZenohHandler::new(zenoh_session); let mut keep_running = *running.lock().await; while keep_running { - let result = handler.read_messages(&topic, &tx_clone).await; + let _result = handler.read_messages(&topic, &tx_clone).await; { keep_running = *running.lock().await; } @@ -626,7 +628,7 @@ impl Session { ) .await { - if let Err(e) = tx_clone.send((topic, msg)).await {} + if let Err(_e) = tx_clone.send((topic, msg)).await {} } } { @@ -645,23 +647,19 @@ impl Session { hm.clone() } - pub async fn launch_discovery(&mut self) { + pub async fn launch_discovery(&mut self, handler: Arc>) { let mut session_discover = self.clone(); let keep_running_discover = self.running.clone(); let discovery_interval_seconds = self.discovery_interval_seconds; - let zc = self.middleware_config.clone(); + let _zc = self.middleware_config.clone(); + let handler = handler.clone(); tokio::spawn(async move { let mut keep_running; { keep_running = *keep_running_discover.lock().await; } - - let zenoh_config = Config::from_file(zc).unwrap(); - let zenoh_session = - Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); - let handler = ZenohHandler::new(zenoh_session); while keep_running { - let _ = session_discover.discover(&handler).await; + let _ = session_discover.discover(handler.clone()).await; tokio::time::sleep(Duration::from_secs(discovery_interval_seconds)).await; { keep_running = *keep_running_discover.lock().await; @@ -671,7 +669,7 @@ impl Session { } pub async fn terminate_session_locally(&mut self, session_id: &str) { - let signature = match self.host_encro.lock().await.sign(session_id) { + let _signature = match self.host_encro.lock().await.sign(session_id) { Ok(s) => s, Err(_) => { return; @@ -690,8 +688,8 @@ impl Session { }; match PGPEnCryptOwned::new_from_vec(&pub_key_decoded) { Ok(pub_encro) => { - let pub_key = self.host_encro.lock().await.get_public_key_as_base64(); - let topic = Topic::close_topic(&pub_encro.get_public_key_fingerprint()); + let _pub_key = self.host_encro.lock().await.get_public_key_as_base64(); + let _topic = Topic::close_topic(&pub_encro.get_public_key_fingerprint()); let mut hm = self.sessions.lock().await; hm.remove(session_id); @@ -709,7 +707,7 @@ impl Session { } } - pub async fn terminate_session(&mut self, session_id: &str, sender: &ZenohHandler) { + pub async fn terminate_session(&mut self, session_id: &str, sender: Arc>) { let signature = match self.host_encro.lock().await.sign(session_id) { Ok(s) => s, Err(_) => { @@ -732,7 +730,10 @@ impl Session { let pub_key = self.host_encro.lock().await.get_public_key_as_base64(); let msg = Message::new_close(session_id.to_string(), pub_key, signature); let topic = Topic::close_topic(&pub_encro.get_public_key_fingerprint()); - let _ = sender.send_message(&topic, msg).await; + { + let sender = sender.lock().await; + let _ = sender.send_message(&topic, msg).await; + } let mut hm = self.sessions.lock().await; hm.remove(session_id); @@ -750,21 +751,19 @@ impl Session { } } - pub async fn launch_session_housekeeping(&mut self) { + pub async fn launch_session_housekeeping(&mut self, sender: Arc>) { let mut session_discover = self.clone(); let keep_running_discover = self.running.clone(); - let discovery_interval_seconds = self.discovery_interval_seconds; - let zc = self.middleware_config.clone(); - let wait_factor = 5; // 5 times the discovery interval, hard coded for now? + let heartbeat_interval_seconds = self.heartbeat_interval_seconds; + let _zc = self.middleware_config.clone(); + let wait_factor = 10; // 5 times the discovery interval, hard coded for now? + let handler = sender.clone(); tokio::spawn(async move { let mut keep_running; { keep_running = *keep_running_discover.lock().await; } - let zenoh_config = Config::from_file(zc).unwrap(); - let zenoh_session = - Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); - let handler = ZenohHandler::new(zenoh_session); + while keep_running { let sessions; { @@ -775,9 +774,9 @@ impl Session { let now = SystemTime::now(); let last_active = session_data.last_active; let duration = now.duration_since(last_active).unwrap(); - if duration.as_secs() > discovery_interval_seconds * wait_factor { + if duration.as_secs() > heartbeat_interval_seconds * wait_factor { let _ = session_discover - .terminate_session(&session_id.clone(), &handler) + .terminate_session(&session_id.clone(), handler.clone()) .await; } session_ids.push((session_id.clone(), session_data.pub_key.clone())); @@ -796,13 +795,15 @@ impl Session { let fingerprint = pub_encro.get_public_key_fingerprint(); let msg = Message::new_heartbeat(session_id); let topic = Topic::heartbeat_topic(&fingerprint); - - let _ = handler.send_message(&topic, msg).await; + { + let handler = handler.lock().await; + let _ = handler.send_message(&topic, msg).await; + } } Err(_) => {} } } - tokio::time::sleep(Duration::from_secs(discovery_interval_seconds)).await; + tokio::time::sleep(Duration::from_secs(heartbeat_interval_seconds)).await; { keep_running = *keep_running_discover.lock().await; } @@ -840,11 +841,11 @@ impl Session { } let zenoh_session = zenoh_session.unwrap(); let zenoh_session_responder = Arc::new(Mutex::new(zenoh_session)); - let responder = ZenohHandler::new(zenoh_session_responder); + let responder = Arc::new(Mutex::new(ZenohHandler::new(zenoh_session_responder))); // Send discover message each minut - self.launch_discovery().await; + self.launch_discovery(responder.clone()).await; // Launch session housekeeping - // self.launch_session_housekeeping().await; + self.launch_session_housekeeping(responder.clone()).await; let keep_running = self.running.clone(); while *keep_running.lock().await { let timeout_duration = Duration::from_secs(5); @@ -877,9 +878,12 @@ impl Session { continue; } let _ = response.clone(); - let _ = responder - .send_message(&topic_response, response.clone()) - .await; + { + let responder = responder.lock().await; + let _ = responder + .send_message(&topic_response, response.clone()) + .await; + } } Ok(None) => {} Err(errormessage) => { @@ -889,7 +893,10 @@ impl Session { session_id, }; let _ = response.to_string(); - let _ = responder.send_message(&topic_error, response).await; + { + let responder = responder.lock().await; + let _ = responder.send_message(&topic_error, response).await; + } } } } @@ -897,17 +904,17 @@ impl Session { }; } - self.close_sessions(&responder).await; + self.close_sessions(responder).await; return Ok(()); } - pub async fn close_sessions(&mut self, sender: &ZenohHandler) { + pub async fn close_sessions(&mut self, sender: Arc>) { let sessions; { sessions = self.get_sessions().await; } - for (session_id, session_data) in sessions.iter() { - let _ = self.terminate_session(session_id, &sender).await; + for (session_id, _session_data) in sessions.iter() { + let _ = self.terminate_session(session_id, sender.clone()).await; } } @@ -916,29 +923,35 @@ impl Session { } pub async fn get_discovered(&self) -> Vec { - let mut discovered; + let discovered; { discovered = self.discovered.lock().await; } let mut discovered_keys = Vec::new(); - for (fingerprint, key) in discovered.iter() { + for (_fingerprint, key) in discovered.iter() { discovered_keys.push(key.clone()); } discovered_keys } - pub async fn discover(&mut self, sender: &ZenohHandler) -> Result<(), MessagingError> { + pub async fn discover( + &mut self, + sender: Arc>, + ) -> Result<(), MessagingError> { let discover_topic = Topic::Discover.as_str(); let mut discover_topic_reply = discover_topic.to_string(); discover_topic_reply.push_str(Topic::reply_suffix()); - let timeout_discovery = Duration::from_secs(5); + let _timeout_discovery = Duration::from_secs(5); let mut this_pub_key = None; { this_pub_key = Some(self.host_encro.lock().await.get_public_key_as_base64()); } let msg = Message::new_discovery(this_pub_key.clone().unwrap()); - self.send(msg, discover_topic, sender).await?; + { + let h = sender.lock().await; + h.send_message(discover_topic, msg).await?; + } Ok(()) } @@ -954,7 +967,7 @@ impl Session { ) -> Result { match gateway.send_message(topic_tx, send_msg).await { Ok(_) => {} - Err(error) => return Err(MessagingError::UnreachableHost), + Err(_error) => return Err(MessagingError::UnreachableHost), }; gateway.read_message_timeout(topic_rx, timeout).await } @@ -971,7 +984,7 @@ impl Session { ) -> Result, MessagingError> { match gateway.send_message(topic_tx, send_msg).await { Ok(_) => {} - Err(error) => return Err(MessagingError::UnreachableHost), + Err(_error) => return Err(MessagingError::UnreachableHost), }; gateway.read_messages_timeout(topic_rx, timeout).await } @@ -984,7 +997,7 @@ impl Session { ) -> Result<(), MessagingError> { match gateway.send_message(topic_tx, send_msg).await { Ok(_) => {} - Err(error) => return Err(MessagingError::UnreachableHost), + Err(_error) => return Err(MessagingError::UnreachableHost), }; Ok(()) } @@ -1119,7 +1132,7 @@ impl Session { Ok(None) } - Heartbeat(msg) => { + Heartbeat(_msg) => { if let Some(session_data) = self.sessions.lock().await.get_mut(&session_id) { session_data.last_active = SystemTime::now(); } @@ -1200,7 +1213,7 @@ impl Session { { let other_key = pub_encro.get_public_key_fingerprint(); - if let Err(s) = pub_encro.verify(&signature, &other_key) { + if let Err(_s) = pub_encro.verify(&signature, &other_key) { let msg = Message::new_init_decline( pub_key.clone(), "Invalid signature".to_owned(), @@ -1306,7 +1319,7 @@ impl Session { } }; let mut add_session = None; - let this_pub_key = self.host_encro.lock().await.get_public_key_as_base64(); + let _this_pub_key = self.host_encro.lock().await.get_public_key_as_base64(); { let pendings = self.requests_outgoing_initialization.lock().await; let pub_key_dec = base64::decode(&msg.pub_key); diff --git a/src/terminal.rs b/src/terminal.rs index 6b8cf42..2f90020 100644 --- a/src/terminal.rs +++ b/src/terminal.rs @@ -1,10 +1,10 @@ use ncurses::*; use std::collections::HashMap; -use std::marker::PhantomData; + use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; -use tokio::time::{timeout, Duration}; + use serde::{Deserialize, Serialize};