Skip to content

Commit

Permalink
remove dependency on bytes crate for azalea-protocol and fix memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
mat-1 committed Dec 25, 2024
1 parent 0ee9ed5 commit 04eaa5c
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 59 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ bevy_log = "0.15.0"
bevy_tasks = "0.15.0"
bevy_time = "0.15.0"
byteorder = "1.5.0"
bytes = "1.9.0"
cfb8 = "0.8.1"
chrono = { version = "0.4.39", default-features = false }
criterion = "0.5.1"
Expand Down
12 changes: 6 additions & 6 deletions azalea-client/src/raw_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ pub struct RawConnection {

#[derive(Clone)]
struct RawConnectionReader {
pub incoming_packet_queue: Arc<Mutex<Vec<Vec<u8>>>>,
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub run_schedule_sender: mpsc::UnboundedSender<()>,
}
#[derive(Clone)]
struct RawConnectionWriter {
pub outgoing_packets_sender: mpsc::UnboundedSender<Vec<u8>>,
pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
}

#[derive(Error, Debug)]
Expand All @@ -54,7 +54,7 @@ pub enum WritePacketError {
SendError {
#[from]
#[backtrace]
source: SendError<Vec<u8>>,
source: SendError<Box<[u8]>>,
},
}

Expand Down Expand Up @@ -93,7 +93,7 @@ impl RawConnection {
}
}

pub fn write_raw_packet(&self, raw_packet: Vec<u8>) -> Result<(), WritePacketError> {
pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
self.writer.outgoing_packets_sender.send(raw_packet)?;
Ok(())
}
Expand All @@ -120,7 +120,7 @@ impl RawConnection {
!self.read_packets_task.is_finished()
}

pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Vec<u8>>>> {
pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
self.reader.incoming_packet_queue.clone()
}

Expand Down Expand Up @@ -161,7 +161,7 @@ impl RawConnectionWriter {
pub async fn write_task(
self,
mut write_conn: RawWriteConnection,
mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
) {
while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
if let Err(err) = write_conn.write(&raw_packet).await {
Expand Down
5 changes: 2 additions & 3 deletions azalea-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ azalea-protocol-macros = { path = "./azalea-protocol-macros", version = "0.11.0"
azalea-registry = { path = "../azalea-registry", version = "0.11.0" }
azalea-world = { path = "../azalea-world", version = "0.11.0" }
bevy_ecs = { workspace = true }
#byteorder = { workspace = true }
bytes = { workspace = true }
# byteorder = { workspace = true }
flate2 = { workspace = true }
futures = { workspace = true }
futures-lite = { workspace = true }
#futures-util = { workspace = true }
# futures-util = { workspace = true }
serde = { workspace = true, features = ["serde_derive"] }
serde_json = { workspace = true }
simdnbt = { workspace = true }
Expand Down
17 changes: 7 additions & 10 deletions azalea-protocol/src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::net::SocketAddr;
use azalea_auth::game_profile::GameProfile;
use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
use bytes::BytesMut;
use thiserror::Error;
use tokio::io::{AsyncWriteExt, BufStream};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
Expand All @@ -28,7 +27,7 @@ use crate::write::{serialize_packet, write_raw_packet};

pub struct RawReadConnection {
pub read_stream: OwnedReadHalf,
pub buffer: BytesMut,
pub buffer: Cursor<Vec<u8>>,
pub compression_threshold: Option<u32>,
pub dec_cipher: Option<Aes128CfbDec>,
}
Expand Down Expand Up @@ -135,7 +134,7 @@ pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
}

impl RawReadConnection {
pub async fn read(&mut self) -> Result<Vec<u8>, Box<ReadPacketError>> {
pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
read_raw_packet::<_>(
&mut self.read_stream,
&mut self.buffer,
Expand All @@ -145,7 +144,7 @@ impl RawReadConnection {
.await
}

pub fn try_read(&mut self) -> Result<Option<Vec<u8>>, Box<ReadPacketError>> {
pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
try_read_raw_packet::<_>(
&mut self.read_stream,
&mut self.buffer,
Expand Down Expand Up @@ -190,7 +189,7 @@ where
/// Read a packet from the stream.
pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
let raw_packet = self.raw.read().await?;
deserialize_packet(&mut Cursor::new(raw_packet.as_slice()))
deserialize_packet(&mut Cursor::new(&raw_packet))
}

/// Try to read a packet from the stream, or return Ok(None) if there's no
Expand All @@ -199,9 +198,7 @@ where
let Some(raw_packet) = self.raw.try_read()? else {
return Ok(None);
};
Ok(Some(deserialize_packet(&mut Cursor::new(
raw_packet.as_slice(),
))?))
Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
}
}
impl<W> WriteConnection<W>
Expand Down Expand Up @@ -304,7 +301,7 @@ impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
reader: ReadConnection {
raw: RawReadConnection {
read_stream,
buffer: BytesMut::new(),
buffer: Cursor::new(Vec::new()),
compression_threshold: None,
dec_cipher: None,
},
Expand Down Expand Up @@ -562,7 +559,7 @@ where
reader: ReadConnection {
raw: RawReadConnection {
read_stream,
buffer: BytesMut::new(),
buffer: Cursor::new(Vec::new()),
compression_threshold: None,
dec_cipher: None,
},
Expand Down
12 changes: 8 additions & 4 deletions azalea-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//!
//! See [`crate::connect::Connection`] for an example.
// these two are necessary for thiserror backtraces
// this is necessary for thiserror backtraces
#![feature(error_generic_member_access)]

use std::{fmt::Display, net::SocketAddr, str::FromStr};
Expand Down Expand Up @@ -111,7 +111,6 @@ impl serde::Serialize for ServerAddress {
mod tests {
use std::io::Cursor;

use bytes::BytesMut;
use uuid::Uuid;

use crate::{
Expand All @@ -135,11 +134,16 @@ mod tests {
.await
.unwrap();

assert_eq!(
stream,
[22, 0, 4, 116, 101, 115, 116, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
);

let mut stream = Cursor::new(stream);

let _ = read_packet::<ServerboundLoginPacket, _>(
&mut stream,
&mut BytesMut::new(),
&mut Cursor::new(Vec::new()),
None,
&mut None,
)
Expand All @@ -163,7 +167,7 @@ mod tests {
.unwrap();
let mut stream = Cursor::new(stream);

let mut buffer = BytesMut::new();
let mut buffer = Cursor::new(Vec::new());

let _ = read_packet::<ServerboundLoginPacket, _>(&mut stream, &mut buffer, None, &mut None)
.await
Expand Down
Loading

0 comments on commit 04eaa5c

Please sign in to comment.