From 225a106c7e0959ff47d5cc1bbb5f764acbab390a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rickard=20Hallerb=C3=A4ck?= Date: Tue, 8 Oct 2024 21:10:24 +0200 Subject: [PATCH] more work on the replay front --- src/session/mod.rs | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/src/session/mod.rs b/src/session/mod.rs index d58822f..77d5784 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -20,7 +20,7 @@ use crypto::{ use futures::prelude::*; use messages::MessageData::{ Chat, Close, Discovery, DiscoveryReply, Encrypted, Heartbeat, Init, InitAwait, InitDecline, - InitOk, Internal, Ping, Replay, + InitOk, Internal, Ping, Replay, ReplayResponse, }; use messages::MessagingError::*; use messages::SessionMessage as Message; @@ -867,15 +867,14 @@ impl Session { if self.relay { let messaging_topic = Topic::messaging_topic_in(identifier.as_ref()); + let replay_topic = Topic::replay_topic(identifier.as_ref()); topics_to_subscribe.push(messaging_topic); + topics_to_subscribe.push(replay_topic); } else { topics_to_subscribe.push(discover_topic); topics_to_subscribe.push(heartbeat_topic); } - if self.relay { - println!("{:?}", topics_to_subscribe); - } let tx_clone = self.tx.clone(); self.serve_topics(topics_to_subscribe, &tx_clone, false) .await; @@ -934,6 +933,7 @@ impl Session { Ok(Some(res)) => { // Do something let response = res.0; + let session_id = &response.session_id; let topic_response = res.1; if topic_response == "terminate" { *keep_running.lock().await = false; @@ -1017,6 +1017,20 @@ impl Session { Ok(()) } + pub async fn replay( + &mut self, + sender: Arc>, + session_id: &str, + ) -> Result<(), MessagingError> { + let topic = Topic::replay_topic(session_id); + let msg = Message::new_replay(session_id.to_string()); + { + let h = sender.lock().await; + h.send_message(&topic, msg).await?; + } + Ok(()) + } + pub async fn send_and_receive_topic< T: MessagebleTopicAsync + MessagebleTopicAsyncReadTimeout, >( @@ -1082,6 +1096,7 @@ impl Session { let session = self.sessions.lock().await; session.len() } + pub async fn get_session_ids(&self) -> Vec { let session = self.sessions.lock().await; let mut ids = Vec::new(); @@ -1155,6 +1170,12 @@ impl Session { }); } } + ReplayResponse(msg) => { + for message in msg.messages { + self.handle_message(message, topic, memory).await; + } + return Ok(None); + } Replay(msg) => { if !self.relay { return Ok(None); @@ -1427,8 +1448,8 @@ impl Session { let sym_key_encrypted = msg.sym_key_encrypted.clone(); if self.relay { - let mut pk1 = None; - let mut pk2 = None; + let pk1; + let pk2; let other_pub_key = msg.pub_key.clone(); let pub_key_decoded = match base64::decode(other_pub_key) { @@ -1572,12 +1593,14 @@ impl Session { if let Some(session_data) = self.sessions.lock().await.get(&session_id) { let pub_key = session_data.pub_key.clone(); self.call_callbacks_chat(&pub_key, &msg.message).await; - Ok(None) + let msg = Message::new_replay(session_id.to_string()); + let topic = Topic::replay_topic(&session_id); + return Ok(Some((msg, topic))); } else { - Err(SessionErrorMsg { + return Err(SessionErrorMsg { code: SessionErrorCodes::InvalidPublicKey as u32, message: "Invalid public key".to_owned(), - }) + }); } } Encrypted(msg) => {