Skip to content

Commit

Permalink
more work on the replay front
Browse files Browse the repository at this point in the history
  • Loading branch information
Ricardicus committed Oct 8, 2024
1 parent cbb5d8d commit 225a106
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crypto::{
use futures::prelude::*;
use messages::MessageData::{
Chat, Close, Discovery, DiscoveryReply, Encrypted, Heartbeat, Init, InitAwait, InitDecline,
InitOk, Internal, Ping, Replay,
InitOk, Internal, Ping, Replay, ReplayResponse,
};
use messages::MessagingError::*;
use messages::SessionMessage as Message;
Expand Down Expand Up @@ -867,15 +867,14 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {

if self.relay {
let messaging_topic = Topic::messaging_topic_in(identifier.as_ref());
let replay_topic = Topic::replay_topic(identifier.as_ref());
topics_to_subscribe.push(messaging_topic);
topics_to_subscribe.push(replay_topic);
} else {
topics_to_subscribe.push(discover_topic);
topics_to_subscribe.push(heartbeat_topic);
}

if self.relay {
println!("{:?}", topics_to_subscribe);
}
let tx_clone = self.tx.clone();
self.serve_topics(topics_to_subscribe, &tx_clone, false)
.await;
Expand Down Expand Up @@ -934,6 +933,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
Ok(Some(res)) => {
// Do something
let response = res.0;
let session_id = &response.session_id;
let topic_response = res.1;
if topic_response == "terminate" {
*keep_running.lock().await = false;
Expand Down Expand Up @@ -1017,6 +1017,20 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
Ok(())
}

pub async fn replay(
&mut self,
sender: Arc<Mutex<ZenohHandler>>,
session_id: &str,
) -> Result<(), MessagingError> {
let topic = Topic::replay_topic(session_id);
let msg = Message::new_replay(session_id.to_string());
{
let h = sender.lock().await;
h.send_message(&topic, msg).await?;
}
Ok(())
}

pub async fn send_and_receive_topic<
T: MessagebleTopicAsync + MessagebleTopicAsyncReadTimeout,
>(
Expand Down Expand Up @@ -1082,6 +1096,7 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
let session = self.sessions.lock().await;
session.len()
}

pub async fn get_session_ids(&self) -> Vec<String> {
let session = self.sessions.lock().await;
let mut ids = Vec::new();
Expand Down Expand Up @@ -1155,6 +1170,12 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
});
}
}
ReplayResponse(msg) => {
for message in msg.messages {
self.handle_message(message, topic, memory).await;
}
return Ok(None);
}
Replay(msg) => {
if !self.relay {
return Ok(None);
Expand Down Expand Up @@ -1427,8 +1448,8 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
let sym_key_encrypted = msg.sym_key_encrypted.clone();

if self.relay {
let mut pk1 = None;
let mut pk2 = None;
let pk1;
let pk2;

let other_pub_key = msg.pub_key.clone();
let pub_key_decoded = match base64::decode(other_pub_key) {
Expand Down Expand Up @@ -1572,12 +1593,14 @@ impl Session<ChaCha20Poly1305EnDeCrypt, PGPEnDeCrypt> {
if let Some(session_data) = self.sessions.lock().await.get(&session_id) {
let pub_key = session_data.pub_key.clone();
self.call_callbacks_chat(&pub_key, &msg.message).await;
Ok(None)
let msg = Message::new_replay(session_id.to_string());
let topic = Topic::replay_topic(&session_id);
return Ok(Some((msg, topic)));
} else {
Err(SessionErrorMsg {
return Err(SessionErrorMsg {
code: SessionErrorCodes::InvalidPublicKey as u32,
message: "Invalid public key".to_owned(),
})
});
}
}
Encrypted(msg) => {
Expand Down

0 comments on commit 225a106

Please sign in to comment.