Skip to content

Commit

Permalink
feat: implement more upstreams module
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Aug 3, 2024
1 parent 16adb92 commit 92958c6
Show file tree
Hide file tree
Showing 20 changed files with 172 additions and 86 deletions.
4 changes: 0 additions & 4 deletions capybara-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
name = "capybara-core"
version = "0.0.0"
edition = "2021"
build = "build.rs"

[build-dependencies]
string_cache_codegen = "0.5"

[dev-dependencies]
pretty_env_logger = "0.5.0"
Expand Down Expand Up @@ -44,7 +41,6 @@ urlencoding = "2.1"
md5 = "0.7"
ahash = "0.8"
parking_lot = "0.12"
string_cache = "0.8"
strum = { version = "0.26", default-features = false, features = ["strum_macros", "derive"] }
strum_macros = "0.26"
tokio-rustls = "0.24"
Expand Down
6 changes: 0 additions & 6 deletions capybara-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@ extern crate anyhow;
extern crate cfg_if;
#[macro_use]
extern crate log;
extern crate string_cache;

pub use builtin::setup;
pub use error::CapybaraError;
pub use upstream::{Pool, Pools, RoundRobinPools, WeightedPools};

pub type Result<T> = std::result::Result<T, CapybaraError>;

/// cached string
pub mod cachestr {
include!(concat!(env!("OUT_DIR"), "/cachestr.rs"));
}

mod builtin;
mod error;
mod logger;
Expand Down
12 changes: 11 additions & 1 deletion capybara-core/src/pipeline/http/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use hashbrown::hash_map::Entry;
use hashbrown::HashMap;
use smallvec::{smallvec, SmallVec};

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::pipeline::misc;
use crate::proto::UpstreamKey;
use crate::protocol::http::{Headers, Method, RequestLine, Response, StatusLine};
Expand Down Expand Up @@ -122,6 +123,15 @@ impl HeadersContext {
self.inner.is_empty()
}

pub fn exist(&self, key: &str) -> bool {
let key = Cachestr::from(key);
self.inner.contains_key(&key)
}

pub fn _exist(&self, key: Cachestr) -> bool {
self.inner.contains_key(&key)
}

#[inline]
pub(crate) fn _remove(&mut self, header: Cachestr) {
let v = smallvec![HeaderOperator::Drop];
Expand Down
2 changes: 1 addition & 1 deletion capybara-core/src/pipeline/http/pipeline_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;

use crate::cachestr::Cachestr;
use crate::error::CapybaraError;
use crate::pipeline::{HttpContext, HttpPipeline, HttpPipelineFactory, PipelineConf};
use crate::proto::UpstreamKey;
use crate::protocol::http::{Headers, HttpField, Queries, RequestLine};
use capybara_util::cachestr::Cachestr;

struct Route {
must: Vec<MatchRule>,
Expand Down
3 changes: 2 additions & 1 deletion capybara-core/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::str::FromStr;
use async_trait::async_trait;
use rustls::ServerName;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::{CapybaraError, Result};

#[derive(Clone, Hash, Eq, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion capybara-core/src/protocol/http/httpfield.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use once_cell::sync::Lazy;
use strum_macros::EnumIter;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use super::misc::hash16;

Expand Down
103 changes: 56 additions & 47 deletions capybara-core/src/protocol/http/listener/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytes::Bytes;
use deadpool::managed::Manager;
use futures::{Stream, StreamExt};
use rustls::ServerName;
use smallvec::SmallVec;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf};
use tokio::sync::Notify;
use tokio_rustls::TlsAcceptor;
use tokio_util::codec::FramedRead;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::pipeline::http::{
load, AnyString, HeaderOperator, HeadersContext, HttpContextFlags, HttpPipelineFactoryExt,
};
Expand All @@ -24,7 +27,7 @@ use crate::protocol::http::{
misc, Headers, HttpCodec, HttpField, HttpFrame, RequestLine, Response, ResponseFlags,
StatusLine,
};
use crate::transport::tcp;
use crate::transport::{tcp, Address, Addressable};
use crate::upstream::{Pool, Pools, Upstreams};
use crate::Result;

Expand Down Expand Up @@ -418,29 +421,6 @@ where
.await?;
}

