diff --git a/src/session/mod.rs b/src/session/mod.rs index 6541a5f..3dc390d 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -1,9 +1,12 @@ use std::collections::HashMap; +use std::fs::OpenOptions; use std::pin::Pin; use std::sync::Arc; use std::thread; use std::time::SystemTime; use tokio::sync::{mpsc, Mutex}; + +use std::io::Write; use tokio::time::{timeout, Duration}; pub mod crypto; @@ -824,6 +827,7 @@ impl Session { topics_to_subscribe.push(init_topic); topics_to_subscribe.push(discover_topic); topics_to_subscribe.push(close_topic); + topics_to_subscribe.push(heartbeat_topic); let tx_clone = self.tx.clone(); self.serve_topics(topics_to_subscribe, &tx_clone, false) @@ -839,6 +843,8 @@ impl Session { let responder = ZenohHandler::new(zenoh_session_responder); // Send discover message each minut self.launch_discovery().await; + // Launch session housekeeping + // self.launch_session_housekeeping().await; let keep_running = self.running.clone(); while *keep_running.lock().await { let timeout_duration = Duration::from_secs(5); @@ -905,68 +911,6 @@ impl Session { } } - pub async fn serve_testing(&mut self) { - let pub_key = self.host_encro.lock().await.get_public_key_fingerprint(); - let mut topics_to_subscribe = Vec::new(); - - // the initialization topic - let mut init_topic = Topic::Initialize.as_str().to_owned(); - init_topic.push_str("/"); - init_topic.push_str(&pub_key); - - topics_to_subscribe.push(init_topic); - topics_to_subscribe.push(Topic::Discover.as_str().to_owned()); - - let tx_clone = self.tx.clone(); - self.serve_topics(topics_to_subscribe, &tx_clone, false) - .await; - let zc = self.middleware_config.clone(); - let zenoh_config = Config::from_file(zc).unwrap(); - let zenoh_session_responder = - Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap())); - let responder = ZenohHandler::new(zenoh_session_responder); - - while let Some(received) = self.rx.recv().await { - let topic = received.0; - let mut topic_response = topic.clone(); - - let msg = match Message::deserialize(&received.1) { - Ok(msg) => { - let session_id = msg.session_id.clone(); - match self.handle_message(msg, &topic).await { - Ok(Some(res)) => { - // Do something - let response = res.0; - let topic_response = res.1; - let _ = responder.send_message(&topic_response, response).await; - } - Ok(None) => {} - Err(errormessage) => { - let response = Message { - message: MessageData::SessionError(errormessage), - session_id: session_id, - }; - let rs = response.to_string(); - let _ = responder.send_message(&topic_response, response).await; - } - } - } - Err(_) => {} - }; - { - let hm = self.sessions.lock().await; - if hm.len() > 0 { - thread::spawn(|| { - // Sleep for 3 seconds - thread::sleep(Duration::from_secs(3)); - // Exit the program with code 0 - }); - } else { - } - } - } - } - pub async fn stop_session(&mut self) { *self.running.lock().await = false; }