Skip to content

Commit

Permalink
feat: support listen reuse port
Browse files Browse the repository at this point in the history
  • Loading branch information
TheWaWaR committed Dec 2, 2023
1 parent 7883ac1 commit 22dd82c
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 37 deletions.
1 change: 1 addition & 0 deletions akasa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log = "0.4.17"
mqtt-proto = { git = "https://github.com/akasamq/mqtt-proto.git", branch = "master" }
num_cpus = "1.14.0"
parking_lot = "0.12.1"
cfg-if = "1.0.0"
# rhai = { version = "1.11.0", features = ["decimal"] }
# jsonwebtoken = "8.2.0"
# jwt-simple = "0.11.2"
Expand Down
3 changes: 3 additions & 0 deletions akasa-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ pub struct Listeners {
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct Listener {
pub addr: SocketAddr,
pub reuse_port: bool,
/// The proxy protocol v2 mode
pub proxy_mode: Option<ProxyMode>,
}

#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
pub struct TlsListener {
pub addr: SocketAddr,
pub reuse_port: bool,
/// Enable proxy protocol v2 or not
pub proxy: bool,
/// This CA file is for verify client certificate, if `verify_peer` is true
Expand All @@ -99,6 +101,7 @@ impl Default for Listeners {
mqtt: Some(Listener {
addr: (Ipv4Addr::LOCALHOST, 1883).into(),
proxy_mode: None,
reuse_port: true,
}),
mqtts: None,
ws: None,
Expand Down
1 change: 1 addition & 0 deletions akasa-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ fn build_tls_context(listener: &TlsListener) -> io::Result<SslAcceptor> {
#[derive(Clone)]
pub struct ConnectionArgs {
pub(crate) addr: SocketAddr,
pub(crate) reuse_port: bool,
pub(crate) proxy: bool,
pub(crate) proxy_tls_termination: bool,
pub(crate) websocket: bool,
Expand Down
126 changes: 89 additions & 37 deletions akasa-core/src/server/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;
use std::sync::Arc;
use std::time::Duration;

use tokio::{net::TcpListener, runtime::Runtime};
use tokio::{net::TcpSocket, runtime::Runtime};

use super::{build_tls_context, handle_accept, ConnectionArgs};
use crate::config::{Listener, ProxyMode, TlsListener};
Expand Down Expand Up @@ -36,64 +36,100 @@ where
})
.transpose()?;

cfg_if::cfg_if! {
if #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] {
let reuse_port_available = true;
} else {
let reuse_port_available = false;
}
}

rt.block_on(async move {
let listeners = &global.config.listeners;
let tasks: Vec<_> = [
listeners
.mqtt
.as_ref()
.map(|Listener { addr, proxy_mode }| ConnectionArgs {
listeners.mqtt.as_ref().map(
|Listener {
addr,
reuse_port,
proxy_mode,
}| ConnectionArgs {
addr: *addr,
reuse_port: *reuse_port,
proxy: proxy_mode.is_some(),
proxy_tls_termination: *proxy_mode == Some(ProxyMode::TlsTermination),
websocket: false,
tls_acceptor: None,
}),
listeners
.mqtts
.as_ref()
.map(|TlsListener { addr, proxy, .. }| ConnectionArgs {
},
),
listeners.mqtts.as_ref().map(
|TlsListener {
addr,
reuse_port,
proxy,
..
}| ConnectionArgs {
addr: *addr,
reuse_port: *reuse_port,
proxy: *proxy,
proxy_tls_termination: false,
websocket: false,
tls_acceptor: mqtts_tls_acceptor.map(Into::into),
}),
listeners
.ws
.as_ref()
.map(|Listener { addr, proxy_mode }| ConnectionArgs {
},
),
listeners.ws.as_ref().map(
|Listener {
addr,
reuse_port,
proxy_mode,
}| ConnectionArgs {
addr: *addr,
reuse_port: *reuse_port,
proxy: proxy_mode.is_some(),
proxy_tls_termination: *proxy_mode == Some(ProxyMode::TlsTermination),
websocket: true,
tls_acceptor: None,
}),
listeners
.wss
.as_ref()
.map(|TlsListener { addr, proxy, .. }| ConnectionArgs {
},
),
listeners.wss.as_ref().map(
|TlsListener {
addr,
reuse_port,
proxy,
..
}| ConnectionArgs {
addr: *addr,
reuse_port: *reuse_port,
proxy: *proxy,
proxy_tls_termination: false,
websocket: true,
tls_acceptor: wss_tls_acceptor.map(Into::into),
}),
},
),
]
.into_iter()
.flatten()
.map(|conn_args| {
.flat_map(|conn_args| {
let reuse_port = reuse_port_available && conn_args.reuse_port;
let global = Arc::clone(&global);
let hook_handler = hook_handler.clone();
tokio::spawn(async move {
loop {
let global = Arc::clone(&global);
let hook_handler = hook_handler.clone();
if let Err(err) = listen(conn_args.clone(), hook_handler, global).await {
log::error!("Listen error: {:?}", err);
tokio::time::sleep(Duration::from_secs(1)).await;
let conn_args = conn_args.clone();
let n = if reuse_port { 4 } else { 1 };
(0..n).map(move |_| {
let global = Arc::clone(&global);
let hook_handler = hook_handler.clone();
let conn_args = conn_args.clone();
tokio::spawn(async move {
loop {
let global = Arc::clone(&global);
let hook_handler = hook_handler.clone();
if let Err(err) =
listen(conn_args.clone(), reuse_port, hook_handler, global).await
{
log::error!("Listen error: {:?}", err);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
})
})
})
.collect();
Expand All @@ -110,23 +146,39 @@ where

async fn listen<H: Hook + Clone + Send + Sync + 'static>(
conn_args: ConnectionArgs,
reuse_port: bool,
hook_handler: H,
global: Arc<GlobalState>,
) -> io::Result<()> {
let addr = conn_args.addr;
let listener = TcpListener::bind(addr).await?;
let socket = if addr.is_ipv4() {
TcpSocket::new_v4()?
} else {
TcpSocket::new_v6()?
};
socket.set_reuseaddr(true)?;
if reuse_port {
socket.set_reuseport(true)?;
}
socket.bind(addr)?;
let listener = socket.listen(1024)?;

let listen_type = match (conn_args.websocket, conn_args.tls_acceptor.is_some()) {
(false, false) => "mqtt",
(false, true) => "mqtts",
(true, false) => "ws",
(true, true) => "wss",
};
let listen_type = if conn_args.proxy {
format!("{listen_type}(proxy)")
} else {
listen_type.to_owned()
};
log::info!("Listen {listen_type}@{addr} success!");
let labels = [(conn_args.proxy, "proxy"), (reuse_port, "reuseport")]
.into_iter()
.filter(|(flag, _)| *flag)
.map(|(_, text)| text)
.collect::<Vec<_>>();
log::info!(
"Listen {listen_type}@{addr} ({}) success!",
labels.join(",")
);

loop {
let (conn, peer) = listener.accept().await?;
log::debug!("{} connected", peer,);
Expand Down
1 change: 1 addition & 0 deletions akasa-core/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl MockConnControl {
let hook_handler = TestHook;
let conn_args = ConnectionArgs {
addr: conn.bind,
reuse_port: false,
proxy: false,
proxy_tls_termination: false,
websocket: false,
Expand Down
4 changes: 4 additions & 0 deletions docs/chinese/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ listeners:
mqtt:
# 绑定的 Socket 地址
addr: 127.0.0.1:1883
# 允许多个 listener 绑定在同一个端口上,可以增加建立链接的速度
reuse_port: false
# (可选) proxy protocol 模式, 可能的选项:
# null : 不启用 proxy protocol
# Normal : 客户端非 TLS, 服务端非 TLS
Expand All @@ -17,6 +19,8 @@ listeners:
mqtts:
# 绑定的 Socket 地址
addr: 127.0.0.1:8883
# 允许多个 listener 绑定在同一个端口上,可以增加建立链接的速度
reuse_port: false
# 是否开启 proxy protocol v2
proxy: false
# 用来认证客户端的 CA 文件, 如果 `verify_peer` 是 true 这个字段必须填上
Expand Down
4 changes: 4 additions & 0 deletions docs/english/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ listeners:
mqtt:
# The socket address to bind
addr: 127.0.0.1:1883
# Allows the socket to bind to an in-use port to increase connection accept speed
reuse_port: false
# (optional) proxy protocol mode, can be:
# null : Disable proxy protocol
# Normal : Client side non-TLS, server side non-TLS
Expand All @@ -17,6 +19,8 @@ listeners:
mqtts:
# The socket address to bind
addr: 127.0.0.1:8883
# Allows the socket to bind to an in-use port to increase connection accept speed
reuse_port: false
# Enable proxy protocol v2 or not
proxy: false
# This CA file is for verify client certificate, if `verify_peer` is true this field MUST be presented.
Expand Down

0 comments on commit 22dd82c

Please sign in to comment.