diff --git a/client/src/console/parser.rs b/client/src/console/parser.rs index e039546..daff67d 100644 --- a/client/src/console/parser.rs +++ b/client/src/console/parser.rs @@ -1,5 +1,6 @@ use serde::Serialize; use std::collections::HashMap; +use tracing::info; use anyhow::Result; @@ -38,7 +39,12 @@ pub struct ConsoleResponse { pub fn parse_http_resonse(id: String, data: Vec) -> Result { let mut headers = [httparse::EMPTY_HEADER; 64]; let mut res = httparse::Response::new(&mut headers); - let offset = res.parse(&data)?.unwrap(); // assuming that the response is complete + let status = res.parse(&data)?; // assuming that the response is complete + // if status.is_partial() { + // info!("is partial: _> {:?}", str_from_u8_nul_utf8(&data)); + // return Err(anyhow::anyhow!("is partial")); + // } + let offset = status.unwrap(); let headers: HashMap> = res .headers .iter() @@ -59,7 +65,13 @@ pub fn parse_http_resonse(id: String, data: Vec) -> Result pub fn parse_http_request(id: String, data: Vec) -> Result { let mut headers = [httparse::EMPTY_HEADER; 64]; let mut req = httparse::Request::new(&mut headers); - let offset = req.parse(&data)?.unwrap(); // assuming that the response is complete + + let status = req.parse(&data)?; // assuming that the response is complete + if status.is_partial() { + info!("is partial: _> {:?}", str_from_u8_nul_utf8(&data)); + return Err(anyhow::anyhow!("is partial")); + } + let offset = status.unwrap(); let headers: HashMap> = req .headers .iter() diff --git a/client/src/uniqx.rs b/client/src/uniqx.rs index c1bd8a3..4647847 100644 --- a/client/src/uniqx.rs +++ b/client/src/uniqx.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result}; +use anyhow::Result; use shared::connect_with_timeout; use shared::delimited::delimited_framed; use shared::delimited::DelimitedReadExt; @@ -7,6 +7,7 @@ use shared::delimited::DelimitedWriteExt; use shared::structs::NewClient; use shared::structs::TunnelOpen; use shared::structs::TunnelRequest; +use shared::utils::proxy; use shared::utils::set_tcp_keepalive; use shared::Protocol; use shared::EVENT_SERVER_PORT; @@ -16,12 +17,12 @@ use std::process::exit; use std::sync::Arc; use tokio::io::{self}; use tracing::error; +use tracing::info; use tracing::info_span; use tracing::Instrument; use crate::console; use crate::console::handler::ConsoleHandler; -use crate::util::bind; use crate::util::bind_with_console; pub struct UniqxClient { @@ -73,15 +74,19 @@ impl UniqxClient { delimited_framed(&mut http_event_stream) .send_delimited(data) .await?; - let (s1_read, s1_write) = io::split(localhost_conn); - let (s2_read, s2_write) = io::split(http_event_stream); + if self.protocol == Protocol::HTTP && self.console { + let (s1_read, s1_write) = io::split(localhost_conn); + let (s2_read, s2_write) = io::split(http_event_stream); let (req_tx, res_tx) = self.console_handler.clone().unwrap().init_transmitter(); - tokio::spawn(async { bind_with_console(s1_read, s2_write, res_tx).await }); - return bind_with_console(s2_read, s1_write, req_tx).await; + tokio::select! { + res= bind_with_console(s1_read, s2_write, res_tx).instrument(info_span!("Binder", "localhost reader")) => { info!("local connection discounted");res}, + res= bind_with_console(s2_read, s1_write, req_tx).instrument(info_span!("Binder", "http event reader")) => {info!("event connection discounted"); res} + }? + } else { + proxy(localhost_conn, http_event_stream).await?; } - tokio::spawn(async move { bind(s1_read, s2_write).await.context("cant read from s1") }); - bind(s2_read, s1_write).await.context("cant read from s2")?; + Ok(()) } diff --git a/client/src/util.rs b/client/src/util.rs index 328f6b3..e952dd8 100644 --- a/client/src/util.rs +++ b/client/src/util.rs @@ -33,7 +33,6 @@ pub async fn bind_with_console( let n: usize = src.read(&mut buf).await?; if n == 0 { - info!("connection disconnected"); return Ok(()); } dst.write_all(&buf[..n]).await?; diff --git a/server/src/server/event_server.rs b/server/src/server/event_server.rs index 2fed3f7..c9cdde3 100644 --- a/server/src/server/event_server.rs +++ b/server/src/server/event_server.rs @@ -5,6 +5,7 @@ use anyhow::{Error, Result}; use shared::{ delimited::{delimited_framed, DelimitedReadExt}, structs::NewClient, + utils::proxy, EVENT_SERVER_PORT, }; use tokio::{ @@ -12,7 +13,7 @@ use tokio::{ net::{TcpListener, TcpStream}, }; -use crate::{uniqx::ServerContext, util::proxy}; +use crate::uniqx::ServerContext; use super::tcp_listener::{EventHandler, TCPListener}; diff --git a/server/src/uniqx.rs b/server/src/uniqx.rs index 51b08fa..27b1744 100644 --- a/server/src/uniqx.rs +++ b/server/src/uniqx.rs @@ -1,6 +1,10 @@ use anyhow::{Ok, Result}; use dashmap::DashMap; -use std::sync::{mpsc::channel, Arc}; +use std::{ + sync::{mpsc::channel, Arc}, + time::Duration, +}; +use tokio::time; use tracing::info; use crate::{ @@ -36,6 +40,7 @@ impl UniqxServer { } pub async fn start(self) -> Result<()> { + time::sleep(Duration::from_millis(100000)).await; self.listen(ControlServer::new(self.domain.clone()).await?); self.listen(HttpServer::new(self.http_port).await?); self.listen(EventServer::new().await?); diff --git a/server/src/util.rs b/server/src/util.rs index 2b9a375..4fef5b8 100644 --- a/server/src/util.rs +++ b/server/src/util.rs @@ -1,6 +1,5 @@ use anyhow::Result; -use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt}; -use tracing::info; +use tokio::io::AsyncWriteExt; pub async fn write_response( mut conn: impl AsyncWriteExt + Unpin, @@ -19,18 +18,3 @@ pub async fn write_response( conn.flush().await?; Ok(()) } - -/// Copy data mutually between two read/write streams. -pub async fn proxy(stream1: S1, stream2: S2) -> io::Result<()> -where - S1: AsyncRead + AsyncWrite + Unpin, - S2: AsyncRead + AsyncWrite + Unpin, -{ - let (mut s1_read, mut s1_write) = io::split(stream1); - let (mut s2_read, mut s2_write) = io::split(stream2); - tokio::select! { - res = io::copy(&mut s1_read, &mut s2_write) => { info!("local connection discounted"); res }, - res = io::copy(&mut s2_read, &mut s1_write) => {info!("event connection discounted"); res } - }?; - Ok(()) -} diff --git a/shared/src/utils.rs b/shared/src/utils.rs index 9dfa93e..169454b 100644 --- a/shared/src/utils.rs +++ b/shared/src/utils.rs @@ -4,6 +4,8 @@ use crate::{TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL}; use anyhow::Result; use regex::Regex; use socket2::TcpKeepalive; +use tokio::io::{self, AsyncRead, AsyncWrite}; +use tracing::info; // use url::Url; static BLOCK_LIST: &[&str] = &["www", "uniqx"]; @@ -56,3 +58,17 @@ pub fn set_tcp_keepalive() -> TcpKeepalive { .with_interval(Duration::from_secs(TCP_KEEPINTVL)) .with_retries(TCP_KEEPCNT) } +/// Copy data mutually between two read/write streams. +pub async fn proxy(stream1: S1, stream2: S2) -> io::Result<()> +where + S1: AsyncRead + AsyncWrite + Unpin, + S2: AsyncRead + AsyncWrite + Unpin, +{ + let (mut s1_read, mut s1_write) = io::split(stream1); + let (mut s2_read, mut s2_write) = io::split(stream2); + tokio::select! { + res = io::copy(&mut s1_read, &mut s2_write) => { info!("local connection discounted"); res }, + res = io::copy(&mut s2_read, &mut s1_write) => {info!("event connection discounted"); res } + }?; + Ok(()) +}