Skip to content

Commit

Permalink
BUGFIX: close both side of the connection (host connection, server ev…
Browse files Browse the repository at this point in the history
…ent connection) if one is closed
  • Loading branch information
unique1o1 committed Sep 12, 2023
1 parent 496349a commit 94f875e
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 30 deletions.
16 changes: 14 additions & 2 deletions client/src/console/parser.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::Serialize;
use std::collections::HashMap;
use tracing::info;

use anyhow::Result;

Expand Down Expand Up @@ -38,7 +39,12 @@ pub struct ConsoleResponse {
pub fn parse_http_resonse(id: String, data: Vec<u8>) -> Result<ConsoleResponse> {
let mut headers = [httparse::EMPTY_HEADER; 64];
let mut res = httparse::Response::new(&mut headers);
let offset = res.parse(&data)?.unwrap(); // assuming that the response is complete
let status = res.parse(&data)?; // assuming that the response is complete
// if status.is_partial() {
// info!("is partial: _> {:?}", str_from_u8_nul_utf8(&data));
// return Err(anyhow::anyhow!("is partial"));
// }
let offset = status.unwrap();
let headers: HashMap<String, Vec<String>> = res
.headers
.iter()
Expand All @@ -59,7 +65,13 @@ pub fn parse_http_resonse(id: String, data: Vec<u8>) -> Result<ConsoleResponse>
pub fn parse_http_request(id: String, data: Vec<u8>) -> Result<ConsoleRequest> {
let mut headers = [httparse::EMPTY_HEADER; 64];
let mut req = httparse::Request::new(&mut headers);
let offset = req.parse(&data)?.unwrap(); // assuming that the response is complete

let status = req.parse(&data)?; // assuming that the response is complete
if status.is_partial() {
info!("is partial: _> {:?}", str_from_u8_nul_utf8(&data));
return Err(anyhow::anyhow!("is partial"));
}
let offset = status.unwrap();
let headers: HashMap<String, Vec<String>> = req
.headers
.iter()
Expand Down
21 changes: 13 additions & 8 deletions client/src/uniqx.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Context, Result};
use anyhow::Result;
use shared::connect_with_timeout;
use shared::delimited::delimited_framed;
use shared::delimited::DelimitedReadExt;
Expand All @@ -7,6 +7,7 @@ use shared::delimited::DelimitedWriteExt;
use shared::structs::NewClient;
use shared::structs::TunnelOpen;
use shared::structs::TunnelRequest;
use shared::utils::proxy;
use shared::utils::set_tcp_keepalive;
use shared::Protocol;
use shared::EVENT_SERVER_PORT;
Expand All @@ -16,12 +17,12 @@ use std::process::exit;
use std::sync::Arc;
use tokio::io::{self};
use tracing::error;
use tracing::info;
use tracing::info_span;
use tracing::Instrument;

use crate::console;
use crate::console::handler::ConsoleHandler;
use crate::util::bind;
use crate::util::bind_with_console;

pub struct UniqxClient {
Expand Down Expand Up @@ -73,15 +74,19 @@ impl UniqxClient {
delimited_framed(&mut http_event_stream)
.send_delimited(data)
.await?;
let (s1_read, s1_write) = io::split(localhost_conn);
let (s2_read, s2_write) = io::split(http_event_stream);

if self.protocol == Protocol::HTTP && self.console {
let (s1_read, s1_write) = io::split(localhost_conn);
let (s2_read, s2_write) = io::split(http_event_stream);
let (req_tx, res_tx) = self.console_handler.clone().unwrap().init_transmitter();
tokio::spawn(async { bind_with_console(s1_read, s2_write, res_tx).await });
return bind_with_console(s2_read, s1_write, req_tx).await;
tokio::select! {
res= bind_with_console(s1_read, s2_write, res_tx).instrument(info_span!("Binder", "localhost reader")) => { info!("local connection discounted");res},
res= bind_with_console(s2_read, s1_write, req_tx).instrument(info_span!("Binder", "http event reader")) => {info!("event connection discounted"); res}
}?
} else {
proxy(localhost_conn, http_event_stream).await?;
}
tokio::spawn(async move { bind(s1_read, s2_write).await.context("cant read from s1") });
bind(s2_read, s1_write).await.context("cant read from s2")?;

Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub async fn bind_with_console<T: Transmitter + Debug>(

let n: usize = src.read(&mut buf).await?;
if n == 0 {
info!("connection disconnected");
return Ok(());
}
dst.write_all(&buf[..n]).await?;
Expand Down
3 changes: 2 additions & 1 deletion server/src/server/event_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ use anyhow::{Error, Result};
use shared::{
delimited::{delimited_framed, DelimitedReadExt},
structs::NewClient,
utils::proxy,
EVENT_SERVER_PORT,
};
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
};

use crate::{uniqx::ServerContext, util::proxy};
use crate::uniqx::ServerContext;

use super::tcp_listener::{EventHandler, TCPListener};

Expand Down
7 changes: 6 additions & 1 deletion server/src/uniqx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use anyhow::{Ok, Result};
use dashmap::DashMap;
use std::sync::{mpsc::channel, Arc};
use std::{
sync::{mpsc::channel, Arc},
time::Duration,
};
use tokio::time;
use tracing::info;

use crate::{
Expand Down Expand Up @@ -36,6 +40,7 @@ impl UniqxServer {
}

pub async fn start(self) -> Result<()> {
time::sleep(Duration::from_millis(100000)).await;
self.listen(ControlServer::new(self.domain.clone()).await?);
self.listen(HttpServer::new(self.http_port).await?);
self.listen(EventServer::new().await?);
Expand Down
18 changes: 1 addition & 17 deletions server/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::info;
use tokio::io::AsyncWriteExt;

pub async fn write_response(
mut conn: impl AsyncWriteExt + Unpin,
Expand All @@ -19,18 +18,3 @@ pub async fn write_response(
conn.flush().await?;
Ok(())
}

/// Copy data mutually between two read/write streams.
pub async fn proxy<S1, S2>(stream1: S1, stream2: S2) -> io::Result<()>
where
S1: AsyncRead + AsyncWrite + Unpin,
S2: AsyncRead + AsyncWrite + Unpin,
{
let (mut s1_read, mut s1_write) = io::split(stream1);
let (mut s2_read, mut s2_write) = io::split(stream2);
tokio::select! {
res = io::copy(&mut s1_read, &mut s2_write) => { info!("local connection discounted"); res },
res = io::copy(&mut s2_read, &mut s1_write) => {info!("event connection discounted"); res }
}?;
Ok(())
}
16 changes: 16 additions & 0 deletions shared/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::{TCP_KEEPCNT, TCP_KEEPIDLE, TCP_KEEPINTVL};
use anyhow::Result;
use regex::Regex;
use socket2::TcpKeepalive;
use tokio::io::{self, AsyncRead, AsyncWrite};
use tracing::info;
// use url::Url;
static BLOCK_LIST: &[&str] = &["www", "uniqx"];

Expand Down Expand Up @@ -56,3 +58,17 @@ pub fn set_tcp_keepalive() -> TcpKeepalive {
.with_interval(Duration::from_secs(TCP_KEEPINTVL))
.with_retries(TCP_KEEPCNT)
}
/// Copy data mutually between two read/write streams.
pub async fn proxy<S1, S2>(stream1: S1, stream2: S2) -> io::Result<()>
where
S1: AsyncRead + AsyncWrite + Unpin,
S2: AsyncRead + AsyncWrite + Unpin,
{
let (mut s1_read, mut s1_write) = io::split(stream1);
let (mut s2_read, mut s2_write) = io::split(stream2);
tokio::select! {
res = io::copy(&mut s1_read, &mut s2_write) => { info!("local connection discounted"); res },
res = io::copy(&mut s2_read, &mut s1_write) => {info!("event connection discounted"); res }
}?;
Ok(())
}

0 comments on commit 94f875e

Please sign in to comment.