From 22dd82c04859a3b9510885607eee4fa06a1b443c Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Sat, 2 Dec 2023 16:48:30 +0800 Subject: [PATCH] feat: support listen reuse port --- akasa-core/Cargo.toml | 1 + akasa-core/src/config.rs | 3 + akasa-core/src/server/mod.rs | 1 + akasa-core/src/server/rt.rs | 126 ++++++++++++++++++++++++---------- akasa-core/src/tests/utils.rs | 1 + docs/chinese/config.md | 4 ++ docs/english/config.md | 4 ++ 7 files changed, 103 insertions(+), 37 deletions(-) diff --git a/akasa-core/Cargo.toml b/akasa-core/Cargo.toml index dee872e..4208e7a 100644 --- a/akasa-core/Cargo.toml +++ b/akasa-core/Cargo.toml @@ -17,6 +17,7 @@ log = "0.4.17" mqtt-proto = { git = "https://github.com/akasamq/mqtt-proto.git", branch = "master" } num_cpus = "1.14.0" parking_lot = "0.12.1" +cfg-if = "1.0.0" # rhai = { version = "1.11.0", features = ["decimal"] } # jsonwebtoken = "8.2.0" # jwt-simple = "0.11.2" diff --git a/akasa-core/src/config.rs b/akasa-core/src/config.rs index 9620fcf..45e0fab 100644 --- a/akasa-core/src/config.rs +++ b/akasa-core/src/config.rs @@ -67,6 +67,7 @@ pub struct Listeners { #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)] pub struct Listener { pub addr: SocketAddr, + pub reuse_port: bool, /// The proxy protocol v2 mode pub proxy_mode: Option, } @@ -74,6 +75,7 @@ pub struct Listener { #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)] pub struct TlsListener { pub addr: SocketAddr, + pub reuse_port: bool, /// Enable proxy protocol v2 or not pub proxy: bool, /// This CA file is for verify client certificate, if `verify_peer` is true @@ -99,6 +101,7 @@ impl Default for Listeners { mqtt: Some(Listener { addr: (Ipv4Addr::LOCALHOST, 1883).into(), proxy_mode: None, + reuse_port: true, }), mqtts: None, ws: None, diff --git a/akasa-core/src/server/mod.rs b/akasa-core/src/server/mod.rs index fb4f8c1..8c35c42 100644 --- a/akasa-core/src/server/mod.rs +++ b/akasa-core/src/server/mod.rs @@ -250,6 +250,7 @@ fn build_tls_context(listener: &TlsListener) -> io::Result { #[derive(Clone)] pub struct ConnectionArgs { pub(crate) addr: SocketAddr, + pub(crate) reuse_port: bool, pub(crate) proxy: bool, pub(crate) proxy_tls_termination: bool, pub(crate) websocket: bool, diff --git a/akasa-core/src/server/rt.rs b/akasa-core/src/server/rt.rs index bdb378f..c1ab897 100644 --- a/akasa-core/src/server/rt.rs +++ b/akasa-core/src/server/rt.rs @@ -2,7 +2,7 @@ use std::io; use std::sync::Arc; use std::time::Duration; -use tokio::{net::TcpListener, runtime::Runtime}; +use tokio::{net::TcpSocket, runtime::Runtime}; use super::{build_tls_context, handle_accept, ConnectionArgs}; use crate::config::{Listener, ProxyMode, TlsListener}; @@ -36,64 +36,100 @@ where }) .transpose()?; + cfg_if::cfg_if! { + if #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] { + let reuse_port_available = true; + } else { + let reuse_port_available = false; + } + } + rt.block_on(async move { let listeners = &global.config.listeners; let tasks: Vec<_> = [ - listeners - .mqtt - .as_ref() - .map(|Listener { addr, proxy_mode }| ConnectionArgs { + listeners.mqtt.as_ref().map( + |Listener { + addr, + reuse_port, + proxy_mode, + }| ConnectionArgs { addr: *addr, + reuse_port: *reuse_port, proxy: proxy_mode.is_some(), proxy_tls_termination: *proxy_mode == Some(ProxyMode::TlsTermination), websocket: false, tls_acceptor: None, - }), - listeners - .mqtts - .as_ref() - .map(|TlsListener { addr, proxy, .. }| ConnectionArgs { + }, + ), + listeners.mqtts.as_ref().map( + |TlsListener { + addr, + reuse_port, + proxy, + .. + }| ConnectionArgs { addr: *addr, + reuse_port: *reuse_port, proxy: *proxy, proxy_tls_termination: false, websocket: false, tls_acceptor: mqtts_tls_acceptor.map(Into::into), - }), - listeners - .ws - .as_ref() - .map(|Listener { addr, proxy_mode }| ConnectionArgs { + }, + ), + listeners.ws.as_ref().map( + |Listener { + addr, + reuse_port, + proxy_mode, + }| ConnectionArgs { addr: *addr, + reuse_port: *reuse_port, proxy: proxy_mode.is_some(), proxy_tls_termination: *proxy_mode == Some(ProxyMode::TlsTermination), websocket: true, tls_acceptor: None, - }), - listeners - .wss - .as_ref() - .map(|TlsListener { addr, proxy, .. }| ConnectionArgs { + }, + ), + listeners.wss.as_ref().map( + |TlsListener { + addr, + reuse_port, + proxy, + .. + }| ConnectionArgs { addr: *addr, + reuse_port: *reuse_port, proxy: *proxy, proxy_tls_termination: false, websocket: true, tls_acceptor: wss_tls_acceptor.map(Into::into), - }), + }, + ), ] .into_iter() .flatten() - .map(|conn_args| { + .flat_map(|conn_args| { + let reuse_port = reuse_port_available && conn_args.reuse_port; let global = Arc::clone(&global); let hook_handler = hook_handler.clone(); - tokio::spawn(async move { - loop { - let global = Arc::clone(&global); - let hook_handler = hook_handler.clone(); - if let Err(err) = listen(conn_args.clone(), hook_handler, global).await { - log::error!("Listen error: {:?}", err); - tokio::time::sleep(Duration::from_secs(1)).await; + let conn_args = conn_args.clone(); + let n = if reuse_port { 4 } else { 1 }; + (0..n).map(move |_| { + let global = Arc::clone(&global); + let hook_handler = hook_handler.clone(); + let conn_args = conn_args.clone(); + tokio::spawn(async move { + loop { + let global = Arc::clone(&global); + let hook_handler = hook_handler.clone(); + if let Err(err) = + listen(conn_args.clone(), reuse_port, hook_handler, global).await + { + log::error!("Listen error: {:?}", err); + tokio::time::sleep(Duration::from_secs(1)).await; + } } - } + }) }) }) .collect(); @@ -110,23 +146,39 @@ where async fn listen( conn_args: ConnectionArgs, + reuse_port: bool, hook_handler: H, global: Arc, ) -> io::Result<()> { let addr = conn_args.addr; - let listener = TcpListener::bind(addr).await?; + let socket = if addr.is_ipv4() { + TcpSocket::new_v4()? + } else { + TcpSocket::new_v6()? + }; + socket.set_reuseaddr(true)?; + if reuse_port { + socket.set_reuseport(true)?; + } + socket.bind(addr)?; + let listener = socket.listen(1024)?; + let listen_type = match (conn_args.websocket, conn_args.tls_acceptor.is_some()) { (false, false) => "mqtt", (false, true) => "mqtts", (true, false) => "ws", (true, true) => "wss", }; - let listen_type = if conn_args.proxy { - format!("{listen_type}(proxy)") - } else { - listen_type.to_owned() - }; - log::info!("Listen {listen_type}@{addr} success!"); + let labels = [(conn_args.proxy, "proxy"), (reuse_port, "reuseport")] + .into_iter() + .filter(|(flag, _)| *flag) + .map(|(_, text)| text) + .collect::>(); + log::info!( + "Listen {listen_type}@{addr} ({}) success!", + labels.join(",") + ); + loop { let (conn, peer) = listener.accept().await?; log::debug!("{} connected", peer,); diff --git a/akasa-core/src/tests/utils.rs b/akasa-core/src/tests/utils.rs index 5fca219..3ce5114 100644 --- a/akasa-core/src/tests/utils.rs +++ b/akasa-core/src/tests/utils.rs @@ -106,6 +106,7 @@ impl MockConnControl { let hook_handler = TestHook; let conn_args = ConnectionArgs { addr: conn.bind, + reuse_port: false, proxy: false, proxy_tls_termination: false, websocket: false, diff --git a/docs/chinese/config.md b/docs/chinese/config.md index 6a5d8da..c45cee0 100644 --- a/docs/chinese/config.md +++ b/docs/chinese/config.md @@ -8,6 +8,8 @@ listeners: mqtt: # 绑定的 Socket 地址 addr: 127.0.0.1:1883 + # 允许多个 listener 绑定在同一个端口上,可以增加建立链接的速度 + reuse_port: false # (可选) proxy protocol 模式, 可能的选项: # null : 不启用 proxy protocol # Normal : 客户端非 TLS, 服务端非 TLS @@ -17,6 +19,8 @@ listeners: mqtts: # 绑定的 Socket 地址 addr: 127.0.0.1:8883 + # 允许多个 listener 绑定在同一个端口上,可以增加建立链接的速度 + reuse_port: false # 是否开启 proxy protocol v2 proxy: false # 用来认证客户端的 CA 文件, 如果 `verify_peer` 是 true 这个字段必须填上 diff --git a/docs/english/config.md b/docs/english/config.md index 2724dd7..b20e2ab 100644 --- a/docs/english/config.md +++ b/docs/english/config.md @@ -8,6 +8,8 @@ listeners: mqtt: # The socket address to bind addr: 127.0.0.1:1883 + # Allows the socket to bind to an in-use port to increase connection accept speed + reuse_port: false # (optional) proxy protocol mode, can be: # null : Disable proxy protocol # Normal : Client side non-TLS, server side non-TLS @@ -17,6 +19,8 @@ listeners: mqtts: # The socket address to bind addr: 127.0.0.1:8883 + # Allows the socket to bind to an in-use port to increase connection accept speed + reuse_port: false # Enable proxy protocol v2 or not proxy: false # This CA file is for verify client certificate, if `verify_peer` is true this field MUST be presented.