let mut has_upstream = false;

if let Some(uk) = self.ctx.upstream() {
has_upstream = true;
match &*uk {
UpstreamKey::Tls(_, sni) => self.set_request_sni(sni),
UpstreamKey::TlsHP(_, _, sni) => self.set_request_sni(sni),
UpstreamKey::TcpHP(host, port) => {
// check if http port
let host = if *port == 80 {
AnyString::Cache(Clone::clone(host))
} else {
AnyString::String(format!("{}:{}", host, port))
};
self.ctx
.request()
.headers()
._replace(HttpField::Host.into(), host);
}
_ => (),
}
}

match self.downstream.0.next().await {
Some(second) => {
let HttpFrame::Headers(mut headers) = second? else {
Expand Down Expand Up @@ -469,28 +449,6 @@ where
}
};

if !has_upstream {
if let Some(kind) = self.ctx.upstream() {
match &*kind {
UpstreamKey::Tls(_, sni) => self.set_request_sni(sni),
UpstreamKey::TlsHP(_, _, sni) => self.set_request_sni(sni),
UpstreamKey::TcpHP(host, port) => {
// check if http port
let host = if *port == 80 {
AnyString::Cache(Clone::clone(host))
} else {
AnyString::String(format!("{}:{}", host, port))
};
self.ctx
.request()
.headers()
._replace(HttpField::Host.into(), host);
}
_ => (),
}
}
}

Ok(Some(Handshake {
request_line,
request_headers: headers,
Expand Down Expand Up @@ -619,11 +577,62 @@ where
let pool = self.upstreams.get(uk, 0).await?;
match &*pool {
Pool::Tcp(pool) => {
if !self.ctx.request().headers()._exist(HttpField::Host.into())
{
if let Address::Domain(dom, port) = pool.manager().address()
{
let host = if *port == 80 {
AnyString::Cache(Clone::clone(dom))
} else {
let host = {
use std::io::Write;
let mut b = SmallVec::<[u8; 128]>::new();
write!(&mut b[..], "{}:{}", dom.as_ref(), port)
.ok();
Cachestr::from(unsafe {
std::str::from_utf8_unchecked(&b[..])
})
};
AnyString::Cache(host)
};

self.ctx
.request()
.headers()
._replace(HttpField::Host.into(), host);
}
}

let mut upstream = pool.get().await?;
self.transfer(upstream.as_mut(), request_line, request_headers)
.await?
}
Pool::Tls(pool) => {
if !self.ctx.request().headers()._exist(HttpField::Host.into())
{
if let Address::Domain(dom, port) = pool.manager().address()
{
let host = if *port == 443 {
AnyString::Cache(Clone::clone(dom))
} else {
let host = {
let mut b = SmallVec::<[u8; 128]>::new();
use std::io::Write;
write!(&mut b[..], "{}:{}", dom.as_ref(), port)
.ok();
Cachestr::from(unsafe {
std::str::from_utf8_unchecked(&b[..])
})
};
AnyString::Cache(host)
};
self.ctx
.request()
.headers()
._replace(HttpField::Host.into(), host);
}
}

let mut upstream = pool.get().await?;
self.transfer(upstream.as_mut(), request_line, request_headers)
.await?
Expand Down
3 changes: 2 additions & 1 deletion capybara-core/src/protocol/http2/hpack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use once_cell::sync::Lazy;
use strum::FromRepr;
use strum_macros::EnumIter;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::CapybaraError;

static STATIC_TABLE_ENTRIES: Lazy<Vec<Arc<HeaderField>>> = Lazy::new(|| {
Expand Down
3 changes: 2 additions & 1 deletion capybara-core/src/protocol/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::sync::Notify;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::error::CapybaraError;
use crate::pipeline::stream::load;
use crate::pipeline::stream::StreamPipelineFactoryExt;
Expand Down
9 changes: 6 additions & 3 deletions capybara-core/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use std::fmt::{Display, Formatter};
use std::net::SocketAddr;

use capybara_util::cachestr::Cachestr;
pub use tcp::TcpListenerBuilder;
pub use tls::{TlsAcceptorBuilder, TlsConnectorBuilder};

use crate::cachestr::Cachestr;

pub mod tcp;
pub mod tls;

pub trait Addressable {
fn address(&self) -> &Address;
}

#[derive(Clone)]
pub(super) enum Address {
pub enum Address {
Direct(SocketAddr),
Domain(Cachestr, u16),
}
Expand Down
11 changes: 9 additions & 2 deletions capybara-core/src/transport/tcp/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use deadpool::{managed, Runtime};
use tokio::net::TcpStream;
use tokio::sync::Notify;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::resolver::{self, Resolver};
use crate::transport::Address;
use crate::transport::{Address, Addressable};

use super::{misc, TcpStreamBuilder};

Expand Down Expand Up @@ -203,6 +204,12 @@ impl Manager {
}
}

impl Addressable for Manager {
fn address(&self) -> &Address {
&self.addr
}
}

impl managed::Manager for Manager {
type Type = TcpStream;
type Error = crate::CapybaraError;
Expand Down
2 changes: 1 addition & 1 deletion capybara-core/src/transport/tls/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub(crate) use pool::{Pool, TlsStream, TlsStreamPoolBuilder};
pub use pool::{Pool, TlsStream, TlsStreamPoolBuilder};
pub use tls::{TlsAcceptorBuilder, TlsConnectorBuilder};

mod pool;
Expand Down
14 changes: 10 additions & 4 deletions capybara-core/src/transport/tls/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ use rustls::ServerName;
use tokio::net::TcpStream;
use tokio::sync::Notify;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::resolver::Resolver;
use crate::transport::Address;
use crate::transport::{tcp, TlsConnectorBuilder};
use crate::transport::{tcp, Address, Addressable, TlsConnectorBuilder};
use crate::{resolver, CapybaraError};

pub type TlsStream<T> = tokio_rustls::client::TlsStream<T>;

pub type Pool = managed::Pool<Manager>;

pub(crate) struct TlsStreamPoolBuilder {
pub struct TlsStreamPoolBuilder {
addr: Address,
max_size: usize,
timeout: Option<Duration>,
Expand Down Expand Up @@ -211,6 +211,12 @@ pub struct Manager {
sni: ServerName,
}

impl Addressable for Manager {
fn address(&self) -> &Address {
&self.addr
}
}

impl managed::Manager for Manager {
type Type = TlsStream<TcpStream>;
type Error = CapybaraError;
Expand Down
3 changes: 2 additions & 1 deletion capybara-core/src/upstream/upstreams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use hashbrown::HashMap;
use tokio::sync::Notify;
use tokio::sync::RwLock;

use crate::cachestr::Cachestr;
use capybara_util::cachestr::Cachestr;

use crate::proto::UpstreamKey;
use crate::resolver::{Resolver, DEFAULT_RESOLVER};
use crate::transport::{tcp, tls};
Expand Down
2 changes: 2 additions & 0 deletions capybara-etc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub struct UpstreamConfig {

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct EndpointConfig {
pub transport: Option<TransportKind>,
pub tls: Option<bool>,
pub resolver: Option<String>,
pub addr: String,
pub weight: Option<u32>,
Expand Down
5 changes: 5 additions & 0 deletions capybara-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
name = "capybara-util"
version = "0.0.0"
edition = "2021"
build = "build.rs"

[build-dependencies]
string_cache_codegen = "0.5"

[dev-dependencies]
log = "0.4"
Expand All @@ -13,3 +17,4 @@ foreign-types = "0.5"
libc = "0.2"
anyhow = "1"
rand = "0.8"
string_cache = "0.8"
File renamed without changes.
Loading

0 comments on commit 92958c6

Please sign in to comment.