Skip to content

Commit

Permalink
various cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
Ricardicus committed Sep 5, 2024
1 parent 0d88441 commit a2edc00
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 70 deletions.
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ path="./src/client.rs"

[dependencies]
anyhow = "1.0.79"
base64 = "0.21.7"
base64 = "0.20.0"
chrono = "0.4.33"
clap = { version = "4.4.18", features = ["derive"] }
ncurses = "5.101.0"
Expand Down
50 changes: 6 additions & 44 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
#![allow(dead_code)]
mod session;

use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use session::crypto::{
ChaCha20Poly1305EnDeCrypt, Cryptical, CrypticalID, PGPEnCryptOwned, PGPEnDeCrypt,
};
use session::messages::MessageData::Discovery;
use session::messages::{
ChatMsg, DiscoveryMsg, EncryptedMsg, InitMsg, MessageData, MessageListener, Messageble,
MessagebleTopicAsync, SessionMessage,
};
use session::messages::SessionMessage;
use session::protocol::*;
use session::Session;
use std::future::Future;
use std::pin::Pin;
use std::process::exit;
use std::thread;
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;
use zenoh::Config;

use std::sync::Arc;
use std::time::Duration;
use tokio::signal;
use tokio::sync::{mpsc, Mutex};
use tokio::time::timeout;

use serde::{Deserialize, Serialize};

use std::env;
use std::io::{self, Write};
use std::pin::Pin;

use std::future::Future;

use once_cell::sync::Lazy;
use tokio::sync::mpsc;

mod util;

