Skip to content

Commit

Permalink
feat: implement bootstrap from upstream configs
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Jul 29, 2024
1 parent e1f0d43 commit 78c2a14
Show file tree
Hide file tree
Showing 24 changed files with 290 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ resolver = "2"
members = [
"capybara",
"capybara-core",
"capybara-etc",
"capybara-util",
]
1 change: 1 addition & 0 deletions capybara-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ hex = "0.4.3"

[dependencies]
capybara-util = { path = "../capybara-util" }
capybara-etc = { path = "../capybara-etc" }
log = "0.4"
slog = "2.7.0"
slog-async = "2.7.0"
Expand Down
3 changes: 2 additions & 1 deletion capybara-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(dead_code)]
// #![allow(unused_imports)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_assignments)]
#![allow(clippy::type_complexity)]
Expand All @@ -21,6 +21,7 @@ extern crate string_cache;

pub use builtin::setup;
pub use error::CapybaraError;
pub use upstream::{Pool, Pools, RoundRobinPools, WeightedPools};

pub type Result<T> = std::result::Result<T, CapybaraError>;

Expand Down
11 changes: 11 additions & 0 deletions capybara-core/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum UpstreamKey {
Tls(SocketAddr, ServerName),
TcpHP(Cachestr, u16),
TlsHP(Cachestr, u16, ServerName),
Tag(Cachestr),
}

