From 6e35676a262dc77fad87782458da5a89774560dd Mon Sep 17 00:00:00 2001 From: tW4r Date: Thu, 9 Feb 2023 16:56:56 +0200 Subject: [PATCH 1/8] Add a very basic support for multi-server handling. * Added a new cli param `bind` ("127.0.0.1:25565" by default) for lazymc global binding * Routing now directs all traffic to status service rather than proxying it if server has stared as it is server dependent * Removed all ban checking from routing as it is server dependent * Status server now chooses config/server based on the handshake * Status server is now responsible for strating to proxy if server is started * BUG: After server has started the server status is not shown somewhy --- src/action/start.rs | 22 +++-- src/cli.rs | 8 ++ src/config.rs | 42 +++++++-- src/proxy.rs | 9 -- src/service/server.rs | 122 ++++++++++---------------- src/status.rs | 199 +++++++++++++++++++++++------------------- 6 files changed, 211 insertions(+), 191 deletions(-) diff --git a/src/action/start.rs b/src/action/start.rs index aa9326a..d3ad2b0 100644 --- a/src/action/start.rs +++ b/src/action/start.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::net::SocketAddr; use std::sync::Arc; use clap::ArgMatches; @@ -14,20 +15,25 @@ const RCON_PASSWORD_LENGTH: usize = 32; /// Start lazymc. pub fn invoke(matches: &ArgMatches) -> Result<(), ()> { + // Parse bind address + let bind_addr: SocketAddr = matches.get_one::("bind").unwrap().parse().unwrap(); + // Load config #[allow(unused_mut)] - let mut config = config::load(matches); + let mut configs = config::load(matches); - // Prepare RCON if enabled - #[cfg(feature = "rcon")] - prepare_rcon(&mut config); + for config in configs.iter_mut() { + // Prepare RCON if enabled + #[cfg(feature = "rcon")] + prepare_rcon(config); - // Rewrite server server.properties file - rewrite_server_properties(&config); + // Rewrite server server.properties file + rewrite_server_properties(&config); + } // Start server service - let config = Arc::new(config); - service::server::service(config) + let configs_arc = configs.into_iter().map(|config| Arc::new(config)).collect(); + service::server::service(bind_addr, configs_arc) } /// Prepare RCON. diff --git a/src/cli.rs b/src/cli.rs index 8dcc0c5..4b7696c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -24,6 +24,14 @@ pub fn app() -> Command { ) .subcommand(Command::new("test").about("Test config")), ) + .arg( + Arg::new("bind") + .short('b') + .value_name("ADDRESS") + .default_value("0.0.0.0:25565") + .help("Address to bind to") + .num_args(1), + ) .arg( Arg::new("config") .short('c') diff --git a/src/config.rs b/src/config.rs index 150a932..0b9793d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,21 +17,38 @@ pub const CONFIG_FILE: &str = "lazymc.toml"; /// Configuration version user should be using, or warning will be shown. const CONFIG_VERSION: &str = "0.2.8"; -/// Load config from file, based on CLI arguments. -/// -/// Quits with an error message on failure. -pub fn load(matches: &ArgMatches) -> Config { +pub fn load(matches: &ArgMatches) -> Vec { // Get config path, attempt to canonicalize let mut path = PathBuf::from(matches.get_one::("config").unwrap()); if let Ok(p) = path.canonicalize() { path = p; } - // Ensure configuration file exists - if !path.is_file() { + let paths: Vec = if path.is_dir() { + path.read_dir() + .unwrap() + .filter_map(|entry| { + entry.ok().and_then(|entry| { + let path = entry.path(); + if path.is_file() { + Some(path) + } else { + None + } + }) + }) + .collect() + } else if path.is_file() { + vec![path.clone()] + } else { + vec![] + }; + + // Ensure configuration file/directory exists + if paths.len() == 0 { quit_error_msg( format!( - "Config file does not exist: {}", + "Config file/directory does not exist: {}", path.to_str().unwrap_or("?") ), ErrorHintsBuilder::default() @@ -42,6 +59,13 @@ pub fn load(matches: &ArgMatches) -> Config { ); } + paths.into_iter().map(|path| load_file(path)).collect() +} + +/// Load config from file, based on CLI arguments. +/// +/// Quits with an error message on failure. +pub fn load_file(path: PathBuf) -> Config { // Load config let config = match Config::load(path) { Ok(config) => config, @@ -135,8 +159,8 @@ impl Config { #[serde(default)] pub struct Public { /// Public address. - #[serde(deserialize_with = "to_socket_addrs")] - pub address: SocketAddr, + // #[serde(deserialize_with = "to_socket_addrs")] + pub address: String, // SocketAddr, /// Minecraft protocol version name hint. pub version: String, diff --git a/src/proxy.rs b/src/proxy.rs index af6bbb8..d68aa67 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -10,15 +10,6 @@ use tokio::net::TcpStream; use crate::net; -/// Proxy the inbound stream to a target address. -pub async fn proxy( - inbound: TcpStream, - proxy_header: ProxyHeader, - addr_target: SocketAddr, -) -> Result<(), Box> { - proxy_with_queue(inbound, proxy_header, addr_target, &[]).await -} - /// Proxy the inbound stream to a target address. /// /// Send the queue to the target server before proxying. diff --git a/src/service/server.rs b/src/service/server.rs index 62c0cd7..ba8163b 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -8,7 +9,7 @@ use tokio::net::{TcpListener, TcpStream}; use crate::config::Config; use crate::proto::client::Client; use crate::proxy::{self, ProxyHeader}; -use crate::server::{self, Server}; +use crate::server::Server; use crate::service; use crate::status; use crate::util::error::{quit_error, ErrorHints}; @@ -19,52 +20,60 @@ use crate::util::error::{quit_error, ErrorHints}; /// /// Spawns a tokio runtime to complete all work on. #[tokio::main(flavor = "multi_thread")] -pub async fn service(config: Arc) -> Result<(), ()> { - // Load server state - let server = Arc::new(Server::default()); - +pub async fn service(bind_addr: SocketAddr, configs: Vec>) -> Result<(), ()> { // Listen for new connections - let listener = TcpListener::bind(config.public.address) - .await - .map_err(|err| { - quit_error( - anyhow!(err).context("Failed to start proxy server"), - ErrorHints::default(), - ); - })?; + let listener = TcpListener::bind(bind_addr).await.map_err(|err| { + quit_error( + anyhow!(err).context("Failed to start proxy server"), + ErrorHints::default(), + ); + })?; - info!( - target: "lazymc", - "Proxying public {} to server {}", - config.public.address, config.server.address, - ); + let mut servers = HashMap::new(); - if config.lockout.enabled { - warn!( + for config in configs.iter() { + // Load server state + let server = Arc::new(Server::default()); + servers.insert( + config.public.address.clone(), + (config.clone(), server.clone()), + ); + + info!( target: "lazymc", - "Lockout mode is enabled, nobody will be able to connect through the proxy", + "Proxying public {} to server {}", + config.public.address, config.server.address, ); - } - // Spawn services: monitor, signal handler - tokio::spawn(service::monitor::service(config.clone(), server.clone())); - tokio::spawn(service::signal::service(config.clone(), server.clone())); + if config.lockout.enabled { + warn!( + target: "lazymc", + "Lockout mode is enabled, nobody will be able to connect through the proxy", + ); + } + + // Spawn services: monitor, signal handler + tokio::spawn(service::monitor::service(config.clone(), server.clone())); + tokio::spawn(service::signal::service(config.clone(), server.clone())); + + // Initiate server start + if config.server.wake_on_start { + Server::start(config.clone(), server.clone(), None).await; + } - // Initiate server start - if config.server.wake_on_start { - Server::start(config.clone(), server.clone(), None).await; + // Spawn additional services: probe and ban manager + tokio::spawn(service::probe::service(config.clone(), server.clone())); + tokio::task::spawn_blocking({ + let (config, server) = (config.clone(), server.clone()); + || service::file_watcher::service(config, server) + }); } - // Spawn additional services: probe and ban manager - tokio::spawn(service::probe::service(config.clone(), server.clone())); - tokio::task::spawn_blocking({ - let (config, server) = (config.clone(), server.clone()); - || service::file_watcher::service(config, server) - }); + let servers_arc = Arc::new(servers); // Route all incomming connections while let Ok((inbound, _)) = listener.accept().await { - route(inbound, config.clone(), server.clone()); + route(inbound, servers_arc.clone()); } Ok(()) @@ -72,7 +81,7 @@ pub async fn service(config: Arc) -> Result<(), ()> { /// Route inbound TCP stream to correct service, spawning a new task. #[inline] -fn route(inbound: TcpStream, config: Arc, server: Arc) { +fn route(inbound: TcpStream, servers: Arc, Arc)>>) { // Get user peer address let peer = match inbound.peer_addr() { Ok(peer) => peer, @@ -82,29 +91,8 @@ fn route(inbound: TcpStream, config: Arc, server: Arc) { } }; - // Check ban state, just drop connection if enabled - let banned = server.is_banned_ip_blocking(&peer.ip()); - if config.server.drop_banned_ips { - info!(target: "lazymc", "Connection from banned IP {}, dropping", peer.ip()); - return; - } - - // Route connection through proper channel - let should_proxy = - !banned && server.state() == server::State::Started && !config.lockout.enabled; - if should_proxy { - route_proxy(inbound, config) - } else { - route_status(inbound, config, server, peer) - } -} - -/// Route inbound TCP stream to status server, spawning a new task. -#[inline] -fn route_status(inbound: TcpStream, config: Arc, server: Arc, peer: SocketAddr) { - // When server is not online, spawn a status server let client = Client::new(peer); - let service = status::serve(client, inbound, config, server).map(|r| { + let service = status::serve(client, inbound, servers).map(|r| { if let Err(err) = r { warn!(target: "lazymc", "Failed to serve status: {:?}", err); } @@ -113,24 +101,6 @@ fn route_status(inbound: TcpStream, config: Arc, server: Arc, pe tokio::spawn(service); } -/// Route inbound TCP stream to proxy, spawning a new task. -#[inline] -fn route_proxy(inbound: TcpStream, config: Arc) { - // When server is online, proxy all - let service = proxy::proxy( - inbound, - ProxyHeader::Proxy.not_none(config.server.send_proxy_v2), - config.server.address, - ) - .map(|r| { - if let Err(err) = r { - warn!(target: "lazymc", "Failed to proxy: {}", err); - } - }); - - tokio::spawn(service); -} - /// Route inbound TCP stream to proxy with queued data, spawning a new task. #[inline] pub fn route_proxy_queue(inbound: TcpStream, config: Arc, queue: BytesMut) { diff --git a/src/status.rs b/src/status.rs index 6058642..dfc4ce0 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use bytes::BytesMut; @@ -38,8 +39,7 @@ const SERVER_ICON_FILE: &str = "server-icon.png"; pub async fn serve( client: Client, mut inbound: TcpStream, - config: Arc, - server: Arc, + servers: Arc, Arc)>>, ) -> Result<(), ()> { let (mut reader, mut writer) = inbound.split(); @@ -101,103 +101,124 @@ pub async fn serve( continue; } - // Hijack server status packet - if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { - let server_status = server_status(&client_info, &config, &server).await; - let packet = StatusResponse { server_status }; - - let mut data = Vec::new(); - packet.encode(&mut data).map_err(|_| ())?; - - let response = RawPacket::new(0, data).encode_with_len(&client)?; - writer.write_all(&response).await.map_err(|_| ())?; - - continue; - } - // Hijack ping packet - if client_state == ClientState::Status && packet.id == packets::status::SERVER_PING { + if client_state == ClientState::Status && packet.id == packets::status::SERVER_PING + { writer.write_all(&raw).await.map_err(|_| ())?; continue; } - // Hijack login start - if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START { - // Try to get login username, update client info - // TODO: we should always parse this packet successfully - let username = LoginStart::decode(&mut packet.data.as_slice()) - .ok() - .map(|p| p.name); - client_info.username = username.clone(); - - // Kick if lockout is enabled - if config.lockout.enabled { - match username { - Some(username) => { - info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) - } - None => info!(target: "lazymc", "Kicked player because lockout is enabled"), - } - action::kick(&client, &config.lockout.message, &mut writer).await?; - break; - } + // Grab the server and config + if let Some(handshake) = client_info.handshake.as_ref() { + if let Some((config, server)) = servers.get(&handshake.server_addr) { + // Hijack server status packet + if client_state == ClientState::Status + && packet.id == packets::status::SERVER_STATUS + { + let server_status = server_status(&client_info, &config, &server).await; + let packet = StatusResponse { server_status }; - // Kick if client is banned - if let Some(ban) = server.ban_entry(&client.peer.ip()).await { - if ban.is_banned() { - let msg = if let Some(reason) = ban.reason { - info!(target: "lazymc", "Login from banned IP {} ({}), disconnecting", client.peer.ip(), &reason); - reason.to_string() - } else { - info!(target: "lazymc", "Login from banned IP {}, disconnecting", client.peer.ip()); - DEFAULT_BAN_REASON.to_string() - }; - action::kick( - &client, - &format!("{}{}", BAN_MESSAGE_PREFIX, msg), - &mut writer, - ) - .await?; - break; + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; + + let response = RawPacket::new(0, data).encode_with_len(&client)?; + writer.write_all(&response).await.map_err(|_| ())?; + + continue; } - } - // Kick if client is not whitelisted to wake server - if let Some(ref username) = username { - if !server.is_whitelisted(username).await { - info!(target: "lazymc", "User '{}' tried to wake server but is not whitelisted, disconnecting", username); - action::kick(&client, WHITELIST_MESSAGE, &mut writer).await?; - break; + // Hijack login start + if client_state == ClientState::Login + && packet.id == packets::login::SERVER_LOGIN_START + { + // Try to get login username, update client info + // TODO: we should always parse this packet successfully + let username = LoginStart::decode(&mut packet.data.as_slice()) + .ok() + .map(|p| p.name); + client_info.username = username.clone(); + + // Kick if lockout is enabled + if config.lockout.enabled { + match username { + Some(username) => { + info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) + } + None => { + info!(target: "lazymc", "Kicked player because lockout is enabled") + } + } + action::kick(&client, &config.lockout.message, &mut writer).await?; + break; + } + + // Kick if client is banned + if let Some(ban) = server.ban_entry(&client.peer.ip()).await { + if ban.is_banned() { + let msg = if let Some(reason) = ban.reason { + info!(target: "lazymc", "Login from banned IP {} ({}), disconnecting", client.peer.ip(), &reason); + reason.to_string() + } else { + info!(target: "lazymc", "Login from banned IP {}, disconnecting", client.peer.ip()); + DEFAULT_BAN_REASON.to_string() + }; + action::kick( + &client, + &format!("{}{}", BAN_MESSAGE_PREFIX, msg), + &mut writer, + ) + .await?; + break; + } + } + + // Kick if client is not whitelisted to wake server + if let Some(ref username) = username { + if !server.is_whitelisted(username).await { + info!(target: "lazymc", "User '{}' tried to wake server but is not whitelisted, disconnecting", username); + action::kick(&client, WHITELIST_MESSAGE, &mut writer).await?; + break; + } + } + + // Start server if not starting yet + Server::start(config.clone(), server.clone(), username).await; + + // Remember inbound packets + inbound_history.extend(&raw); + inbound_history.extend(&buf); + + // Build inbound packet queue with everything from login start (including this) + let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); + login_queue.extend(&raw); + login_queue.extend(&buf); + + // Buf is fully consumed here + buf.clear(); + + // Route connection through proper channel + if server.state() == server::State::Started { + crate::service::server::route_proxy_queue( + inbound, + config.clone(), + inbound_history.clone(), + ); + } else { + // Start occupying client + join::occupy( + client, + client_info, + config.clone(), + server.clone(), + inbound, + inbound_history, + login_queue, + ) + .await?; + } + return Ok(()); } } - - // Start server if not starting yet - Server::start(config.clone(), server.clone(), username).await; - - // Remember inbound packets - inbound_history.extend(&raw); - inbound_history.extend(&buf); - - // Build inbound packet queue with everything from login start (including this) - let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); - login_queue.extend(&raw); - login_queue.extend(&buf); - - // Buf is fully consumed here - buf.clear(); - - // Start occupying client - join::occupy( - client, - client_info, - config, - server, - inbound, - inbound_history, - login_queue, - ) - .await?; - return Ok(()); } // Show unhandled packet warning From 4d90011963d552230016cefddc78685bcb0f0ee6 Mon Sep 17 00:00:00 2001 From: timvisee Date: Fri, 10 Feb 2023 11:21:53 +0100 Subject: [PATCH 2/8] Resolve clippy warnings --- src/action/start.rs | 4 ++-- src/config.rs | 4 ++-- src/status.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/action/start.rs b/src/action/start.rs index d3ad2b0..d8dcfba 100644 --- a/src/action/start.rs +++ b/src/action/start.rs @@ -28,11 +28,11 @@ pub fn invoke(matches: &ArgMatches) -> Result<(), ()> { prepare_rcon(config); // Rewrite server server.properties file - rewrite_server_properties(&config); + rewrite_server_properties(config); } // Start server service - let configs_arc = configs.into_iter().map(|config| Arc::new(config)).collect(); + let configs_arc = configs.into_iter().map(Arc::new).collect(); service::server::service(bind_addr, configs_arc) } diff --git a/src/config.rs b/src/config.rs index 0b9793d..8ca1598 100644 --- a/src/config.rs +++ b/src/config.rs @@ -45,7 +45,7 @@ pub fn load(matches: &ArgMatches) -> Vec { }; // Ensure configuration file/directory exists - if paths.len() == 0 { + if paths.is_empty() { quit_error_msg( format!( "Config file/directory does not exist: {}", @@ -59,7 +59,7 @@ pub fn load(matches: &ArgMatches) -> Vec { ); } - paths.into_iter().map(|path| load_file(path)).collect() + paths.into_iter().map(load_file).collect() } /// Load config from file, based on CLI arguments. diff --git a/src/status.rs b/src/status.rs index aea3a0f..4394b7a 100644 --- a/src/status.rs +++ b/src/status.rs @@ -114,7 +114,7 @@ pub async fn serve( if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { - let server_status = server_status(&client_info, &config, &server).await; + let server_status = server_status(&client_info, config, server).await; let packet = StatusResponse { server_status }; let mut data = Vec::new(); From 2a55c92ef8c0465602a26f5cdb78c6ae66268041 Mon Sep 17 00:00:00 2001 From: tW4r Date: Fri, 10 Feb 2023 15:01:47 +0200 Subject: [PATCH 3/8] Implement suggested syntax changes --- src/config.rs | 23 ++---- src/status.rs | 193 ++++++++++++++++++++++++-------------------------- 2 files changed, 101 insertions(+), 115 deletions(-) diff --git a/src/config.rs b/src/config.rs index 8ca1598..c7a9caa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,25 +27,16 @@ pub fn load(matches: &ArgMatches) -> Vec { let paths: Vec = if path.is_dir() { path.read_dir() .unwrap() - .filter_map(|entry| { - entry.ok().and_then(|entry| { - let path = entry.path(); - if path.is_file() { - Some(path) - } else { - None - } - }) - }) - .collect() - } else if path.is_file() { - vec![path.clone()] + .filter_map(|entry| entry.ok()) + .map(|entry| entry.path()).collect() } else { - vec![] + vec![path.clone()] }; + let configs: Vec = paths.into_iter().filter(|path| path.is_file()).map(load_file).collect(); + // Ensure configuration file/directory exists - if paths.is_empty() { + if configs.is_empty() { quit_error_msg( format!( "Config file/directory does not exist: {}", @@ -59,7 +50,7 @@ pub fn load(matches: &ArgMatches) -> Vec { ); } - paths.into_iter().map(load_file).collect() + configs } /// Load config from file, based on CLI arguments. diff --git a/src/status.rs b/src/status.rs index 4394b7a..53713c0 100644 --- a/src/status.rs +++ b/src/status.rs @@ -108,115 +108,110 @@ pub async fn serve( } // Grab the server and config - if let Some(handshake) = client_info.handshake.as_ref() { - if let Some((config, server)) = servers.get(&handshake.server_addr) { - // Hijack server status packet - if client_state == ClientState::Status - && packet.id == packets::status::SERVER_STATUS - { - let server_status = server_status(&client_info, config, server).await; - let packet = StatusResponse { server_status }; - - let mut data = Vec::new(); - packet.encode(&mut data).map_err(|_| ())?; - - let response = RawPacket::new(0, data).encode_with_len(&client)?; - writer.write_all(&response).await.map_err(|_| ())?; - - continue; - } + if let Some((config, server)) = client_info + .handshake + .as_ref() + .and_then(|handshake| servers.get(&handshake.server_addr)) + { + // Hijack server status packet + if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { + let server_status = server_status(&client_info, config, server).await; + let packet = StatusResponse { server_status }; + + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; - // Hijack login start - if client_state == ClientState::Login - && packet.id == packets::login::SERVER_LOGIN_START - { - // Try to get login username, update client info - // TODO: we should always parse this packet successfully - let username = LoginStart::decode(&mut packet.data.as_slice()) - .ok() - .map(|p| p.name); - client_info.username = username.clone(); - - // Kick if lockout is enabled - if config.lockout.enabled { - match username { - Some(username) => { - info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) - } - None => { - info!(target: "lazymc", "Kicked player because lockout is enabled") - } + let response = RawPacket::new(0, data).encode_with_len(&client)?; + writer.write_all(&response).await.map_err(|_| ())?; + + continue; + } + + // Hijack login start + if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START + { + // Try to get login username, update client info + // TODO: we should always parse this packet successfully + let username = LoginStart::decode(&mut packet.data.as_slice()) + .ok() + .map(|p| p.name); + client_info.username = username.clone(); + + // Kick if lockout is enabled + if config.lockout.enabled { + match username { + Some(username) => { + info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) + } + None => { + info!(target: "lazymc", "Kicked player because lockout is enabled") } - action::kick(&client, &config.lockout.message, &mut writer).await?; - break; } + action::kick(&client, &config.lockout.message, &mut writer).await?; + break; + } - // Kick if client is banned - if let Some(ban) = server.ban_entry(&client.peer.ip()).await { - if ban.is_banned() { - let msg = if let Some(reason) = ban.reason { - info!(target: "lazymc", "Login from banned IP {} ({}), disconnecting", client.peer.ip(), &reason); - reason.to_string() - } else { - info!(target: "lazymc", "Login from banned IP {}, disconnecting", client.peer.ip()); - DEFAULT_BAN_REASON.to_string() - }; - action::kick( - &client, - &format!("{BAN_MESSAGE_PREFIX}{msg}"), - &mut writer, - ) + // Kick if client is banned + if let Some(ban) = server.ban_entry(&client.peer.ip()).await { + if ban.is_banned() { + let msg = if let Some(reason) = ban.reason { + info!(target: "lazymc", "Login from banned IP {} ({}), disconnecting", client.peer.ip(), &reason); + reason.to_string() + } else { + info!(target: "lazymc", "Login from banned IP {}, disconnecting", client.peer.ip()); + DEFAULT_BAN_REASON.to_string() + }; + action::kick(&client, &format!("{BAN_MESSAGE_PREFIX}{msg}"), &mut writer) .await?; - break; - } + break; } + } - // Kick if client is not whitelisted to wake server - if let Some(ref username) = username { - if !server.is_whitelisted(username).await { - info!(target: "lazymc", "User '{}' tried to wake server but is not whitelisted, disconnecting", username); - action::kick(&client, WHITELIST_MESSAGE, &mut writer).await?; - break; - } + // Kick if client is not whitelisted to wake server + if let Some(ref username) = username { + if !server.is_whitelisted(username).await { + info!(target: "lazymc", "User '{}' tried to wake server but is not whitelisted, disconnecting", username); + action::kick(&client, WHITELIST_MESSAGE, &mut writer).await?; + break; } + } - // Start server if not starting yet - Server::start(config.clone(), server.clone(), username).await; - - // Remember inbound packets - inbound_history.extend(&raw); - inbound_history.extend(&buf); - - // Build inbound packet queue with everything from login start (including this) - let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); - login_queue.extend(&raw); - login_queue.extend(&buf); - - // Buf is fully consumed here - buf.clear(); - - // Route connection through proper channel - if server.state() == server::State::Started { - crate::service::server::route_proxy_queue( - inbound, - config.clone(), - inbound_history.clone(), - ); - } else { - // Start occupying client - join::occupy( - client, - client_info, - config.clone(), - server.clone(), - inbound, - inbound_history, - login_queue, - ) - .await?; - } - return Ok(()); + // Start server if not starting yet + Server::start(config.clone(), server.clone(), username).await; + + // Remember inbound packets + inbound_history.extend(&raw); + inbound_history.extend(&buf); + + // Build inbound packet queue with everything from login start (including this) + let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); + login_queue.extend(&raw); + login_queue.extend(&buf); + + // Buf is fully consumed here + buf.clear(); + + // Route connection through proper channel + if server.state() == server::State::Started { + crate::service::server::route_proxy_queue( + inbound, + config.clone(), + inbound_history.clone(), + ); + } else { + // Start occupying client + join::occupy( + client, + client_info, + config.clone(), + server.clone(), + inbound, + inbound_history, + login_queue, + ) + .await?; } + return Ok(()); } } From 487c190e84335c281a20d68429287ed4da79a606 Mon Sep 17 00:00:00 2001 From: tW4r Date: Fri, 10 Feb 2023 15:04:05 +0200 Subject: [PATCH 4/8] Formatting --- src/config.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index c7a9caa..020e7f8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,12 +28,17 @@ pub fn load(matches: &ArgMatches) -> Vec { path.read_dir() .unwrap() .filter_map(|entry| entry.ok()) - .map(|entry| entry.path()).collect() + .map(|entry| entry.path()) + .collect() } else { vec![path.clone()] }; - let configs: Vec = paths.into_iter().filter(|path| path.is_file()).map(load_file).collect(); + let configs: Vec = paths + .into_iter() + .filter(|path| path.is_file()) + .map(load_file) + .collect(); // Ensure configuration file/directory exists if configs.is_empty() { From 269db957c05e9b6663b761ae6c4714c477a0fce0 Mon Sep 17 00:00:00 2001 From: tW4r Date: Sat, 11 Feb 2023 14:50:51 +0200 Subject: [PATCH 5/8] Made lazymc listen on all `[public] address` * Moved Config under Server * Reverted `[public] address` removal * Removed cli `bind` argument --- src/action/start.rs | 8 +-- src/cli.rs | 8 --- src/config.rs | 12 +++- src/join/forward.rs | 4 +- src/join/hold.rs | 12 ++-- src/join/kick.rs | 6 +- src/join/lobby.rs | 5 +- src/join/mod.rs | 18 ++--- src/lobby.rs | 18 +++-- src/main.rs | 1 + src/monitor.rs | 19 +++-- src/probe.rs | 38 +++++----- src/router.rs | 17 +++++ src/server.rs | 135 ++++++++++++++++++------------------ src/service/file_watcher.rs | 10 +-- src/service/monitor.rs | 5 +- src/service/probe.rs | 6 +- src/service/server.rs | 81 +++++++++++----------- src/service/signal.rs | 5 +- src/status.rs | 87 +++++++++++------------ 20 files changed, 242 insertions(+), 253 deletions(-) create mode 100644 src/router.rs diff --git a/src/action/start.rs b/src/action/start.rs index d8dcfba..29f7ab9 100644 --- a/src/action/start.rs +++ b/src/action/start.rs @@ -1,6 +1,4 @@ use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; use clap::ArgMatches; @@ -15,9 +13,6 @@ const RCON_PASSWORD_LENGTH: usize = 32; /// Start lazymc. pub fn invoke(matches: &ArgMatches) -> Result<(), ()> { - // Parse bind address - let bind_addr: SocketAddr = matches.get_one::("bind").unwrap().parse().unwrap(); - // Load config #[allow(unused_mut)] let mut configs = config::load(matches); @@ -32,8 +27,7 @@ pub fn invoke(matches: &ArgMatches) -> Result<(), ()> { } // Start server service - let configs_arc = configs.into_iter().map(Arc::new).collect(); - service::server::service(bind_addr, configs_arc) + service::server::service(configs) } /// Prepare RCON. diff --git a/src/cli.rs b/src/cli.rs index 4b7696c..8dcc0c5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -24,14 +24,6 @@ pub fn app() -> Command { ) .subcommand(Command::new("test").about("Test config")), ) - .arg( - Arg::new("bind") - .short('b') - .value_name("ADDRESS") - .default_value("0.0.0.0:25565") - .help("Address to bind to") - .num_args(1), - ) .arg( Arg::new("config") .short('c') diff --git a/src/config.rs b/src/config.rs index 020e7f8..ddaa81e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -155,8 +155,10 @@ impl Config { #[serde(default)] pub struct Public { /// Public address. - // #[serde(deserialize_with = "to_socket_addrs")] - pub address: String, // SocketAddr, + /// + /// The address lazymc will bind to and listen for incoming connections. + #[serde(deserialize_with = "to_socket_addrs")] + pub address: SocketAddr, /// Minecraft protocol version name hint. pub version: String, @@ -187,6 +189,12 @@ pub struct Server { /// Start command. pub command: String, + /// Server Name. + /// + /// Incoming connections will be routed to this server according to the name in the handshake packet. + /// If no name is provided this server will act as a "catch-all" and be routed all connections where no other names match. + pub name: Option, + /// Server address. #[serde( deserialize_with = "to_socket_addrs", diff --git a/src/join/forward.rs b/src/join/forward.rs index 7ba3a4c..460e860 100644 --- a/src/join/forward.rs +++ b/src/join/forward.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use bytes::BytesMut; use tokio::net::TcpStream; @@ -11,7 +9,7 @@ use super::MethodResult; /// Forward the client. pub async fn occupy( - config: Arc, + config: &Config, inbound: TcpStream, inbound_history: &mut BytesMut, ) -> Result { diff --git a/src/join/hold.rs b/src/join/hold.rs index 3a39270..8ea9030 100644 --- a/src/join/hold.rs +++ b/src/join/hold.rs @@ -6,7 +6,6 @@ use bytes::BytesMut; use tokio::net::TcpStream; use tokio::time; -use crate::config::*; use crate::server::{Server, State}; use crate::service; @@ -14,7 +13,6 @@ use super::MethodResult; /// Hold the client. pub async fn occupy( - config: Arc, server: Arc, inbound: TcpStream, inbound_history: &mut BytesMut, @@ -27,8 +25,8 @@ pub async fn occupy( } // Start holding, consume client - if hold(&config, &server).await? { - service::server::route_proxy_queue(inbound, config, inbound_history.clone()); + if hold(&server).await? { + service::server::route_proxy_queue(inbound, &server.config, inbound_history.clone()); return Ok(MethodResult::Consumed); } @@ -39,7 +37,7 @@ pub async fn occupy( /// /// Returns holding status. `true` if client is held and it should be proxied, `false` it was held /// but it timed out. -async fn hold<'a>(config: &Config, server: &Server) -> Result { +async fn hold<'a>(server: &Server) -> Result { trace!(target: "lazymc", "Started holding client"); // A task to wait for suitable server state @@ -78,7 +76,7 @@ async fn hold<'a>(config: &Config, server: &Server) -> Result { }; // Wait for server state with timeout - let timeout = Duration::from_secs(config.join.hold.timeout as u64); + let timeout = Duration::from_secs(server.config.join.hold.timeout as u64); match time::timeout(timeout, task_wait).await { // Relay client to proxy Ok(true) => { @@ -94,7 +92,7 @@ async fn hold<'a>(config: &Config, server: &Server) -> Result { // Timeout reached, kick with starting message Err(_) => { - warn!(target: "lazymc", "Held client reached timeout of {}s", config.join.hold.timeout); + warn!(target: "lazymc", "Held client reached timeout of {}s", server.config.join.hold.timeout); Ok(false) } } diff --git a/src/join/kick.rs b/src/join/kick.rs index 42fe20f..579659d 100644 --- a/src/join/kick.rs +++ b/src/join/kick.rs @@ -1,6 +1,5 @@ use tokio::net::TcpStream; -use crate::config::*; use crate::net; use crate::proto::action; use crate::proto::client::Client; @@ -11,7 +10,6 @@ use super::MethodResult; /// Kick the client. pub async fn occupy( client: &Client, - config: &Config, server: &Server, mut inbound: TcpStream, ) -> Result { @@ -20,9 +18,9 @@ pub async fn occupy( // Select message and kick let msg = match server.state() { server::State::Starting | server::State::Stopped | server::State::Started => { - &config.join.kick.starting + &server.config.join.kick.starting } - server::State::Stopping => &config.join.kick.stopping, + server::State::Stopping => &server.config.join.kick.stopping, }; action::kick(client, msg, &mut inbound.split().1).await?; diff --git a/src/join/lobby.rs b/src/join/lobby.rs index aaf18e3..3d1245b 100644 --- a/src/join/lobby.rs +++ b/src/join/lobby.rs @@ -14,7 +14,6 @@ use super::MethodResult; pub async fn occupy( client: &Client, client_info: ClientInfo, - config: Arc, server: Arc, inbound: TcpStream, inbound_queue: BytesMut, @@ -22,13 +21,13 @@ pub async fn occupy( trace!(target: "lazymc", "Using lobby method to occupy joining client"); // Must be ready to lobby - if must_still_probe(&config, &server).await { + if must_still_probe(&server.config, &server).await { warn!(target: "lazymc", "Client connected but lobby is not ready, using next join method, probing not completed"); return Ok(MethodResult::Continue(inbound)); } // Start lobby - lobby::serve(client, client_info, inbound, config, server, inbound_queue).await?; + lobby::serve(client, client_info, inbound, server, inbound_queue).await?; // TODO: do not consume client here, allow other join method on fail diff --git a/src/join/mod.rs b/src/join/mod.rs index 959b183..92acb49 100644 --- a/src/join/mod.rs +++ b/src/join/mod.rs @@ -29,7 +29,6 @@ pub enum MethodResult { pub async fn occupy( client: Client, #[allow(unused_variables)] client_info: ClientInfo, - config: Arc, server: Arc, mut inbound: TcpStream, mut inbound_history: BytesMut, @@ -43,26 +42,18 @@ pub async fn occupy( ); // Go through all configured join methods - for method in &config.join.methods { + for method in &server.config.join.methods { // Invoke method, take result let result = match method { // Kick method, immediately kick client - Method::Kick => kick::occupy(&client, &config, &server, inbound).await?, + Method::Kick => kick::occupy(&client, &server, inbound).await?, // Hold method, hold client connection while server starts - Method::Hold => { - hold::occupy( - config.clone(), - server.clone(), - inbound, - &mut inbound_history, - ) - .await? - } + Method::Hold => hold::occupy(server.clone(), inbound, &mut inbound_history).await?, // Forward method, forward client connection while server starts Method::Forward => { - forward::occupy(config.clone(), inbound, &mut inbound_history).await? + forward::occupy(&server.config, inbound, &mut inbound_history).await? } // Lobby method, keep client in lobby while server starts @@ -71,7 +62,6 @@ pub async fn occupy( lobby::occupy( &client, client_info.clone(), - config.clone(), server.clone(), inbound, login_queue.clone(), diff --git a/src/lobby.rs b/src/lobby.rs index fc8fab4..14561e4 100644 --- a/src/lobby.rs +++ b/src/lobby.rs @@ -54,7 +54,6 @@ pub async fn serve( client: &Client, client_info: ClientInfo, mut inbound: TcpStream, - config: Arc, server: Arc, queue: BytesMut, ) -> Result<(), ()> { @@ -98,7 +97,7 @@ pub async fn serve( debug!(target: "lazymc::lobby", "Login on lobby server (user: {})", login_start.name); // Replay Forge payload - if config.server.forge { + if server.config.server.forge { forge::replay_login_payload(client, &mut inbound, server.clone(), &mut inbound_buf) .await?; let (_returned_reader, returned_writer) = inbound.split(); @@ -122,12 +121,12 @@ pub async fn serve( send_lobby_play_packets(client, &client_info, &mut writer, &server).await?; // Wait for server to come online - stage_wait(client, &client_info, &server, &config, &mut writer).await?; + stage_wait(client, &client_info, &server, &mut writer).await?; // Start new connection to server let server_client_info = client_info.clone(); let (server_client, mut outbound, mut server_buf) = - connect_to_server(&server_client_info, &inbound, &config).await?; + connect_to_server(&server_client_info, &inbound, &server.config).await?; let (returned_reader, returned_writer) = inbound.split(); reader = returned_reader; writer = returned_writer; @@ -145,7 +144,7 @@ pub async fn serve( packets::play::title::send(client, &client_info, &mut writer, "").await?; // Play ready sound if configured - play_lobby_ready_sound(client, &client_info, &mut writer, &config).await?; + play_lobby_ready_sound(client, &client_info, &mut writer, &server.config).await?; // Wait a second because Notchian servers are slow // See: https://wiki.vg/Protocol#Login_Success @@ -287,19 +286,18 @@ async fn stage_wait( client: &Client, client_info: &ClientInfo, server: &Server, - config: &Config, writer: &mut WriteHalf<'_>, ) -> Result<(), ()> { select! { - a = keep_alive_loop(client, client_info, writer, config) => a, - b = wait_for_server(server, config) => b, + a = keep_alive_loop(client, client_info, writer, &server.config) => a, + b = wait_for_server(server) => b, } } /// Wait for the server to come online. /// /// Returns `Ok(())` once the server is online, returns `Err(())` if waiting failed. -async fn wait_for_server(server: &Server, config: &Config) -> Result<(), ()> { +async fn wait_for_server(server: &Server) -> Result<(), ()> { debug!(target: "lazymc::lobby", "Waiting on server to come online..."); // A task to wait for suitable server state @@ -331,7 +329,7 @@ async fn wait_for_server(server: &Server, config: &Config) -> Result<(), ()> { }; // Wait for server state with timeout - let timeout = Duration::from_secs(config.join.lobby.timeout as u64); + let timeout = Duration::from_secs(server.config.join.lobby.timeout as u64); match time::timeout(timeout, task_wait).await { // Relay client to proxy Ok(true) => { diff --git a/src/main.rs b/src/main.rs index d7bde87..fdee277 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,6 +21,7 @@ pub(crate) mod os; pub(crate) mod probe; pub(crate) mod proto; pub(crate) mod proxy; +pub(crate) mod router; pub(crate) mod server; pub(crate) mod service; pub(crate) mod status; diff --git a/src/monitor.rs b/src/monitor.rs index ebc58d6..d978ad9 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -30,9 +30,9 @@ const STATUS_TIMEOUT: u64 = 20; const PING_TIMEOUT: u64 = 10; /// Monitor server. -pub async fn monitor_server(config: Arc, server: Arc) { +pub async fn monitor_server(server: Arc) { // Server address - let addr = config.server.address; + let addr = server.config.server.address; let mut poll_interval = time::interval(MONITOR_POLL_INTERVAL); @@ -41,13 +41,13 @@ pub async fn monitor_server(config: Arc, server: Arc) { // Poll server state and update internal status trace!(target: "lazymc::monitor", "Fetching status for {} ... ", addr); - let status = poll_server(&config, &server, addr).await; + let status = poll_server(&server, addr).await; match status { // Got status, update - Ok(Some(status)) => server.update_status(&config, Some(status)).await, + Ok(Some(status)) => server.update_status(Some(status)).await, // Error, reset status - Err(_) => server.update_status(&config, None).await, + Err(_) => server.update_status(None).await, // Didn't get status, but ping fallback worked, leave as-is, show warning Ok(None) => { @@ -56,9 +56,9 @@ pub async fn monitor_server(config: Arc, server: Arc) { } // Sleep server when it's bedtime - if server.should_sleep(&config).await { + if server.should_sleep(&server.config).await { info!(target: "lazymc::montior", "Server has been idle, sleeping..."); - server.stop(&config).await; + server.stop().await; } // Check whether we should force kill server @@ -76,19 +76,18 @@ pub async fn monitor_server(config: Arc, server: Arc) { /// Returns `Ok` if status/ping succeeded, includes server status most of the time. /// Returns `Err` if no connection could be established or if an error occurred. pub async fn poll_server( - config: &Config, server: &Server, addr: SocketAddr, ) -> Result, ()> { // Fetch status - if let Ok(status) = fetch_status(config, addr).await { + if let Ok(status) = fetch_status(&server.config, addr).await { return Ok(Some(status)); } // Try ping fallback if server is currently started if server.state() == State::Started { debug!(target: "lazymc::monitor", "Failed to get status from started server, trying ping..."); - do_ping(config, addr).await?; + do_ping(&server.config, addr).await?; } Err(()) diff --git a/src/probe.rs b/src/probe.rs index 8ba6462..4012729 100644 --- a/src/probe.rs +++ b/src/probe.rs @@ -11,7 +11,6 @@ use minecraft_protocol::version::v1_14_4::login::{ use tokio::net::TcpStream; use tokio::time; -use crate::config::Config; use crate::forge; use crate::net; use crate::proto::client::{Client, ClientInfo, ClientState}; @@ -35,11 +34,11 @@ const PROBE_ONLINE_TIMEOUT: Duration = Duration::from_secs(10 * 60); const PROBE_JOIN_GAME_TIMEOUT: Duration = Duration::from_secs(20); /// Connect to the Minecraft server and probe useful details from it. -pub async fn probe(config: Arc, server: Arc) -> Result<(), ()> { +pub async fn probe(server: Arc) -> Result<(), ()> { debug!(target: "lazymc::probe", "Starting server probe..."); // Start server if not starting already - if Server::start(config.clone(), server.clone(), None).await { + if Server::start(server.clone(), None).await { info!(target: "lazymc::probe", "Starting server to probe..."); } @@ -52,7 +51,7 @@ pub async fn probe(config: Arc, server: Arc) -> Result<(), ()> { debug!(target: "lazymc::probe", "Connecting to server to probe details..."); // Connect to server, record Forge payload - let forge_payload = connect_to_server(&config, &server).await?; + let forge_payload = connect_to_server(&server).await?; *server.forge_payload.write().await = forge_payload; Ok(()) @@ -115,10 +114,10 @@ async fn wait_until_online<'a>(server: &Server) -> Result { /// This will initialize the connection to the play state. Client details are used. /// /// Returns recorded Forge login payload if any. -async fn connect_to_server(config: &Config, server: &Server) -> Result>, ()> { +async fn connect_to_server(server: &Server) -> Result>, ()> { time::timeout( PROBE_CONNECT_TIMEOUT, - connect_to_server_no_timeout(config, server), + connect_to_server_no_timeout(server), ) .await .map_err(|_| { @@ -132,13 +131,10 @@ async fn connect_to_server(config: &Config, server: &Server) -> Result Result>, ()> { +async fn connect_to_server_no_timeout(server: &Server) -> Result>, ()> { // Open connection // TODO: on connect fail, ping server and redirect to serve_status if offline - let mut outbound = TcpStream::connect(config.server.address) + let mut outbound = TcpStream::connect(server.config.server.address) .await .map_err(|_| ())?; @@ -151,23 +147,29 @@ async fn connect_to_server_no_timeout( // Construct client info let mut tmp_client_info = ClientInfo::empty(); - tmp_client_info.protocol.replace(config.public.protocol); + tmp_client_info + .protocol + .replace(server.config.public.protocol); let (mut reader, mut writer) = outbound.split(); // Select server address to use, add magic if Forge - let server_addr = if config.server.forge { - format!("{}{}", config.server.address.ip(), forge::STATUS_MAGIC) + let server_addr = if server.config.server.forge { + format!( + "{}{}", + server.config.server.address.ip(), + forge::STATUS_MAGIC + ) } else { - config.server.address.ip().to_string() + server.config.server.address.ip().to_string() }; // Send handshake packet packet::write_packet( Handshake { - protocol_version: config.public.protocol as i32, + protocol_version: server.config.public.protocol as i32, server_addr, - server_port: config.server.address.port(), + server_port: server.config.server.address.port(), next_state: ClientState::Login.to_id(), }, &tmp_client, @@ -235,7 +237,7 @@ async fn connect_to_server_no_timeout( })?; // Handle plugin requests for Forge - if config.server.forge { + if server.config.server.forge { // Record Forge login payload forge_payload.push(raw); diff --git a/src/router.rs b/src/router.rs new file mode 100644 index 0000000..18bbbbc --- /dev/null +++ b/src/router.rs @@ -0,0 +1,17 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::server::Server; + +#[derive(Default)] +pub struct Router { + pub data: HashMap, Arc>, +} + +impl Router { + pub fn get(&self, server_name: String) -> Option> { + self.data + .get(&Some(server_name)) + .or_else(|| self.data.get(&None)) + .cloned() + } +} diff --git a/src/server.rs b/src/server.rs index 8bc259d..80d1be8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -36,6 +36,9 @@ const UNIX_EXIT_SIGTERM: i32 = 130; /// Shared server state. #[derive(Debug)] pub struct Server { + /// Server config. + pub config: Config, + /// Server state. /// /// Matches `State`, utilzes AtomicU8 for better performance. @@ -95,6 +98,30 @@ pub struct Server { } impl Server { + pub fn new(config: Config) -> Self { + let (state_watch_sender, state_watch_receiver) = watch::channel(State::Stopped); + + Self { + config, + state: AtomicU8::new(State::Stopped.to_u8()), + state_watch_sender, + state_watch_receiver, + pid: Default::default(), + status: Default::default(), + last_active: Default::default(), + keep_online_until: Default::default(), + kill_at: Default::default(), + banned_ips: Default::default(), + whitelist: Default::default(), + #[cfg(feature = "rcon")] + rcon_lock: Semaphore::new(1), + #[cfg(feature = "rcon")] + rcon_last_stop: Default::default(), + probed_join_game: Default::default(), + forge_payload: Default::default(), + } + } + /// Get current state. pub fn state(&self) -> State { State::from_u8(self.state.load(Ordering::Relaxed)) @@ -110,8 +137,8 @@ impl Server { /// This updates various other internal things depending on how the state changes. /// /// Returns false if the state didn't change, in which case nothing happens. - async fn update_state(&self, state: State, config: &Config) -> bool { - self.update_state_from(None, state, config).await + async fn update_state(&self, state: State) -> bool { + self.update_state_from(None, state).await } /// Set new state, from a current state. @@ -119,7 +146,7 @@ impl Server { /// This updates various other internal things depending on how the state changes. /// /// Returns false if current state didn't match `from` or if nothing changed. - async fn update_state_from(&self, from: Option, new: State, config: &Config) -> bool { + async fn update_state_from(&self, from: Option, new: State) -> bool { // Atomically swap state to new, return if from doesn't match let old = State::from_u8(match from { Some(from) => match self.state.compare_exchange( @@ -146,11 +173,11 @@ impl Server { // Update kill at time for starting/stopping state *self.kill_at.write().await = match new { - State::Starting if config.server.start_timeout > 0 => { - Some(Instant::now() + Duration::from_secs(config.server.start_timeout as u64)) + State::Starting if self.config.server.start_timeout > 0 => { + Some(Instant::now() + Duration::from_secs(self.config.server.start_timeout as u64)) } - State::Stopping if config.server.stop_timeout > 0 => { - Some(Instant::now() + Duration::from_secs(config.server.stop_timeout as u64)) + State::Stopping if self.config.server.stop_timeout > 0 => { + Some(Instant::now() + Duration::from_secs(self.config.server.stop_timeout as u64)) } _ => None, }; @@ -165,7 +192,7 @@ impl Server { // If Starting -> Started, update active time and keep it online for configured time if old == State::Starting && new == State::Started { self.update_last_active().await; - self.keep_online_for(Some(config.time.min_online_time)) + self.keep_online_for(Some(self.config.time.min_online_time)) .await; } @@ -176,14 +203,14 @@ impl Server { /// /// This updates various other internal things depending on the current state and the given /// status. - pub async fn update_status(&self, config: &Config, status: Option) { + pub async fn update_status(&self, status: Option) { // Update state based on curren match (self.state(), &status) { (State::Stopped | State::Starting, Some(_)) => { - self.update_state(State::Started, config).await; + self.update_state(State::Started).await; } (State::Started, None) => { - self.update_state(State::Stopped, config).await; + self.update_state(State::Stopped).await; } _ => {} } @@ -202,10 +229,10 @@ impl Server { /// Try to start the server. /// /// Does nothing if currently not in stopped state. - pub async fn start(config: Arc, server: Arc, username: Option) -> bool { + pub async fn start(server: Arc, username: Option) -> bool { // Must set state from stopped to starting if !server - .update_state_from(Some(State::Stopped), State::Starting, &config) + .update_state_from(Some(State::Stopped), State::Starting) .await { return false; @@ -219,42 +246,42 @@ impl Server { // Unfreeze server if it is frozen #[cfg(unix)] - if config.server.freeze_process && unfreeze_server_signal(&config, &server).await { + if server.config.server.freeze_process && unfreeze_server_signal(&server).await { return true; } // Spawn server in new task - Self::spawn_server_task(config, server); + Self::spawn_server_task(server); true } /// Spawn the server task. /// /// This should not be called directly. - fn spawn_server_task(config: Arc, server: Arc) { - tokio::spawn(invoke_server_cmd(config, server).map(|_| ())); + fn spawn_server_task(server: Arc) { + tokio::spawn(invoke_server_cmd(server).map(|_| ())); } /// Stop running server. /// /// This will attempt to stop the server with all available methods. #[allow(unused_variables)] - pub async fn stop(&self, config: &Config) -> bool { + pub async fn stop(&self) -> bool { // Try to freeze through signal #[cfg(unix)] - if config.server.freeze_process && freeze_server_signal(config, self).await { + if self.config.server.freeze_process && freeze_server_signal(self).await { return true; } // Try to stop through RCON if started #[cfg(feature = "rcon")] - if self.state() == State::Started && stop_server_rcon(config, self).await { + if self.state() == State::Started && stop_server_rcon(&self.config, self).await { return true; } // Try to stop through signal #[cfg(unix)] - if stop_server_signal(config, self).await { + if stop_server_signal(self).await { return true; } @@ -394,31 +421,6 @@ impl Server { } } -impl Default for Server { - fn default() -> Self { - let (state_watch_sender, state_watch_receiver) = watch::channel(State::Stopped); - - Self { - state: AtomicU8::new(State::Stopped.to_u8()), - state_watch_sender, - state_watch_receiver, - pid: Default::default(), - status: Default::default(), - last_active: Default::default(), - keep_online_until: Default::default(), - kill_at: Default::default(), - banned_ips: Default::default(), - whitelist: Default::default(), - #[cfg(feature = "rcon")] - rcon_lock: Semaphore::new(1), - #[cfg(feature = "rcon")] - rcon_last_stop: Default::default(), - probed_join_game: Default::default(), - forge_payload: Default::default(), - } - } -} - /// Server state. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum State { @@ -459,18 +461,15 @@ impl State { } /// Invoke server command, store PID and wait for it to quit. -pub async fn invoke_server_cmd( - config: Arc, - state: Arc, -) -> Result<(), Box> { +pub async fn invoke_server_cmd(server: Arc) -> Result<(), Box> { // Configure command - let args = shlex::split(&config.server.command).expect("invalid server command"); + let args = shlex::split(&server.config.server.command).expect("invalid server command"); let mut cmd = Command::new(&args[0]); cmd.args(args.iter().skip(1)); cmd.kill_on_drop(true); // Set working directory - if let Some(ref dir) = ConfigServer::server_directory(&config) { + if let Some(ref dir) = ConfigServer::server_directory(&server.config) { cmd.current_dir(dir); } @@ -484,7 +483,7 @@ pub async fn invoke_server_cmd( }; // Remember PID - state + server .pid .lock() .await @@ -503,7 +502,7 @@ pub async fn invoke_server_cmd( } Ok(status) => { warn!(target: "lazymc", "Server process stopped with error code ({})", status); - state.state() == State::Started + server.state() == State::Started } Err(err) => { error!(target: "lazymc", "Failed to wait for server process to quit: {}", err); @@ -513,18 +512,18 @@ pub async fn invoke_server_cmd( }; // Forget server PID - state.pid.lock().await.take(); + server.pid.lock().await.take(); // Give server a little more time to quit forgotten threads time::sleep(SERVER_QUIT_COOLDOWN).await; // Set server state to stopped - state.update_state(State::Stopped, &config).await; + server.update_state(State::Stopped).await; // Restart on crash - if crashed && config.server.wake_on_crash { + if crashed && server.config.server.wake_on_crash { warn!(target: "lazymc", "Server crashed, restarting..."); - Server::start(config, state, None).await; + Server::start(server, None).await; } Ok(()) @@ -573,7 +572,7 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool { // Set server to stopping state, update last RCON time server.rcon_last_stop.lock().await.replace(Instant::now()); - server.update_state(State::Stopping, config).await; + server.update_state(State::Stopping).await; // Gracefully close connection rcon.close().await; @@ -587,7 +586,7 @@ async fn stop_server_rcon(config: &Config, server: &Server) -> bool { /// /// Only available on Unix. #[cfg(unix)] -async fn stop_server_signal(config: &Config, server: &Server) -> bool { +async fn stop_server_signal(server: &Server) -> bool { // Grab PID let pid = match *server.pid.lock().await { Some(pid) => pid, @@ -603,10 +602,10 @@ async fn stop_server_signal(config: &Config, server: &Server) -> bool { } server - .update_state_from(Some(State::Starting), State::Stopping, config) + .update_state_from(Some(State::Starting), State::Stopping) .await; server - .update_state_from(Some(State::Started), State::Stopping, config) + .update_state_from(Some(State::Started), State::Stopping) .await; true @@ -616,7 +615,7 @@ async fn stop_server_signal(config: &Config, server: &Server) -> bool { /// /// Only available on Unix. #[cfg(unix)] -async fn freeze_server_signal(config: &Config, server: &Server) -> bool { +async fn freeze_server_signal(server: &Server) -> bool { // Grab PID let pid = match *server.pid.lock().await { Some(pid) => pid, @@ -631,10 +630,10 @@ async fn freeze_server_signal(config: &Config, server: &Server) -> bool { } server - .update_state_from(Some(State::Starting), State::Stopped, config) + .update_state_from(Some(State::Starting), State::Stopped) .await; server - .update_state_from(Some(State::Started), State::Stopped, config) + .update_state_from(Some(State::Started), State::Stopped) .await; true @@ -644,7 +643,7 @@ async fn freeze_server_signal(config: &Config, server: &Server) -> bool { /// /// Only available on Unix. #[cfg(unix)] -async fn unfreeze_server_signal(config: &Config, server: &Server) -> bool { +async fn unfreeze_server_signal(server: &Server) -> bool { // Grab PID let pid = match *server.pid.lock().await { Some(pid) => pid, @@ -659,10 +658,10 @@ async fn unfreeze_server_signal(config: &Config, server: &Server) -> bool { } server - .update_state_from(Some(State::Stopping), State::Starting, config) + .update_state_from(Some(State::Stopping), State::Starting) .await; server - .update_state_from(Some(State::Stopped), State::Starting, config) + .update_state_from(Some(State::Stopped), State::Starting) .await; true diff --git a/src/service/file_watcher.rs b/src/service/file_watcher.rs index fa0ecb9..2e2a850 100644 --- a/src/service/file_watcher.rs +++ b/src/service/file_watcher.rs @@ -14,9 +14,9 @@ use crate::server::Server; const WATCH_DEBOUNCE: Duration = Duration::from_secs(2); /// Service to watch server file changes. -pub fn service(config: Arc, server: Arc) { +pub fn service(server: Arc) { // Ensure server directory is set, it must exist - let dir = match ConfigServer::server_directory(&config) { + let dir = match ConfigServer::server_directory(&server.config) { Some(dir) if dir.is_dir() => dir, _ => { warn!(target: "lazymc", "Server directory doesn't exist, can't watch file changes to reload whitelist and banned IPs"); @@ -28,11 +28,11 @@ pub fn service(config: Arc, server: Arc) { #[allow(clippy::blocks_in_if_conditions)] while { // Update all files once - reload_bans(&config, &server, &dir.join(ban::FILE)); - reload_whitelist(&config, &server, &dir); + reload_bans(&server.config, &server, &dir.join(ban::FILE)); + reload_whitelist(&server.config, &server, &dir); // Watch for changes, update accordingly - watch_server(&config, &server, &dir) + watch_server(&server.config, &server, &dir) } {} } diff --git a/src/service/monitor.rs b/src/service/monitor.rs index 6cc8db5..08d63b7 100644 --- a/src/service/monitor.rs +++ b/src/service/monitor.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use crate::config::Config; use crate::monitor; use crate::server::Server; /// Server monitor task. -pub async fn service(config: Arc, state: Arc) { - monitor::monitor_server(config, state).await +pub async fn service(server: Arc) { + monitor::monitor_server(server).await } diff --git a/src/service/probe.rs b/src/service/probe.rs index dd9f69b..92cd889 100644 --- a/src/service/probe.rs +++ b/src/service/probe.rs @@ -5,14 +5,14 @@ use crate::probe; use crate::server::Server; /// Probe server. -pub async fn service(config: Arc, state: Arc) { +pub async fn service(server: Arc) { // Only probe if enabled or if we must - if !config.server.probe_on_start && !must_probe(&config) { + if !server.config.server.probe_on_start && !must_probe(&server.config) { return; } // Probe - match probe::probe(config, state).await { + match probe::probe(server).await { Ok(_) => info!(target: "lazymc::probe", "Succesfully probed server"), Err(_) => { error!(target: "lazymc::probe", "Failed to probe server, this may limit lazymc features") diff --git a/src/service/server.rs b/src/service/server.rs index ba8163b..b8a332d 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -9,6 +9,7 @@ use tokio::net::{TcpListener, TcpStream}; use crate::config::Config; use crate::proto::client::Client; use crate::proxy::{self, ProxyHeader}; +use crate::router::Router; use crate::server::Server; use crate::service; use crate::status; @@ -20,32 +21,19 @@ use crate::util::error::{quit_error, ErrorHints}; /// /// Spawns a tokio runtime to complete all work on. #[tokio::main(flavor = "multi_thread")] -pub async fn service(bind_addr: SocketAddr, configs: Vec>) -> Result<(), ()> { - // Listen for new connections - let listener = TcpListener::bind(bind_addr).await.map_err(|err| { - quit_error( - anyhow!(err).context("Failed to start proxy server"), - ErrorHints::default(), - ); - })?; - - let mut servers = HashMap::new(); - - for config in configs.iter() { +pub async fn service(configs: Vec) -> Result<(), ()> { + let mut routers: HashMap = HashMap::new(); + + for config in configs.into_iter() { // Load server state - let server = Arc::new(Server::default()); - servers.insert( - config.public.address.clone(), - (config.clone(), server.clone()), - ); - - info!( - target: "lazymc", - "Proxying public {} to server {}", - config.public.address, config.server.address, - ); - - if config.lockout.enabled { + let server = Arc::new(Server::new(config)); + routers + .entry(server.config.public.address) + .or_default() + .data + .insert(server.config.server.name.clone(), server.clone()); + + if server.config.lockout.enabled { warn!( target: "lazymc", "Lockout mode is enabled, nobody will be able to connect through the proxy", @@ -53,27 +41,42 @@ pub async fn service(bind_addr: SocketAddr, configs: Vec>) -> Result } // Spawn services: monitor, signal handler - tokio::spawn(service::monitor::service(config.clone(), server.clone())); - tokio::spawn(service::signal::service(config.clone(), server.clone())); + tokio::spawn(service::monitor::service(server.clone())); + tokio::spawn(service::signal::service(server.clone())); // Initiate server start - if config.server.wake_on_start { - Server::start(config.clone(), server.clone(), None).await; + if server.config.server.wake_on_start { + Server::start(server.clone(), None).await; } // Spawn additional services: probe and ban manager - tokio::spawn(service::probe::service(config.clone(), server.clone())); + tokio::spawn(service::probe::service(server.clone())); tokio::task::spawn_blocking({ - let (config, server) = (config.clone(), server.clone()); - || service::file_watcher::service(config, server) + let server = server.clone(); + || service::file_watcher::service(server) }); } - let servers_arc = Arc::new(servers); + info!(target: "lazymc", "Routing\n{}", routers.iter().flat_map(|(public_address, router)| { + router.data.iter().map(move |(server_name, server)| { + format!("{} -> {} -> {}", server_name.clone().unwrap_or("*".to_string()), public_address, server.config.server.address.clone()) + }) + }).collect::>().join("\n")); + + for (public_address, router) in routers { + let listener = TcpListener::bind(public_address).await.map_err(|err| { + quit_error( + anyhow!(err).context("Failed to start proxy server"), + ErrorHints::default(), + ); + })?; + + let router = Arc::new(router); - // Route all incomming connections - while let Ok((inbound, _)) = listener.accept().await { - route(inbound, servers_arc.clone()); + // Route all incomming connections + while let Ok((inbound, _)) = listener.accept().await { + route(inbound, router.clone()); + } } Ok(()) @@ -81,7 +84,7 @@ pub async fn service(bind_addr: SocketAddr, configs: Vec>) -> Result /// Route inbound TCP stream to correct service, spawning a new task. #[inline] -fn route(inbound: TcpStream, servers: Arc, Arc)>>) { +fn route(inbound: TcpStream, router: Arc) { // Get user peer address let peer = match inbound.peer_addr() { Ok(peer) => peer, @@ -92,7 +95,7 @@ fn route(inbound: TcpStream, servers: Arc, Arc, Arc, queue: BytesMut) { +pub fn route_proxy_queue(inbound: TcpStream, config: &Config, queue: BytesMut) { route_proxy_address_queue( inbound, ProxyHeader::Proxy.not_none(config.server.send_proxy_v2), diff --git a/src/service/signal.rs b/src/service/signal.rs index 6512da0..932d637 100644 --- a/src/service/signal.rs +++ b/src/service/signal.rs @@ -1,11 +1,10 @@ use std::sync::Arc; -use crate::config::Config; use crate::server::{self, Server}; use crate::util::error; /// Signal handler task. -pub async fn service(config: Arc, server: Arc) { +pub async fn service(server: Arc) { loop { // Wait for SIGTERM/SIGINT signal tokio::signal::ctrl_c().await.unwrap(); @@ -16,7 +15,7 @@ pub async fn service(config: Arc, server: Arc) { } // Try to stop server - let stopping = server.stop(&config).await; + let stopping = server.stop().await; // If not stopping, maybe due to failure, just quit if !stopping { diff --git a/src/status.rs b/src/status.rs index 53713c0..e1e42e9 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::sync::Arc; use bytes::BytesMut; @@ -20,7 +19,9 @@ use crate::proto::action; use crate::proto::client::{Client, ClientInfo, ClientState}; use crate::proto::packet::{self, RawPacket}; use crate::proto::packets; -use crate::server::{self, Server}; +use crate::router::Router; +use crate::server::{self, Server, State}; +use crate::service::server::route_proxy_queue; /// The ban message prefix. const BAN_MESSAGE_PREFIX: &str = "Your IP address is banned from this server.\nReason: "; @@ -36,11 +37,7 @@ const SERVER_ICON_FILE: &str = "server-icon.png"; /// Proxy the given inbound stream to a target address. // TODO: do not drop error here, return Box -pub async fn serve( - client: Client, - mut inbound: TcpStream, - servers: Arc, Arc)>>, -) -> Result<(), ()> { +pub async fn serve(client: Client, mut inbound: TcpStream, router: Arc) -> Result<(), ()> { let (mut reader, mut writer) = inbound.split(); // Incoming buffer and packet holding queue @@ -51,6 +48,19 @@ pub async fn serve( let mut client_info = ClientInfo::empty(); loop { + let server = client_info + .handshake + .as_ref() + .and_then(|handshake| router.get(handshake.server_addr.clone())); + + if let Some(server) = server + .clone() + .filter(|server| server.state() == State::Started) + { + route_proxy_queue(inbound, &server.config, inbound_history); + return Ok(()); + } + // Read packet from stream let (packet, raw) = match packet::read_packet(&client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, @@ -107,15 +117,10 @@ pub async fn serve( continue; } - // Grab the server and config - if let Some((config, server)) = client_info - .handshake - .as_ref() - .and_then(|handshake| servers.get(&handshake.server_addr)) - { + if let Some(server) = server { // Hijack server status packet if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { - let server_status = server_status(&client_info, config, server).await; + let server_status = server_status(&client_info, &server).await; let packet = StatusResponse { server_status }; let mut data = Vec::new(); @@ -138,7 +143,7 @@ pub async fn serve( client_info.username = username.clone(); // Kick if lockout is enabled - if config.lockout.enabled { + if server.config.lockout.enabled { match username { Some(username) => { info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) @@ -147,7 +152,7 @@ pub async fn serve( info!(target: "lazymc", "Kicked player because lockout is enabled") } } - action::kick(&client, &config.lockout.message, &mut writer).await?; + action::kick(&client, &server.config.lockout.message, &mut writer).await?; break; } @@ -177,7 +182,7 @@ pub async fn serve( } // Start server if not starting yet - Server::start(config.clone(), server.clone(), username).await; + Server::start(server.clone(), username).await; // Remember inbound packets inbound_history.extend(&raw); @@ -191,26 +196,16 @@ pub async fn serve( // Buf is fully consumed here buf.clear(); - // Route connection through proper channel - if server.state() == server::State::Started { - crate::service::server::route_proxy_queue( - inbound, - config.clone(), - inbound_history.clone(), - ); - } else { - // Start occupying client - join::occupy( - client, - client_info, - config.clone(), - server.clone(), - inbound, - inbound_history, - login_queue, - ) - .await?; - } + // Start occupying client + join::occupy( + client, + client_info, + server.clone(), + inbound, + inbound_history, + login_queue, + ) + .await?; return Ok(()); } } @@ -225,7 +220,7 @@ pub async fn serve( } /// Build server status object to respond to client with. -async fn server_status(client_info: &ClientInfo, config: &Config, server: &Server) -> ServerStatus { +async fn server_status(client_info: &ClientInfo, server: &Server) -> ServerStatus { let status = server.status().await; let server_state = server.state(); @@ -239,8 +234,8 @@ async fn server_status(client_info: &ClientInfo, config: &Config, server: &Serve Some(status) => (status.version.clone(), status.players.max), None => ( ServerVersion { - name: config.public.version.clone(), - protocol: config.public.protocol, + name: server.config.public.version.clone(), + protocol: server.config.public.protocol, }, 0, ), @@ -248,13 +243,13 @@ async fn server_status(client_info: &ClientInfo, config: &Config, server: &Serve // Select description, use server MOTD if enabled, or use configured let description = { - if config.motd.from_server && status.is_some() { + if server.config.motd.from_server && status.is_some() { status.as_ref().unwrap().description.clone() } else { Message::new(Payload::text(match server_state { - server::State::Stopped | server::State::Started => &config.motd.sleeping, - server::State::Starting => &config.motd.starting, - server::State::Stopping => &config.motd.stopping, + server::State::Stopped | server::State::Started => &server.config.motd.sleeping, + server::State::Starting => &server.config.motd.starting, + server::State::Stopping => &server.config.motd.stopping, })) } }; @@ -262,11 +257,11 @@ async fn server_status(client_info: &ClientInfo, config: &Config, server: &Serve // Extract favicon from real server status, load from disk, or use default let mut favicon = None; if favicon::supports_favicon(client_info) { - if config.motd.from_server && status.is_some() { + if server.config.motd.from_server && status.is_some() { favicon = status.as_ref().unwrap().favicon.clone() } if favicon.is_none() { - favicon = Some(server_favicon(config).await); + favicon = Some(server_favicon(&server.config).await); } } From ce21d5605891a4192b1fcaf8a34bdd0cd0d705bd Mon Sep 17 00:00:00 2001 From: tW4r Date: Tue, 14 Feb 2023 09:13:56 +0200 Subject: [PATCH 6/8] Make it closer to original implementation --- src/monitor.rs | 5 +- src/service/server.rs | 98 +++++++++++++++++- src/status.rs | 224 ++++++++++++++++-------------------------- 3 files changed, 179 insertions(+), 148 deletions(-) diff --git a/src/monitor.rs b/src/monitor.rs index d978ad9..a103699 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -75,10 +75,7 @@ pub async fn monitor_server(server: Arc) { /// /// Returns `Ok` if status/ping succeeded, includes server status most of the time. /// Returns `Err` if no connection could be established or if an error occurred. -pub async fn poll_server( - server: &Server, - addr: SocketAddr, -) -> Result, ()> { +pub async fn poll_server(server: &Server, addr: SocketAddr) -> Result, ()> { // Fetch status if let Ok(status) = fetch_status(&server.config, addr).await { return Ok(Some(status)); diff --git a/src/service/server.rs b/src/service/server.rs index b8a332d..b8f5d8c 100644 --- a/src/service/server.rs +++ b/src/service/server.rs @@ -4,13 +4,16 @@ use std::sync::Arc; use bytes::BytesMut; use futures::FutureExt; +use minecraft_protocol::decoder::Decoder; +use minecraft_protocol::version::v1_14_4::handshake::Handshake; use tokio::net::{TcpListener, TcpStream}; use crate::config::Config; -use crate::proto::client::Client; +use crate::proto::client::{Client, ClientInfo, ClientState}; +use crate::proto::{packet, packets}; use crate::proxy::{self, ProxyHeader}; use crate::router::Router; -use crate::server::Server; +use crate::server::{self, Server}; use crate::service; use crate::status; use crate::util::error::{quit_error, ErrorHints}; @@ -75,7 +78,7 @@ pub async fn service(configs: Vec) -> Result<(), ()> { // Route all incomming connections while let Ok((inbound, _)) = listener.accept().await { - route(inbound, router.clone()); + route(inbound, router.clone()).await; } } @@ -84,7 +87,7 @@ pub async fn service(configs: Vec) -> Result<(), ()> { /// Route inbound TCP stream to correct service, spawning a new task. #[inline] -fn route(inbound: TcpStream, router: Arc) { +async fn route(mut inbound: TcpStream, router: Arc) { // Get user peer address let peer = match inbound.peer_addr() { Ok(peer) => peer, @@ -95,7 +98,92 @@ fn route(inbound: TcpStream, router: Arc) { }; let client = Client::new(peer); - let service = status::serve(client, inbound, router).map(|r| { + + let (mut reader, _) = inbound.split(); + + // Incoming buffer and packet holding queue + let mut buf = BytesMut::new(); + + // Remember inbound packets, track client info + let mut inbound_history = BytesMut::new(); + let mut client_info = ClientInfo::empty(); + + // Read packet from stream + let (packet, raw) = match packet::read_packet(&client, &mut buf, &mut reader).await { + Ok(Some(packet)) => packet, + Ok(None) => return, + Err(_) => { + error!(target: "lazymc", "Closing connection, error occurred"); + return; + } + }; + + // Hijack handshake + if client.state() == ClientState::Handshake && packet.id == packets::handshake::SERVER_HANDSHAKE + { + // Parse handshake + let handshake = match Handshake::decode(&mut packet.data.as_slice()) { + Ok(handshake) => handshake, + Err(_) => { + debug!(target: "lazymc", "Got malformed handshake from client, disconnecting"); + return; + } + }; + + let server = match router.get(handshake.server_addr.clone()) { + Some(server) => server, + None => { + error!(target: "lazymc", "Client tried to join a non existing server ({})", handshake.server_addr); + return; + } + }; + + // Check ban state, just drop connection if enabled + let banned = server.is_banned_ip_blocking(&peer.ip()); + if server.config.server.drop_banned_ips { + info!(target: "lazymc", "Connection from banned IP {}, dropping", peer.ip()); + return; + } + + // Parse new state + let new_state = match ClientState::from_id(handshake.next_state) { + Some(state) => state, + None => { + error!(target: "lazymc", "Client tried to switch into unknown protcol state ({}), disconnecting", handshake.next_state); + return; + } + }; + + // Update client info and client state + client_info + .protocol + .replace(handshake.protocol_version as u32); + client_info.handshake.replace(handshake); + client.set_state(new_state); + inbound_history.extend(raw); + + // Route connection through proper channel + let should_proxy = + !banned && server.state() == server::State::Started && !server.config.lockout.enabled; + if should_proxy { + route_proxy_queue(inbound, &server.config, inbound_history) + } else { + route_status(client, inbound, server, inbound_history, client_info) + } + } +} + +/// Route inbound TCP stream to status server, spawning a new task. +#[inline] +fn route_status( + client: Client, + inbound: TcpStream, + server: Arc, + inbound_history: BytesMut, + client_info: ClientInfo, +) { + // When server is not online, spawn a status server + let service = status::serve(client, inbound, server, inbound_history, client_info).map(|r| { if let Err(err) = r { warn!(target: "lazymc", "Failed to serve status: {:?}", err); } diff --git a/src/status.rs b/src/status.rs index e1e42e9..952b1c3 100644 --- a/src/status.rs +++ b/src/status.rs @@ -5,7 +5,6 @@ use minecraft_protocol::data::chat::{Message, Payload}; use minecraft_protocol::data::server_status::*; use minecraft_protocol::decoder::Decoder; use minecraft_protocol::encoder::Encoder; -use minecraft_protocol::version::v1_14_4::handshake::Handshake; use minecraft_protocol::version::v1_14_4::login::LoginStart; use minecraft_protocol::version::v1_14_4::status::StatusResponse; use tokio::fs; @@ -19,9 +18,7 @@ use crate::proto::action; use crate::proto::client::{Client, ClientInfo, ClientState}; use crate::proto::packet::{self, RawPacket}; use crate::proto::packets; -use crate::router::Router; -use crate::server::{self, Server, State}; -use crate::service::server::route_proxy_queue; +use crate::server::{self, Server}; /// The ban message prefix. const BAN_MESSAGE_PREFIX: &str = "Your IP address is banned from this server.\nReason: "; @@ -37,30 +34,19 @@ const SERVER_ICON_FILE: &str = "server-icon.png"; /// Proxy the given inbound stream to a target address. // TODO: do not drop error here, return Box -pub async fn serve(client: Client, mut inbound: TcpStream, router: Arc) -> Result<(), ()> { +pub async fn serve( + client: Client, + mut inbound: TcpStream, + server: Arc, + mut inbound_history: BytesMut, + mut client_info: ClientInfo, +) -> Result<(), ()> { let (mut reader, mut writer) = inbound.split(); // Incoming buffer and packet holding queue let mut buf = BytesMut::new(); - // Remember inbound packets, track client info - let mut inbound_history = BytesMut::new(); - let mut client_info = ClientInfo::empty(); - loop { - let server = client_info - .handshake - .as_ref() - .and_then(|handshake| router.get(handshake.server_addr.clone())); - - if let Some(server) = server - .clone() - .filter(|server| server.state() == State::Started) - { - route_proxy_queue(inbound, &server.config, inbound_history); - return Ok(()); - } - // Read packet from stream let (packet, raw) = match packet::read_packet(&client, &mut buf, &mut reader).await { Ok(Some(packet)) => packet, @@ -74,140 +60,100 @@ pub async fn serve(client: Client, mut inbound: TcpStream, router: Arc) // Grab client state let client_state = client.state(); - // Hijack handshake - if client_state == ClientState::Handshake - && packet.id == packets::handshake::SERVER_HANDSHAKE - { - // Parse handshake - let handshake = match Handshake::decode(&mut packet.data.as_slice()) { - Ok(handshake) => handshake, - Err(_) => { - debug!(target: "lazymc", "Got malformed handshake from client, disconnecting"); - break; - } - }; - - // Parse new state - let new_state = match ClientState::from_id(handshake.next_state) { - Some(state) => state, - None => { - error!(target: "lazymc", "Client tried to switch into unknown protcol state ({}), disconnecting", handshake.next_state); - break; - } - }; - - // Update client info and client state - client_info - .protocol - .replace(handshake.protocol_version as u32); - client_info.handshake.replace(handshake); - client.set_state(new_state); - - // If loggin in with handshake, remember inbound - if new_state == ClientState::Login { - inbound_history.extend(raw); - } - - continue; - } - // Hijack ping packet if client_state == ClientState::Status && packet.id == packets::status::SERVER_PING { writer.write_all(&raw).await.map_err(|_| ())?; continue; } - if let Some(server) = server { - // Hijack server status packet - if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { - let server_status = server_status(&client_info, &server).await; - let packet = StatusResponse { server_status }; + // Hijack server status packet + if client_state == ClientState::Status && packet.id == packets::status::SERVER_STATUS { + let server_status = server_status(&client_info, &server).await; + let packet = StatusResponse { server_status }; - let mut data = Vec::new(); - packet.encode(&mut data).map_err(|_| ())?; + let mut data = Vec::new(); + packet.encode(&mut data).map_err(|_| ())?; - let response = RawPacket::new(0, data).encode_with_len(&client)?; - writer.write_all(&response).await.map_err(|_| ())?; + let response = RawPacket::new(0, data).encode_with_len(&client)?; + writer.write_all(&response).await.map_err(|_| ())?; - continue; - } + continue; + } - // Hijack login start - if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START - { - // Try to get login username, update client info - // TODO: we should always parse this packet successfully - let username = LoginStart::decode(&mut packet.data.as_slice()) - .ok() - .map(|p| p.name); - client_info.username = username.clone(); - - // Kick if lockout is enabled - if server.config.lockout.enabled { - match username { - Some(username) => { - info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) - } - None => { - info!(target: "lazymc", "Kicked player because lockout is enabled") - } + // Hijack login start + if client_state == ClientState::Login && packet.id == packets::login::SERVER_LOGIN_START { + // Try to get login username, update client info + // TODO: we should always parse this packet successfully + let username = LoginStart::decode(&mut packet.data.as_slice()) + .ok() + .map(|p| p.name); + client_info.username = username.clone(); + + // Kick if lockout is enabled + if server.config.lockout.enabled { + match username { + Some(username) => { + info!(target: "lazymc", "Kicked '{}' because lockout is enabled", username) } - action::kick(&client, &server.config.lockout.message, &mut writer).await?; - break; - } - - // Kick if client is banned - if let Some(ban) = server.ban_entry(&client.peer.ip()).await { - if ban.is_banned() { - let msg = if let Some(reason) = ban.reason { - info!(target: "lazymc", "Login from banned IP {} ({}), disconnecting", client.peer.ip(), &reason); - reason.to_string() - } else { - info!(target: "lazymc", "Login from banned IP {}, disconnecting", client.peer.ip()); - DEFAULT_BAN_REASON.to_string() - }; - action::kick(&client, &format!("{BAN_MESSAGE_PREFIX}{msg}"), &mut writer) - .await?; - break; + None => { + info!(target: "lazymc", "Kicked player because lockout is enabled") } } + action::kick(&client, &server.config.lockout.message, &mut writer).await?; + break; + } - // Kick if client is not whitelisted to wake server - if let Some(ref username) = username { - if !server.is_whitelisted(username).await { - info!(target: "lazymc", "User '{}' tried to wake server but is not whitelisted, disconnecting", username); - action::kick(&client, WHITELIST_MESSAGE, &mut writer).await?; - break; - } + // Kick if client is banned + if let Some(ban) = server.ban_entry(&client.peer.ip()).await { + if ban.is_banned() { + let msg = if let Some(reason) = ban.reason { + info!(target: "lazymc", "Login from banned IP {} ({}), disconnecting", client.peer.ip(), &reason); + reason.to_string() + } else { + info!(target: "lazymc", "Login from banned IP {}, disconnecting", client.peer.ip()); + DEFAULT_BAN_REASON.to_string() + }; + action::kick(&client, &format!("{BAN_MESSAGE_PREFIX}{msg}"), &mut writer) + .await?; + break; } + } - // Start server if not starting yet - Server::start(server.clone(), username).await; - - // Remember inbound packets - inbound_history.extend(&raw); - inbound_history.extend(&buf); - - // Build inbound packet queue with everything from login start (including this) - let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); - login_queue.extend(&raw); - login_queue.extend(&buf); - - // Buf is fully consumed here - buf.clear(); - - // Start occupying client - join::occupy( - client, - client_info, - server.clone(), - inbound, - inbound_history, - login_queue, - ) - .await?; - return Ok(()); + // Kick if client is not whitelisted to wake server + if let Some(ref username) = username { + if !server.is_whitelisted(username).await { + info!(target: "lazymc", "User '{}' tried to wake server but is not whitelisted, disconnecting", username); + action::kick(&client, WHITELIST_MESSAGE, &mut writer).await?; + break; + } } + + // Start server if not starting yet + Server::start(server.clone(), username).await; + + // Remember inbound packets + inbound_history.extend(&raw); + inbound_history.extend(&buf); + + // Build inbound packet queue with everything from login start (including this) + let mut login_queue = BytesMut::with_capacity(raw.len() + buf.len()); + login_queue.extend(&raw); + login_queue.extend(&buf); + + // Buf is fully consumed here + buf.clear(); + + // Start occupying client + join::occupy( + client, + client_info, + server.clone(), + inbound, + inbound_history, + login_queue, + ) + .await?; + return Ok(()); } // Show unhandled packet warning From 023e46fe6406221d5360c75c296deb2fe24ae198 Mon Sep 17 00:00:00 2001 From: timvisee Date: Mon, 20 Feb 2023 10:31:04 +0100 Subject: [PATCH 7/8] Allow exit code 143 and 130 Fixes https://github.com/timvisee/lazymc/issues/26 --- src/server.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/server.rs b/src/server.rs index 8bc259d..b626eae 100644 --- a/src/server.rs +++ b/src/server.rs @@ -29,9 +29,11 @@ const SERVER_QUIT_COOLDOWN: Duration = Duration::from_millis(2500); #[cfg(feature = "rcon")] const RCON_COOLDOWN: Duration = Duration::from_secs(15); -/// Exit code when SIGTERM is received on Unix. -#[cfg(unix)] -const UNIX_EXIT_SIGTERM: i32 = 130; +/// Exit codes that are allowed. +/// +/// - 143: https://github.com/timvisee/lazymc/issues/26#issuecomment-1435670029 +/// - 130: https://unix.stackexchange.com/q/386836/61092 +const ALLOWED_EXIT_CODES: [i32; 2] = [130, 143]; /// Shared server state. #[derive(Debug)] @@ -496,8 +498,12 @@ pub async fn invoke_server_cmd( debug!(target: "lazymc", "Server process stopped successfully ({})", status); false } - #[cfg(unix)] - Ok(status) if status.code() == Some(UNIX_EXIT_SIGTERM) => { + Ok(status) + if status + .code() + .map(|ref code| ALLOWED_EXIT_CODES.contains(code)) + .unwrap_or(false) => + { debug!(target: "lazymc", "Server process stopped successfully by SIGTERM ({})", status); false } From e54025f02f105930defe9035244d4bac6f05d072 Mon Sep 17 00:00:00 2001 From: timvisee Date: Mon, 20 Feb 2023 10:34:06 +0100 Subject: [PATCH 8/8] Bump version to 0.2.10 --- CHANGELOG.md | 4 ++++ Cargo.lock | 2 +- Cargo.toml | 2 +- res/lazymc.toml | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d93a161..50ce00c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.2.10 (2023-02-20) + +- Do not report an error when server exits with status code 143 + ## 0.2.9 (2023-02-14) - Fix dropping all connections when `server.drop_banned_ips` was enabled diff --git a/Cargo.lock b/Cargo.lock index ddefcaf..b604ac7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,7 +926,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lazymc" -version = "0.2.9" +version = "0.2.10" dependencies = [ "anyhow", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 0b66269..20dc5c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lazymc" -version = "0.2.9" +version = "0.2.10" authors = ["Tim Visee <3a4fb3964f@sinenomine.email>"] license = "GPL-3.0" readme = "README.md" diff --git a/res/lazymc.toml b/res/lazymc.toml index 0c4b9a6..1706d28 100644 --- a/res/lazymc.toml +++ b/res/lazymc.toml @@ -187,4 +187,4 @@ command = "java -Xmx1G -Xms1G -jar server.jar --nogui" [config] # lazymc version this configuration is for. # Don't change unless you know what you're doing. -version = "0.2.9" +version = "0.2.10"