Skip to content

Commit

Permalink
Make everything more unsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
EvilCodeZ committed Dec 22, 2024
1 parent 950dd41 commit 8c8e08e
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 129 deletions.
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(arbitrary_self_types)]

use rustyline::{DefaultEditor, ExternalPrinter};
use std::io;
use std::io::Write;
Expand Down
31 changes: 11 additions & 20 deletions src/server/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
net::{IpAddr, SocketAddr},
ops::DerefMut,
pin::Pin,
sync::Arc,
};

use rand::RngCore;
Expand All @@ -19,22 +18,18 @@ use crate::{
encryption::{PacketDecryption, PacketEncryption},
packets::{EncryptionResponse, Kick, Packet},
},
util::{IOError, IOErrorKind, VarInt},
util::{IOError, IOErrorKind, VarInt, WeakHandle},
version::R1_20_2,
};

use self::packets::{LoginAcknowledged, LoginDisconnect, SetCompression};

use super::{
packet_handler::ServerPacketHandler,
packet_ids::{PacketRegistry, ServerPacketType},
packets::{
packet_handler::ServerPacketHandler, packet_ids::{PacketRegistry, ServerPacketType}, packets::{
self, encode_and_send_packet, read_and_decode_packet, CookieRequest, CookieResponse,
EncryptionRequest, Handshake, LoginPluginRequest, LoginPluginResponse, LoginRequest,
LoginSuccess, PlayerPublicKey, ProtocolState, PROTOCOL_STATE_LOGIN,
},
proxy_handler::{ClientHandle, ConnectionHandle, PacketSending},
PlayerSyncData, ProxyServer, SlotId,
}, proxy_handler::{ClientHandle, ConnectionHandle, PacketSending}, ProxiedPlayer, ProxyServer, SlotId
};

