Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
Ricardicus committed Sep 10, 2024
1 parent 0fe835d commit be53ef1
Showing 1 changed file with 6 additions and 62 deletions.
68 changes: 6 additions & 62 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::pin::Pin;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;
use tokio::sync::{mpsc, Mutex};

use std::io::Write;
use tokio::time::{timeout, Duration};

pub mod crypto;
Expand Down Expand Up @@ -824,6 +827,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
topics_to_subscribe.push(init_topic);
topics_to_subscribe.push(discover_topic);
topics_to_subscribe.push(close_topic);
topics_to_subscribe.push(heartbeat_topic);

let tx_clone = self.tx.clone();
self.serve_topics(topics_to_subscribe, &tx_clone, false)
Expand All @@ -839,6 +843,8 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
let responder = ZenohHandler::new(zenoh_session_responder);
// Send discover message each minut
self.launch_discovery().await;
// Launch session housekeeping
// self.launch_session_housekeeping().await;
let keep_running = self.running.clone();
while *keep_running.lock().await {
let timeout_duration = Duration::from_secs(5);
Expand Down Expand Up @@ -905,68 +911,6 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
}
}

pub async fn serve_testing(&mut self) {
let pub_key = self.host_encro.lock().await.get_public_key_fingerprint();
let mut topics_to_subscribe = Vec::new();

// the initialization topic
let mut init_topic = Topic::Initialize.as_str().to_owned();
init_topic.push_str("/");
init_topic.push_str(&pub_key);

topics_to_subscribe.push(init_topic);
topics_to_subscribe.push(Topic::Discover.as_str().to_owned());

let tx_clone = self.tx.clone();
self.serve_topics(topics_to_subscribe, &tx_clone, false)
.await;
let zc = self.middleware_config.clone();
let zenoh_config = Config::from_file(zc).unwrap();
let zenoh_session_responder =
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let responder = ZenohHandler::new(zenoh_session_responder);

while let Some(received) = self.rx.recv().await {
let topic = received.0;
let mut topic_response = topic.clone();

let msg = match Message::deserialize(&received.1) {
Ok(msg) => {
let session_id = msg.session_id.clone();
match self.handle_message(msg, &topic).await {
Ok(Some(res)) => {
// Do something
let response = res.0;
let topic_response = res.1;
let _ = responder.send_message(&topic_response, response).await;
}
Ok(None) => {}
Err(errormessage) => {
let response = Message {
message: MessageData::SessionError(errormessage),
session_id: session_id,
};
let rs = response.to_string();
let _ = responder.send_message(&topic_response, response).await;
}
}
}
Err(_) => {}
};
{
let hm = self.sessions.lock().await;
if hm.len() > 0 {
thread::spawn(|| {
// Sleep for 3 seconds
thread::sleep(Duration::from_secs(3));
// Exit the program with code 0
});
} else {
}
}
}
}

pub async fn stop_session(&mut self) {
*self.running.lock().await = false;
}
Expand Down

0 comments on commit be53ef1

Please sign in to comment.