Expand Down Expand Up @@ -67,10 +50,6 @@ struct Cli {
#[arg(default_value = "false")]
test_receiver: bool,

#[clap(long)]
#[arg(default_value = "false")]
no_discovery: bool,

#[clap(long)]
#[arg(default_value = "false")]
test_sender: bool,
Expand Down Expand Up @@ -626,23 +605,13 @@ async fn launch_terminal_program(
tokio::spawn(async move {
let mut window_manager = WindowManager::new();

let (tx, rx) = mpsc::channel::<String>(50);

let pipe_clone = pipe0.clone();
let tx_clone = tx.clone();

let tx_clone = tx_clone.clone();
window_manager.serve(pipe_clone).await;
});
tokio::spawn(async move {
let mut window_manager = WindowManager::new();

let (tx, rx) = mpsc::channel::<String>(50);

let pipe_clone = pipe1.clone();
let tx_clone = tx.clone();

let tx_clone = tx_clone.clone();
window_manager.serve(pipe_clone).await;
});
let pipe0;
Expand Down Expand Up @@ -714,7 +683,6 @@ async fn main() {
let test_sender = cli.test_sender;
let test_receiver = cli.test_receiver;
let zenoh_config = cli.zenoh_config;
let no_discovery = cli.no_discovery;

let mut cert = None;

Expand Down Expand Up @@ -744,14 +712,8 @@ async fn main() {
let cert = Arc::new(cert.unwrap());

let pgp_handler = PGPEnDeCrypt::new(cert.clone(), &passphrase);
let pub_key_fingerprint = pgp_handler.get_public_key_fingerprint();
let pub_key_userid = pgp_handler.get_userid();
let pub_key_full = pgp_handler.get_public_key_as_base64();

let mut session = Session::new(pgp_handler, zenoh_config.clone());

let zenoh_config = Config::from_file(zenoh_config).unwrap();

if test_receiver {
println!("-- Testing initiailize session [receiver]");
session.serve_testing().await;
Expand Down
1 change: 0 additions & 1 deletion src/session/crypto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use chacha20poly1305::{
aead::{Aead, AeadCore, KeyInit, OsRng},
ChaCha20Poly1305,
};
use openpgp::cert::prelude::*;
use openpgp::policy::StandardPolicy as P;
use openpgp::Cert;

Expand Down
5 changes: 0 additions & 5 deletions src/session/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@ use crate::session::messages::{
};
use crate::session::Session;

use async_std::task;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex};
use zenoh::prelude::r#async::*;
use zenoh::prelude::*;

use futures::prelude::*;
use tokio::time::timeout;

pub struct ZMQHandler {
Expand Down
29 changes: 12 additions & 17 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ where

impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
pub fn new(host_encro: PGPEnDeCrypt, middleware_config: String) -> Self {
let (tx, mut rx) = mpsc::channel(100);
let (tx, rx) = mpsc::channel(100);
Session {
sessions: Arc::new(Mutex::new(HashMap::new())),
discovered: Arc::new(Mutex::new(HashMap::new())),
Expand All @@ -160,7 +160,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
}

pub fn clone(&self) -> Self {
let (tx, mut rx) = mpsc::channel(100);
let (tx, rx) = mpsc::channel(100);
Self {
sessions: Arc::clone(&self.sessions),
discovered: Arc::clone(&self.discovered),
Expand Down Expand Up @@ -613,7 +613,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
let responder = ZenohHandler::new(zenoh_session_responder);
// Send discover message each minut
let mut session_discover = self.clone();
let mut keep_running_discover = self.running.clone();
let keep_running_discover = self.running.clone();
tokio::spawn(async move {
let mut seconds = 0;
let mut keep_running;
Expand All @@ -631,18 +631,16 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
}
}
});
let mut keep_running = self.running.clone();
let keep_running = self.running.clone();
while *keep_running.lock().await {
let mut received = self.rx.recv().await.expect("Error in session");
let received = self.rx.recv().await.expect("Error in session");
let topic = received.0;
let mut topic_response = topic.clone();
let mut topic_error = Topic::Errors.as_str().to_string();
topic_error.push_str("/");
topic_error.push_str(&pub_key);
let msg = match Message::deserialize(&received.1) {
let _msg = match Message::deserialize(&received.1) {
Ok(msg) => {
let session_id = msg.session_id.clone();
let msg_clone = msg.clone();
match self.handle_message(msg, &topic).await {
Ok(Some(res)) => {
// Do something
Expand All @@ -652,7 +650,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
*keep_running.lock().await = false;
continue;
}
let m = response.clone();
let _ = response.clone();
let _ = responder
.send_message(&topic_response, response.clone())
.await;
Expand All @@ -662,9 +660,9 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
//println!("errormessage {:?}", errormessage);
let response = Message {
message: MessageData::SessionError(errormessage),
session_id: session_id,
session_id,
};
let rs = response.to_string();
let _ = response.to_string();
let _ = responder.send_message(&topic_error, response).await;
}
}
Expand Down Expand Up @@ -695,10 +693,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
Arc::new(Mutex::new(zenoh::open(zenoh_config).res().await.unwrap()));
let responder = ZenohHandler::new(zenoh_session_responder);

let mut msg_count = 0;

while let Some(received) = self.rx.recv().await {
msg_count += 1;
let topic = received.0;
let mut topic_response = topic.clone();

Expand Down Expand Up @@ -727,7 +722,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
Err(_) => {}
};
{
let mut hm = self.sessions.lock().await;
let hm = self.sessions.lock().await;
if hm.len() > 0 {
thread::spawn(|| {
// Sleep for 3 seconds
Expand Down Expand Up @@ -855,7 +850,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
let mut response = Message::new_chat("Hello World".to_string());
response.session_id = message.session_id.clone();
let session_id = message.session_id.clone();
let msg_raw = message.to_string();
let _msg_raw = message.to_string();

match message.message {
Internal(msg) => {
Expand Down Expand Up @@ -1129,7 +1124,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
let other_key_fingerprint = cert.fingerprint().to_string();

for pending in pendings.iter() {
let mut pending_pub_key_fingerprint = pending.clone();
let pending_pub_key_fingerprint = pending.clone();
if other_key_fingerprint == pending_pub_key_fingerprint {
// Add this to the sessions to add
add_session = Some(msg.pub_key.clone())
Expand Down
2 changes: 1 addition & 1 deletion src/terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl WindowManager {
upper_prompt: &str,
wait_time_seconds: i32,
) -> Option<String> {
if let Some((win, subwin)) = self.windows.get(&window_number) {
if let Some((_win, subwin)) = self.windows.get(&window_number) {
wtimeout(*subwin, wait_time_seconds * 1000);
wrefresh(*subwin);

Expand Down

0 comments on commit a2edc00

Please sign in to comment.