#[derive(Debug)]
Expand Down Expand Up @@ -74,7 +69,6 @@ impl EstablishedBackend {
self,
server_name: &str,
partner: ClientHandle,
sync_data: Arc<PlayerSyncData>,
) -> (GameProfile, ConnectionHandle) {
let player_name = self.profile.name.clone();
let Self {
Expand All @@ -87,7 +81,8 @@ impl EstablishedBackend {
} = self;
let synced_protocol_state = partner.connection.protocol_state.clone();
let (read, mut write) = stream.into_split();
let player_id = partner.player_id;
let player = partner.player.clone();
let version = partner.version;

let partner_handle = partner.connection.clone();
let (mut encryption, decryption) = match encryption {
Expand All @@ -96,6 +91,7 @@ impl EstablishedBackend {
};

let (handle_sender, handle_receiver) = tokio::sync::oneshot::channel::<ConnectionHandle>();
let player_ = player.clone();
let read_task = tokio::spawn(async move {
let self_handle = handle_receiver.await.unwrap();
let mut protocol_buf = Vec::new();
Expand Down Expand Up @@ -126,10 +122,9 @@ impl EstablishedBackend {
let res = ServerPacketHandler::handle_packet(
packet_id,
&read_buf[VarInt::get_size(packet_id)..],
sync_data.version,
player_id,
version,
&player_,
&self_handle,
&sync_data,
&partner.connection,
)
.await;
Expand Down Expand Up @@ -184,7 +179,6 @@ impl EstablishedBackend {
write_task.abort_handle(),
compression_threshold,
decryption,
None,
address,
);

Expand Down Expand Up @@ -212,7 +206,7 @@ impl EstablishedBackend {
for server_name in servers.get_priorities() {
let server = servers.get_server_id_by_name(&server_name);
if let Some(server) = server {
if switch_server_helper(player_id, server).await {
if switch_server_helper(player.clone(), server).await {
return;
}
}
Expand All @@ -237,15 +231,12 @@ impl EstablishedBackend {
}

fn switch_server_helper(
player_id: SlotId,
player: WeakHandle<ProxiedPlayer>,
server: SlotId,
) -> Pin<Box<dyn Future<Output=bool> + Send>> {
let block = async move {
let players = ProxyServer::instance().players().read().await;
let player = players.get(player_id);
if let Some(player) = player {
if let Some(player) = player.upgrade() {
let switched = player.switch_server(server).await;
drop(players);
if let Some(success) = switched {
let success = success.await;
if let Ok(success) = success {
Expand Down
4 changes: 2 additions & 2 deletions src/server/command/core_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn server_command(sender: &CommandSender, _name: &str, args: Vec<&str>) {
sender.send_message(TextBuilder::new("This command can only be executed by a player").style(Style::empty().with_color(TextColor::Red)));
return;
}
let player_id = sender.as_player().unwrap();
let player = sender.as_player().unwrap();
if args.is_empty() {
let servers = ProxyServer::instance().servers.blocking_read();
let mut first = true;
Expand Down Expand Up @@ -43,7 +43,7 @@ fn server_command(sender: &CommandSender, _name: &str, args: Vec<&str>) {
let server = servers.get_server_id_by_name(&server_name);
if let Some(server_id) = server {
drop(servers);
ProxyServer::instance().block_on(crate::server::packet_handler::switch_server_helper(player_id, server_id));
ProxyServer::instance().block_on(crate::server::packet_handler::switch_server_helper(player, server_id));
} else {
drop(servers);
sender.send_message(TextBuilder::new(format!("The server {} does not exist", server_name))
Expand Down
15 changes: 7 additions & 8 deletions src/server/command/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashMap;

use crate::chat::Text;
use crate::{chat::Text, util::WeakHandle};

use super::{brigadier::Suggestions, ProxyServer, SlotId};
use super::{brigadier::Suggestions, ProxiedPlayer, ProxyServer};

pub(crate) mod core_impl;

Expand Down Expand Up @@ -183,7 +183,7 @@ impl CommandRegistry {

pub enum CommandSender {
Console,
Player(SlotId),
Player(WeakHandle<ProxiedPlayer>),
}

impl CommandSender {
Expand All @@ -202,9 +202,9 @@ impl CommandSender {
}
}

pub fn as_player(&self) -> Option<SlotId> {
pub fn as_player(&self) -> Option<WeakHandle<ProxiedPlayer>> {
match self {
CommandSender::Player(id) => Some(*id),
CommandSender::Player(p) => Some(p.clone()),
_ => None,
}
}
Expand All @@ -216,9 +216,8 @@ impl CommandSender {
pub async fn send_message_async(&self, message: Text) {
match self {
CommandSender::Console => log::info!("{}", message),
CommandSender::Player(id) => {
let players = ProxyServer::instance().players().read().await;
if let Some(player) = players.get(*id) {
CommandSender::Player(player) => {
if let Some(player) = player.upgrade() {
let _ = player.send_message(message).await;
}
},
Expand Down
89 changes: 36 additions & 53 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, future::Future, io::Cursor, path::{Path, PathBuf}, sync::{atomic::Ordering, Arc}, time::{Duration, Instant}};
use std::{collections::HashMap, future::Future, io::Cursor, path::{Path, PathBuf}, sync::atomic::Ordering, time::{Duration, Instant}};

use base64::Engine;
use command::{CommandRegistry, CommandRegistryBuilder};
Expand All @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use slotmap::{DefaultKey, SlotMap};
use tokio::{net::TcpListener, runtime::Runtime, sync::RwLock, task::JoinHandle};

use crate::{auth::GameProfile, chat::Text, util::IOResult};
use crate::{auth::GameProfile, chat::Text, util::{Handle, IOResult}};

pub(crate) mod backend;
pub(crate) mod brigadier;
Expand Down Expand Up @@ -155,7 +155,7 @@ pub struct ProxyServer {
servers: RwLock<ServerList>,
rsa_priv_key: RsaPrivateKey,
rsa_pub_key: RsaPublicKey,
players: RwLock<SlotMap<SlotId, ProxiedPlayer>>,
players: RwLock<SlotMap<SlotId, Handle<ProxiedPlayer>>>,
pub player_count: usize,
favicon: Option<String>,
}
Expand All @@ -176,7 +176,7 @@ impl ProxyServer {
&self.servers
}

pub fn players(&self) -> &RwLock<SlotMap<SlotId, ProxiedPlayer>> {
pub fn players(&self) -> &RwLock<SlotMap<SlotId, Handle<ProxiedPlayer>>> {
&self.players
}

Expand Down Expand Up @@ -367,7 +367,7 @@ pub struct ProxiedPlayer {
pub client_handle: ConnectionHandle,
pub server_handle: Option<ConnectionHandle>,
pub protocol_version: i32,
pub(crate) sync_data: Arc<PlayerSyncData>,
pub(crate) sync_data: PlayerSyncData,
}

impl ProxiedPlayer {
Expand All @@ -385,88 +385,78 @@ impl ProxiedPlayer {
Ok(())
}

pub async fn switch_server(&self, server_id: SlotId) -> Option<JoinHandle<bool>> {

pub async fn switch_server(mut self: Handle<Self>, server_id: SlotId) -> Option<JoinHandle<bool>> {
if self.client_handle.closed.load(Ordering::Relaxed) {
return None;
}

let sync_data = self.sync_data.clone();

if let Err(true) = sync_data.is_switching_server.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) {
if let Err(true) = self.sync_data.is_switching_server.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) {
return None;
}
let player_id = self.player_id;
let profile = self.profile.clone_without_properties();
let public_key = self.player_public_key.clone();
let version = self.protocol_version;
let handle = self.client_handle.clone();
let server_handle = self.server_handle.clone();
let join_handle = tokio::spawn(async move {

if handle.closed.load(Ordering::Relaxed) {
if self.client_handle.closed.load(Ordering::Relaxed) {
return false;
}

let (addr, server_name) = {
let server_list = ProxyServer::instance().servers().read().await;
let server = server_list.get_server(server_id);
if server.is_none() {
sync_data.is_switching_server.store(false, Ordering::Relaxed);
self.sync_data.is_switching_server.store(false, Ordering::Relaxed);
return false;
}
let server = server.unwrap();
(server.address.clone(), server.label.clone())
};

let username = profile.name.clone();
let backend = backend::connect(handle.address, addr, "127.0.0.1".to_string(), 25565, profile, public_key, version).await;
let username = self.profile.name.clone();
let backend = backend::connect(
self.client_handle.address, addr, "127.0.0.1".to_string(), 25565,
self.profile.clone(), self.player_public_key.clone(), version
).await;
if let Err(e) = backend {
log::error!("[{}] Failed to connect to backend: {}", username, e);
sync_data.is_switching_server.store(false, Ordering::Relaxed);
let players = ProxyServer::instance().players().read().await;
if let Some(player) = players.get(player_id) { // info player
player.send_message(Text::new(format!("§cCould not connect: {}", e))).await.ok();
}
drop(players);
self.sync_data.is_switching_server.store(false, Ordering::Relaxed);
let _ = self.send_message(Text::new(format!("§cCould not connect: {}", e))).await;
return false;
}
let backend = backend.unwrap();

if let ProtocolState::Game = handle.protocol_state() {

let _ = handle.drop_redundant(true).await;
if let Some(server_handle) = server_handle {
if let ProtocolState::Game = self.client_handle.protocol_state() {
let _ = self.client_handle.drop_redundant(true).await;
if let Some(ref server_handle) = self.server_handle {
server_handle.disconnect("client is switching servers").await;
server_handle.wait_for_disconnect().await;
}

let _ = handle.goto_config(version).await;
sync_data.config_ack_notify.notified().await;
let _ = self.client_handle.goto_config(version).await;
self.sync_data.config_ack_notify.notified().await;

let _ = handle.drop_redundant(false).await;
let _ = self.client_handle.drop_redundant(false).await;
} else {
log::warn!("Player {} is not in game state, cancelling server switch.", username);
sync_data.is_switching_server.store(false, Ordering::Relaxed);
self.sync_data.is_switching_server.store(false, Ordering::Relaxed);
return false;
}

if let Some(read_task) = handle.read_task.lock().await.take() {
if let Some(read_task) = self.client_handle.read_task.lock().await.take() {
read_task.abort();
}

let (profile, server_handle) = backend.begin_proxying(&server_name, ClientHandle {
player_id,
connection: handle.clone(),
}, sync_data.clone()).await;
player: self.downgrade(),
version,
connection: self.client_handle.clone(),
}).await;

let settings = sync_data.client_settings.lock().await;
let settings = self.sync_data.client_settings.lock().await;

if let Some(packet) = settings.as_ref() {
if let Some(data) = packets::get_full_client_packet_buf(packet, version, handle.protocol_state()).unwrap() {
if let Some(data) = packets::get_full_client_packet_buf(packet, version, self.client_handle.protocol_state()).unwrap() {
if let Err(_e) = server_handle.queue_packet(data, true).await {
drop(settings);
sync_data.is_switching_server.store(false, Ordering::Relaxed);
self.sync_data.is_switching_server.store(false, Ordering::Relaxed);
return false;
}
}
Expand All @@ -475,19 +465,12 @@ impl ProxiedPlayer {

let display_name = format!("[{} - {}]", username, server_name);

handle.spawn_read_task(false, display_name, server_handle.clone(), player_id, version).await;
self.client_handle.spawn_read_task(false, display_name, server_handle.clone(), self.downgrade(), version).await;

let mut players = ProxyServer::instance().players().write().await;
if let Some(player) = players.get_mut(player_id) {
player.current_server = server_id;
player.server_handle = Some(server_handle);
player.profile = profile;
drop(players);
} else {
drop(players);
server_handle.disconnect("player has disconnected").await;
}
sync_data.is_switching_server.store(false, Ordering::Relaxed);
self.current_server = server_id;
self.server_handle = Some(server_handle);
self.profile = profile;
self.sync_data.is_switching_server.store(false, Ordering::Relaxed);
true
});
Some(join_handle)
Expand Down
Loading

0 comments on commit 8c8e08e

Please sign in to comment.