From 60d0576b2677a92c227ebb90e0f6a28cfb951fe2 Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Fri, 20 Sep 2024 00:56:13 +0800 Subject: [PATCH] feat(shadowsocks): DatagramTransport & DatagramTransportExt ProxySocket and MonProxySocket accepts socket as a generic type that implements DatagramTransport. --- .../src/local/dns/upstream.rs | 2 +- .../src/local/net/udp/association.rs | 4 +- .../shadowsocks-service/src/net/mon_socket.rs | 40 ++-- .../src/server/udprelay.rs | 19 +- .../shadowsocks/src/relay/udprelay/compat.rs | 200 ++++++++++++++---- crates/shadowsocks/src/relay/udprelay/mod.rs | 2 +- .../src/relay/udprelay/proxy_socket.rs | 121 +++++------ crates/shadowsocks/tests/udp.rs | 3 +- 8 files changed, 246 insertions(+), 145 deletions(-) diff --git a/crates/shadowsocks-service/src/local/dns/upstream.rs b/crates/shadowsocks-service/src/local/dns/upstream.rs index 161f5d62cbe3..981f22e94404 100644 --- a/crates/shadowsocks-service/src/local/dns/upstream.rs +++ b/crates/shadowsocks-service/src/local/dns/upstream.rs @@ -61,7 +61,7 @@ pub enum DnsClient { stream: ProxyClientStream>, }, UdpRemote { - socket: MonProxySocket, + socket: MonProxySocket, ns: Address, control: UdpSocketControlData, server_windows: LruCache, diff --git a/crates/shadowsocks-service/src/local/net/udp/association.rs b/crates/shadowsocks-service/src/local/net/udp/association.rs index 210d23213126..5ca4da94c6cb 100644 --- a/crates/shadowsocks-service/src/local/net/udp/association.rs +++ b/crates/shadowsocks-service/src/local/net/udp/association.rs @@ -217,7 +217,7 @@ where peer_addr: SocketAddr, bypassed_ipv4_socket: Option, bypassed_ipv6_socket: Option, - proxied_socket: Option, + proxied_socket: Option>, keepalive_tx: mpsc::Sender, keepalive_flag: bool, balancer: PingBalancer, @@ -409,7 +409,7 @@ where #[inline] async fn receive_from_proxied_opt( - socket: &Option, + socket: &Option>, buf: &mut Vec, ) -> io::Result<(usize, Address, Option)> { match *socket { diff --git a/crates/shadowsocks-service/src/net/mon_socket.rs b/crates/shadowsocks-service/src/net/mon_socket.rs index 8b2bebdfca2f..00da5c7f81c7 100644 --- a/crates/shadowsocks-service/src/net/mon_socket.rs +++ b/crates/shadowsocks-service/src/net/mon_socket.rs @@ -3,24 +3,44 @@ use std::{io, net::SocketAddr, sync::Arc}; use shadowsocks::{ - relay::{socks5::Address, udprelay::options::UdpSocketControlData}, + relay::{ + socks5::Address, + udprelay::{options::UdpSocketControlData, DatagramTransport}, + }, ProxySocket, }; use super::flow::FlowStat; /// Monitored `ProxySocket` -pub struct MonProxySocket { - socket: ProxySocket, +pub struct MonProxySocket { + socket: ProxySocket, flow_stat: Arc, } -impl MonProxySocket { +impl MonProxySocket { /// Create a new socket with flow monitor - pub fn from_socket(socket: ProxySocket, flow_stat: Arc) -> MonProxySocket { + pub fn from_socket(socket: ProxySocket, flow_stat: Arc) -> MonProxySocket { MonProxySocket { socket, flow_stat } } + /// Get the underlying `ProxySocket` immutable reference + #[inline] + pub fn get_ref(&self) -> &ProxySocket { + &self.socket + } + + /// Get the flow statistic data + #[inline] + pub fn flow_stat(&self) -> &FlowStat { + &self.flow_stat + } +} + +impl MonProxySocket +where + S: DatagramTransport, +{ /// Send a UDP packet to addr through proxy #[inline] pub async fn send(&self, addr: &Address, payload: &[u8]) -> io::Result<()> { @@ -125,14 +145,4 @@ impl MonProxySocket { Ok((n, peer_addr, addr, control)) } - - #[inline] - pub fn get_ref(&self) -> &ProxySocket { - &self.socket - } - - #[inline] - pub fn flow_stat(&self) -> &FlowStat { - &self.flow_stat - } } diff --git a/crates/shadowsocks-service/src/server/udprelay.rs b/crates/shadowsocks-service/src/server/udprelay.rs index 32ec296468a4..5d64343b3f70 100644 --- a/crates/shadowsocks-service/src/server/udprelay.rs +++ b/crates/shadowsocks-service/src/server/udprelay.rs @@ -17,7 +17,10 @@ use shadowsocks::{ config::ServerUser, crypto::CipherCategory, lookup_then, - net::{get_ip_stack_capabilities, AcceptOpts, AddrFamily, UdpSocket as OutboundUdpSocket}, + net::{ + get_ip_stack_capabilities, AcceptOpts, AddrFamily, UdpSocket as OutboundUdpSocket, + UdpSocket as InboundUdpSocket, + }, relay::{ socks5::Address, udprelay::{options::UdpSocketControlData, ProxySocket, MAXIMUM_UDP_PAYLOAD_SIZE}, @@ -87,7 +90,7 @@ pub struct UdpServer { keepalive_tx: mpsc::Sender, keepalive_rx: mpsc::Receiver, time_to_live: Duration, - listener: Arc, + listener: Arc>, svr_cfg: ServerConfig, } @@ -276,7 +279,7 @@ impl UdpServer { async fn recv_one_packet( context: &ServiceContext, - l: &MonProxySocket, + l: &MonProxySocket, buffer: &mut [u8], ) -> Option<(usize, SocketAddr, Address, Option)> { let (n, peer_addr, target_addr, control) = match l.recv_from_with_ctrl(buffer).await { @@ -316,7 +319,7 @@ impl UdpServer { async fn send_packet( &mut self, - listener: &Arc, + listener: &Arc>, peer_addr: SocketAddr, target_addr: Address, control: Option, @@ -394,7 +397,7 @@ impl Drop for UdpAssociation { impl UdpAssociation { fn new_association( context: Arc, - inbound: Arc, + inbound: Arc>, peer_addr: SocketAddr, keepalive_tx: mpsc::Sender, ) -> UdpAssociation { @@ -405,7 +408,7 @@ impl UdpAssociation { #[cfg(feature = "aead-cipher-2022")] fn new_session( context: Arc, - inbound: Arc, + inbound: Arc>, peer_addr: SocketAddr, keepalive_tx: mpsc::Sender, client_session_id: u64, @@ -447,7 +450,7 @@ struct UdpAssociationContext { outbound_ipv6_socket: Option, keepalive_tx: mpsc::Sender, keepalive_flag: bool, - inbound: Arc, + inbound: Arc>, // AEAD 2022 client_session: Option, server_session_id: u64, @@ -472,7 +475,7 @@ fn generate_server_session_id() -> u64 { impl UdpAssociationContext { fn create( context: Arc, - inbound: Arc, + inbound: Arc>, peer_addr: SocketAddr, keepalive_tx: mpsc::Sender, client_session_id: Option, diff --git a/crates/shadowsocks/src/relay/udprelay/compat.rs b/crates/shadowsocks/src/relay/udprelay/compat.rs index 7245ae0e31ba..4e5a2736c3bb 100644 --- a/crates/shadowsocks/src/relay/udprelay/compat.rs +++ b/crates/shadowsocks/src/relay/udprelay/compat.rs @@ -1,88 +1,194 @@ -use async_trait::async_trait; use std::{ - io::Result, + future::Future, + io, net::SocketAddr, ops::Deref, + pin::Pin, task::{Context, Poll}, }; + +use futures::ready; +use pin_project::pin_project; use tokio::io::ReadBuf; use crate::net::UdpSocket; /// a trait for datagram transport that wraps around a tokio `UdpSocket` -#[async_trait] -pub trait DatagramTransport: Send + Sync + std::fmt::Debug { - async fn recv(&self, buf: &mut [u8]) -> Result; - async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)>; +pub trait DatagramTransport { + /// Local binded address + fn local_addr(&self) -> io::Result; + + /// `recv` data into `buf` + fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll>; + /// `recv` data into `buf` with source address + fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll>; + /// Check if the underlying I/O object is ready for `recv` + fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll>; + + /// `send` data with `buf`, returning the sent bytes + fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll>; + /// `send` data with `buf` to `target`, returning the sent bytes + fn poll_send_to(&self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr) -> Poll>; + /// Check if the underlying I/O object is ready for `send` + fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll>; +} - async fn send(&self, buf: &[u8]) -> Result; - async fn send_to(&self, buf: &[u8], target: SocketAddr) -> Result; +impl DatagramTransport for UdpSocket { + fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + UdpSocket::poll_recv(self, cx, buf) + } - fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll>; - fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll>; - fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + UdpSocket::poll_recv_from(self, cx, buf) + } - fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll>; - fn poll_send_to(&self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr) -> Poll>; - fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.deref().poll_recv_ready(cx) + } - fn local_addr(&self) -> Result; + fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + UdpSocket::poll_send(self, cx, buf) + } - #[cfg(unix)] - fn as_raw_fd(&self) -> std::os::fd::RawFd; -} + fn poll_send_to(&self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr) -> Poll> { + UdpSocket::poll_send_to(self, cx, buf, target) + } -#[async_trait] -impl DatagramTransport for UdpSocket { - async fn recv(&self, buf: &mut [u8]) -> Result { - UdpSocket::recv(self, buf).await + fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.deref().poll_send_ready(cx) } - async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> { - UdpSocket::recv_from(self, buf).await + fn local_addr(&self) -> io::Result { + self.deref().local_addr() } +} + +/// Future for `recv` +#[pin_project] +pub struct RecvFut<'a, S: DatagramTransport + ?Sized> { + #[pin] + io: &'a S, + buf: &'a mut [u8], +} + +impl<'a, S: DatagramTransport + ?Sized> Future for RecvFut<'a, S> { + type Output = io::Result; - async fn send(&self, buf: &[u8]) -> Result { - UdpSocket::send(self, buf).await + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let mut read_buf = ReadBuf::new(this.buf); + ready!(this.io.poll_recv(cx, &mut read_buf))?; + Ok(read_buf.filled().len()).into() } +} + +/// Future for `recv_from` +#[pin_project] +pub struct RecvFromFut<'a, S: DatagramTransport + ?Sized> { + #[pin] + io: &'a S, + buf: &'a mut [u8], +} - async fn send_to(&self, buf: &[u8], target: SocketAddr) -> Result { - UdpSocket::send_to(self, buf, target).await +impl<'a, S: DatagramTransport + ?Sized> Future for RecvFromFut<'a, S> { + type Output = io::Result<(usize, SocketAddr)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let mut read_buf = ReadBuf::new(this.buf); + let src_addr = ready!(this.io.poll_recv_from(cx, &mut read_buf))?; + Ok((read_buf.filled().len(), src_addr)).into() } +} - fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - UdpSocket::poll_recv(self, cx, buf) +/// Future for `recv_ready` +pub struct RecvReadyFut<'a, S: DatagramTransport + ?Sized> { + io: &'a S, +} + +impl<'a, S: DatagramTransport + ?Sized> Future for RecvReadyFut<'a, S> { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.io.poll_recv_ready(cx) } +} - fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - UdpSocket::poll_recv_from(self, cx, buf) +/// Future for `send` +pub struct SendFut<'a, S: DatagramTransport + ?Sized> { + io: &'a S, + buf: &'a [u8], +} + +impl<'a, S: DatagramTransport + ?Sized> Future for SendFut<'a, S> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.io.poll_send(cx, self.buf) } +} - fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.deref().poll_recv_ready(cx) +/// Future for `send_to` +pub struct SendToFut<'a, S: DatagramTransport + ?Sized> { + io: &'a S, + target: SocketAddr, + buf: &'a [u8], +} + +impl<'a, S: DatagramTransport + ?Sized> Future for SendToFut<'a, S> { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.io.poll_send_to(cx, self.buf, self.target) } +} - fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - UdpSocket::poll_send(self, cx, buf) +/// Future for `recv_ready` +pub struct SendReadyFut<'a, S: DatagramTransport + ?Sized> { + io: &'a S, +} + +impl<'a, S: DatagramTransport + ?Sized> Future for SendReadyFut<'a, S> { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.io.poll_recv_ready(cx) } +} - fn poll_send_to(&self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr) -> Poll> { - UdpSocket::poll_send_to(self, cx, buf, target) +/// Extension methods for `DatagramTransport` +pub trait DatagramTransportExt: DatagramTransport { + /// Async method for `poll_recv` + fn recv<'a, 'b>(&'a self, buf: &'a mut [u8]) -> RecvFut<'a, Self> { + RecvFut { io: self, buf } } - fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.deref().poll_send_ready(cx) + /// Async method for `poll_recv_from` + fn recv_from<'a, 'b>(&'a self, buf: &'a mut [u8]) -> RecvFromFut<'a, Self> { + RecvFromFut { io: self, buf } } - fn local_addr(&self) -> Result { - self.deref().local_addr() + /// Async method for `poll_recv_ready` + fn recv_ready<'a>(&'a self) -> RecvReadyFut<'a, Self> { + RecvReadyFut { io: self } + } + + /// Async method for `poll_send` + fn send<'a>(&'a self, buf: &'a [u8]) -> SendFut<'a, Self> { + SendFut { io: self, buf } } - #[cfg(unix)] - fn as_raw_fd(&self) -> std::os::fd::RawFd { - use std::ops::Deref; - use std::os::fd::AsRawFd; + /// Async method for `poll_send_to` + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddr) -> SendToFut<'a, Self> { + SendToFut { io: self, target, buf } + } - self.deref().as_raw_fd() + /// Async method for `poll_send_ready` + fn send_ready<'a>(&'a self) -> SendReadyFut<'a, Self> { + SendReadyFut { io: self } } } + +impl DatagramTransportExt for S {} diff --git a/crates/shadowsocks/src/relay/udprelay/mod.rs b/crates/shadowsocks/src/relay/udprelay/mod.rs index c445d77f29e8..85d6eeafe365 100644 --- a/crates/shadowsocks/src/relay/udprelay/mod.rs +++ b/crates/shadowsocks/src/relay/udprelay/mod.rs @@ -50,7 +50,7 @@ use std::time::Duration; pub use self::proxy_socket::ProxySocket; -pub use compat::DatagramTransport; +pub use compat::{DatagramTransport, DatagramTransportExt}; mod aead; #[cfg(feature = "aead-cipher-2022")] diff --git a/crates/shadowsocks/src/relay/udprelay/proxy_socket.rs b/crates/shadowsocks/src/relay/udprelay/proxy_socket.rs index 431719713a0d..55b31fbd3351 100644 --- a/crates/shadowsocks/src/relay/udprelay/proxy_socket.rs +++ b/crates/shadowsocks/src/relay/udprelay/proxy_socket.rs @@ -23,7 +23,7 @@ use crate::{ }; use super::{ - compat::DatagramTransport, + compat::{DatagramTransport, DatagramTransportExt}, crypto_io::{ decrypt_client_payload, decrypt_server_payload, encrypt_client_payload, encrypt_server_payload, ProtocolError, ProtocolResult, @@ -73,9 +73,9 @@ pub type ProxySocketResult = Result; /// UDP client for communicating with ShadowSocks' server #[derive(Debug)] -pub struct ProxySocket { +pub struct ProxySocket { socket_type: UdpSocketType, - io: Box, + io: S, method: CipherKind, key: Box<[u8]>, send_timeout: Option, @@ -85,9 +85,12 @@ pub struct ProxySocket { user_manager: Option>, } -impl ProxySocket { +impl ProxySocket { /// Create a client to communicate with Shadowsocks' UDP server (outbound) - pub async fn connect(context: SharedContext, svr_cfg: &ServerConfig) -> ProxySocketResult { + pub async fn connect( + context: SharedContext, + svr_cfg: &ServerConfig, + ) -> ProxySocketResult> { ProxySocket::connect_with_opts(context, svr_cfg, &DEFAULT_CONNECT_OPTS) .await .map_err(Into::into) @@ -98,7 +101,7 @@ impl ProxySocket { context: SharedContext, svr_cfg: &ServerConfig, opts: &ConnectOpts, - ) -> ProxySocketResult { + ) -> ProxySocketResult> { // Note: Plugins doesn't support UDP relay let socket = ShadowUdpSocket::connect_server_with_opts(&context, svr_cfg.udp_external_addr(), opts).await?; @@ -118,53 +121,59 @@ impl ProxySocket { )) } - /// Create a `ProxySocket` from a `UdpSocket` - pub fn from_socket( - socket_type: UdpSocketType, + /// Create a `ProxySocket` binding to a specific address (inbound) + pub async fn bind( context: SharedContext, svr_cfg: &ServerConfig, - socket: S, - ) -> ProxySocket - where - S: Into, - { - let key = svr_cfg.key().to_vec().into_boxed_slice(); - let method = svr_cfg.method(); + ) -> ProxySocketResult> { + ProxySocket::bind_with_opts(context, svr_cfg, AcceptOpts::default()) + .await + .map_err(Into::into) + } - // NOTE: svr_cfg.timeout() is not for this socket, but for associations. - ProxySocket { - socket_type, - io: Box::new(socket.into()), - method, - key, - send_timeout: None, - recv_timeout: None, + /// Create a `ProxySocket` binding to a specific address (inbound) + pub async fn bind_with_opts( + context: SharedContext, + svr_cfg: &ServerConfig, + opts: AcceptOpts, + ) -> ProxySocketResult> { + // Plugins doesn't support UDP + let socket = match svr_cfg.udp_external_addr() { + ServerAddr::SocketAddr(sa) => ShadowUdpSocket::listen_with_opts(sa, opts).await?, + ServerAddr::DomainName(domain, port) => { + lookup_then!(&context, domain, *port, |addr| { + ShadowUdpSocket::listen_with_opts(&addr, opts.clone()).await + })? + .1 + } + }; + Ok(ProxySocket::from_socket( + UdpSocketType::Server, context, - identity_keys: match socket_type { - UdpSocketType::Client => svr_cfg.clone_identity_keys(), - UdpSocketType::Server => Arc::new(Vec::new()), - }, - user_manager: match socket_type { - UdpSocketType::Client => None, - UdpSocketType::Server => svr_cfg.clone_user_manager(), - }, - } + svr_cfg, + socket, + )) } +} - pub fn from_io( +impl ProxySocket +where + S: DatagramTransport, +{ + /// Create a `ProxySocket` from a I/O object that impls `DatagramTransport` + pub fn from_socket( socket_type: UdpSocketType, context: SharedContext, svr_cfg: &ServerConfig, - io: Box, - ) -> ProxySocket { + socket: S, + ) -> ProxySocket { let key = svr_cfg.key().to_vec().into_boxed_slice(); let method = svr_cfg.method(); // NOTE: svr_cfg.timeout() is not for this socket, but for associations. - ProxySocket { socket_type, - io, + io: socket, method, key, send_timeout: None, @@ -181,37 +190,6 @@ impl ProxySocket { } } - /// Create a `ProxySocket` binding to a specific address (inbound) - pub async fn bind(context: SharedContext, svr_cfg: &ServerConfig) -> ProxySocketResult { - ProxySocket::bind_with_opts(context, svr_cfg, AcceptOpts::default()) - .await - .map_err(Into::into) - } - - /// Create a `ProxySocket` binding to a specific address (inbound) - pub async fn bind_with_opts( - context: SharedContext, - svr_cfg: &ServerConfig, - opts: AcceptOpts, - ) -> ProxySocketResult { - // Plugins doesn't support UDP - let socket = match svr_cfg.udp_external_addr() { - ServerAddr::SocketAddr(sa) => ShadowUdpSocket::listen_with_opts(sa, opts).await?, - ServerAddr::DomainName(domain, port) => { - lookup_then!(&context, domain, *port, |addr| { - ShadowUdpSocket::listen_with_opts(&addr, opts.clone()).await - })? - .1 - } - }; - Ok(ProxySocket::from_socket( - UdpSocketType::Server, - context, - svr_cfg, - socket, - )) - } - fn encrypt_send_buffer( &self, addr: &Address, @@ -627,7 +605,10 @@ impl ProxySocket { } #[cfg(unix)] -impl AsRawFd for ProxySocket { +impl AsRawFd for ProxySocket +where + S: AsRawFd, +{ /// Retrieve raw fd of the outbound socket fn as_raw_fd(&self) -> RawFd { self.io.as_raw_fd() diff --git a/crates/shadowsocks/tests/udp.rs b/crates/shadowsocks/tests/udp.rs index 1895215d5f39..732af6bc44d2 100644 --- a/crates/shadowsocks/tests/udp.rs +++ b/crates/shadowsocks/tests/udp.rs @@ -8,6 +8,7 @@ use shadowsocks::{ config::{ServerConfig, ServerType}, context::{Context, SharedContext}, crypto::CipherKind, + net::UdpSocket as ShadowUdpSocket, relay::{socks5::Address, udprelay::ProxySocket}, }; @@ -15,7 +16,7 @@ async fn handle_udp_server_client( peer_addr: SocketAddr, remote_addr: Address, payload: &[u8], - socket: &ProxySocket, + socket: &ProxySocket, ) -> io::Result<()> { let remote_socket = UdpSocket::bind("0.0.0.0:0").await?;