impl FromStr for UpstreamKey {
Expand Down Expand Up @@ -51,6 +52,14 @@ impl FromStr for UpstreamKey {

// FIXME: too many duplicated codes

if let Some(suffix) = s.strip_prefix("upstream://") {
return if suffix.is_empty() {
Err(CapybaraError::InvalidUpstream(s.to_string().into()))
} else {
Ok(UpstreamKey::Tag(Cachestr::from(suffix)))
};
}

if let Some(suffix) = s.strip_prefix("tcp://") {
let (host, port) = host_and_port(suffix)?;
let port = port.ok_or_else(|| CapybaraError::InvalidUpstream(s.to_string().into()))?;
Expand Down Expand Up @@ -113,6 +122,7 @@ impl Display for UpstreamKey {
}
write!(f, "tls://{}:{}", addr, port)
}
UpstreamKey::Tag(tag) => write!(f, "upstream://{}", tag.as_ref()),
}
}
}
Expand Down Expand Up @@ -149,6 +159,7 @@ mod tests {
init();

for (s, expect) in [
("upstream://some-upstream", "upstream://some-upstream"),
// ip+port
("127.0.0.1:8080", "tcp://127.0.0.1:8080"),
// host+port
Expand Down
22 changes: 18 additions & 4 deletions capybara-core/src/protocol/http/listener/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ use crate::protocol::http::{
StatusLine,
};
use crate::transport::tcp;
use crate::upstream::{Pool, Upstreams};
use crate::upstream::{Pool, Pools, Upstreams};
use crate::Result;

pub struct HttpListenerBuilder {
addr: SocketAddr,
tls: Option<TlsAcceptor>,
id: Option<Cachestr>,
pipelines: Vec<(Cachestr, PipelineConf)>,
upstreams: Vec<(Cachestr, Arc<dyn Pools>)>,
}

impl HttpListenerBuilder {
Expand All @@ -58,24 +59,36 @@ impl HttpListenerBuilder {
self
}

pub fn upstream<K>(mut self, key: K, pools: Arc<dyn Pools>) -> Self
where
K: AsRef<str>,
{
self.upstreams.push((Cachestr::from(key.as_ref()), pools));
self
}

pub fn build(self) -> Result<HttpListener> {
let Self {
addr,
id,
pipelines,
tls,
upstreams,
} = self;

let closer = Arc::new(Notify::new());

let upstreams = Upstreams::builder(Clone::clone(&closer)).build();
let mut ub = Upstreams::builder(Clone::clone(&closer));
for (k, v) in upstreams {
ub = ub.keyed(k, v);
}

Ok(HttpListener {
id: id.unwrap_or_else(|| Cachestr::from(uuid::Uuid::new_v4().to_string())),
tls,
addr,
pipelines: ArcSwap::from_pointee(pipelines),
upstreams,
upstreams: ub.build(),
closer,
})
}
Expand All @@ -97,6 +110,7 @@ impl HttpListener {
id: None,
tls: None,
pipelines: Default::default(),
upstreams: Default::default(),
}
}

Expand Down Expand Up @@ -602,7 +616,7 @@ where
}
None => match self.ctx.upstream() {
Some(uk) => {
let pool = self.upstreams.get(uk).await?;
let pool = self.upstreams.get(uk, 0).await?;
match &*pool {
Pool::Tcp(pool) => {
let mut upstream = pool.get().await?;
Expand Down
2 changes: 1 addition & 1 deletion capybara-core/src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use misc::*;
pub(crate) use pool::{Pool, TcpStreamPoolBuilder};
pub use pool::{Pool, TcpStreamPoolBuilder};

mod misc;
mod pool;
22 changes: 11 additions & 11 deletions capybara-core/src/transport/tcp/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use crate::transport::Address;

use super::{misc, TcpStreamBuilder};

pub(crate) type Pool = managed::Pool<Manager>;
pub type Pool = managed::Pool<Manager>;

pub(crate) struct TcpStreamPoolBuilder {
pub struct TcpStreamPoolBuilder {
addr: Address,
max_size: usize,
timeout: Option<Duration>,
Expand All @@ -30,12 +30,12 @@ impl TcpStreamPoolBuilder {
pub(crate) const BUFF_SIZE: usize = 8192;
pub(crate) const MAX_SIZE: usize = 128;

pub(crate) fn with_addr(addr: SocketAddr) -> Self {
pub fn with_addr(addr: SocketAddr) -> Self {
let addr = Address::Direct(addr);
Self::new(addr)
}

pub(crate) fn with_domain<D>(domain: D, port: u16) -> Self
pub fn with_domain<D>(domain: D, port: u16) -> Self
where
D: AsRef<str>,
{
Expand All @@ -56,32 +56,32 @@ impl TcpStreamPoolBuilder {
}
}

pub(crate) fn max_size(mut self, size: usize) -> Self {
pub fn max_size(mut self, size: usize) -> Self {
self.max_size = size;
self
}

pub(crate) fn timeout(mut self, timeout: Duration) -> Self {
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout.replace(timeout);
self
}

pub(crate) fn buff_size(mut self, buff_size: usize) -> Self {
pub fn buff_size(mut self, buff_size: usize) -> Self {
self.buff_size = buff_size;
self
}

pub(crate) fn idle_time(mut self, lifetime: Duration) -> Self {
pub fn idle_time(mut self, lifetime: Duration) -> Self {
self.idle_time.replace(lifetime);
self
}

pub(crate) fn resolver(mut self, resolver: Arc<dyn Resolver>) -> Self {
pub fn resolver(mut self, resolver: Arc<dyn Resolver>) -> Self {
self.resolver.replace(resolver);
self
}

pub(crate) async fn build(self, closer: Arc<Notify>) -> Result<Pool> {
pub async fn build(self, closer: Arc<Notify>) -> Result<Pool> {
let Self {
addr,
max_size,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl TcpStreamPoolBuilder {
}
}

pub(crate) struct Manager {
pub struct Manager {
addr: Address,
resolver: Arc<dyn Resolver>,
buff_size: usize,
Expand Down
4 changes: 2 additions & 2 deletions capybara-core/src/transport/tls/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{resolver, CapybaraError};

pub type TlsStream<T> = tokio_rustls::client::TlsStream<T>;

pub(crate) type Pool = managed::Pool<Manager>;
pub type Pool = managed::Pool<Manager>;

pub(crate) struct TlsStreamPoolBuilder {
addr: Address,
Expand Down Expand Up @@ -203,7 +203,7 @@ fn is_health(stream: &TlsStream<TcpStream>) -> crate::Result<()> {
tcp::is_health(c)
}

pub(crate) struct Manager {
pub struct Manager {
addr: Address,
resolver: Arc<dyn Resolver>,
buff_size: usize,
Expand Down
3 changes: 3 additions & 0 deletions capybara-core/src/upstream/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ pub(crate) async fn establish(upstream: &UpstreamKey, buff_size: usize) -> Resul
let stream = c.connect(Clone::clone(sni), stream).await?;
ClientStream::Tls(stream)
}
UpstreamKey::Tag(tag) => {
todo!("establish with tag is not supported yet")
}
};

debug!("establish {} ok: {}", upstream, &stream);
Expand Down
5 changes: 4 additions & 1 deletion capybara-core/src/upstream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub(crate) use misc::{establish, ClientStream};
pub(crate) use upstreams::{Pool, Upstreams};
pub use pools::{Pool, Pools};
pub use round_robin::RoundRobinPools;
pub(crate) use upstreams::Upstreams;
pub use weighted::WeightedPools;

mod misc;
mod pools;
Expand Down
8 changes: 6 additions & 2 deletions capybara-core/src/upstream/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ use std::sync::Arc;

use async_trait::async_trait;

use crate::transport::{tcp, tls};
use crate::Result;

use super::upstreams::Pool;
pub enum Pool {
Tcp(tcp::Pool),
Tls(tls::Pool),
}

#[async_trait]
pub(crate) trait Pools: Send + Sync + 'static {
pub trait Pools: Send + Sync + 'static {
async fn next(&self, seed: u64) -> Result<Arc<Pool>>;
}
2 changes: 1 addition & 1 deletion capybara-core/src/upstream/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{CapybaraError, Result};

use super::pools::Pools;

struct RoundRobinPools {
pub struct RoundRobinPools {
pools: Vec<Arc<Pool>>,
seq: AtomicU32,
}
Expand Down
Loading

0 comments on commit 78c2a14

Please sign in to comment.