From 78c2a14a1f51763483b24734a28fa5381f1f1e35 Mon Sep 17 00:00:00 2001 From: Jeffsky Date: Mon, 29 Jul 2024 23:39:21 +0800 Subject: [PATCH] feat: implement bootstrap from upstream configs --- Cargo.toml | 1 + capybara-core/Cargo.toml | 1 + capybara-core/src/lib.rs | 3 +- capybara-core/src/proto.rs | 11 ++ .../src/protocol/http/listener/listener.rs | 22 +++- capybara-core/src/transport/tcp/mod.rs | 2 +- capybara-core/src/transport/tcp/pool.rs | 22 ++-- capybara-core/src/transport/tls/pool.rs | 4 +- capybara-core/src/upstream/misc.rs | 3 + capybara-core/src/upstream/mod.rs | 5 +- capybara-core/src/upstream/pools.rs | 8 +- capybara-core/src/upstream/round_robin.rs | 2 +- capybara-core/src/upstream/upstreams.rs | 67 +++++++--- capybara-core/src/upstream/weighted.rs | 5 +- capybara-etc/Cargo.toml | 12 ++ {capybara => capybara-etc}/src/config.rs | 8 +- capybara-etc/src/lib.rs | 12 ++ capybara-util/src/lib.rs | 9 ++ capybara/Cargo.toml | 6 +- capybara/src/bootstrap/runtime.rs | 123 +++++++++++++++++- capybara/src/main.rs | 7 +- capybara/src/provider/static_file.rs | 2 +- justfile | 5 + testdata/config.yaml | 14 +- 24 files changed, 290 insertions(+), 64 deletions(-) create mode 100644 capybara-etc/Cargo.toml rename {capybara => capybara-etc}/src/config.rs (94%) create mode 100644 capybara-etc/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 06eaa26..dbf6c19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,6 @@ resolver = "2" members = [ "capybara", "capybara-core", + "capybara-etc", "capybara-util", ] diff --git a/capybara-core/Cargo.toml b/capybara-core/Cargo.toml index 45b4d64..30e5df9 100644 --- a/capybara-core/Cargo.toml +++ b/capybara-core/Cargo.toml @@ -13,6 +13,7 @@ hex = "0.4.3" [dependencies] capybara-util = { path = "../capybara-util" } +capybara-etc = { path = "../capybara-etc" } log = "0.4" slog = "2.7.0" slog-async = "2.7.0" diff --git a/capybara-core/src/lib.rs b/capybara-core/src/lib.rs index 9140f83..108381b 100644 --- a/capybara-core/src/lib.rs +++ b/capybara-core/src/lib.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -// #![allow(unused_imports)] +#![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_assignments)] #![allow(clippy::type_complexity)] @@ -21,6 +21,7 @@ extern crate string_cache; pub use builtin::setup; pub use error::CapybaraError; +pub use upstream::{Pool, Pools, RoundRobinPools, WeightedPools}; pub type Result = std::result::Result; diff --git a/capybara-core/src/proto.rs b/capybara-core/src/proto.rs index 49e8622..bae5ed9 100644 --- a/capybara-core/src/proto.rs +++ b/capybara-core/src/proto.rs @@ -14,6 +14,7 @@ pub enum UpstreamKey { Tls(SocketAddr, ServerName), TcpHP(Cachestr, u16), TlsHP(Cachestr, u16, ServerName), + Tag(Cachestr), } impl FromStr for UpstreamKey { @@ -51,6 +52,14 @@ impl FromStr for UpstreamKey { // FIXME: too many duplicated codes + if let Some(suffix) = s.strip_prefix("upstream://") { + return if suffix.is_empty() { + Err(CapybaraError::InvalidUpstream(s.to_string().into())) + } else { + Ok(UpstreamKey::Tag(Cachestr::from(suffix))) + }; + } + if let Some(suffix) = s.strip_prefix("tcp://") { let (host, port) = host_and_port(suffix)?; let port = port.ok_or_else(|| CapybaraError::InvalidUpstream(s.to_string().into()))?; @@ -113,6 +122,7 @@ impl Display for UpstreamKey { } write!(f, "tls://{}:{}", addr, port) } + UpstreamKey::Tag(tag) => write!(f, "upstream://{}", tag.as_ref()), } } } @@ -149,6 +159,7 @@ mod tests { init(); for (s, expect) in [ + ("upstream://some-upstream", "upstream://some-upstream"), // ip+port ("127.0.0.1:8080", "tcp://127.0.0.1:8080"), // host+port diff --git a/capybara-core/src/protocol/http/listener/listener.rs b/capybara-core/src/protocol/http/listener/listener.rs index 7c7dd23..0e051de 100644 --- a/capybara-core/src/protocol/http/listener/listener.rs +++ b/capybara-core/src/protocol/http/listener/listener.rs @@ -25,7 +25,7 @@ use crate::protocol::http::{ StatusLine, }; use crate::transport::tcp; -use crate::upstream::{Pool, Upstreams}; +use crate::upstream::{Pool, Pools, Upstreams}; use crate::Result; pub struct HttpListenerBuilder { @@ -33,6 +33,7 @@ pub struct HttpListenerBuilder { tls: Option, id: Option, pipelines: Vec<(Cachestr, PipelineConf)>, + upstreams: Vec<(Cachestr, Arc)>, } impl HttpListenerBuilder { @@ -58,24 +59,36 @@ impl HttpListenerBuilder { self } + pub fn upstream(mut self, key: K, pools: Arc) -> Self + where + K: AsRef, + { + self.upstreams.push((Cachestr::from(key.as_ref()), pools)); + self + } + pub fn build(self) -> Result { let Self { addr, id, pipelines, tls, + upstreams, } = self; let closer = Arc::new(Notify::new()); - let upstreams = Upstreams::builder(Clone::clone(&closer)).build(); + let mut ub = Upstreams::builder(Clone::clone(&closer)); + for (k, v) in upstreams { + ub = ub.keyed(k, v); + } Ok(HttpListener { id: id.unwrap_or_else(|| Cachestr::from(uuid::Uuid::new_v4().to_string())), tls, addr, pipelines: ArcSwap::from_pointee(pipelines), - upstreams, + upstreams: ub.build(), closer, }) } @@ -97,6 +110,7 @@ impl HttpListener { id: None, tls: None, pipelines: Default::default(), + upstreams: Default::default(), } } @@ -602,7 +616,7 @@ where } None => match self.ctx.upstream() { Some(uk) => { - let pool = self.upstreams.get(uk).await?; + let pool = self.upstreams.get(uk, 0).await?; match &*pool { Pool::Tcp(pool) => { let mut upstream = pool.get().await?; diff --git a/capybara-core/src/transport/tcp/mod.rs b/capybara-core/src/transport/tcp/mod.rs index 2ab8d5f..5afbc06 100644 --- a/capybara-core/src/transport/tcp/mod.rs +++ b/capybara-core/src/transport/tcp/mod.rs @@ -1,5 +1,5 @@ pub use misc::*; -pub(crate) use pool::{Pool, TcpStreamPoolBuilder}; +pub use pool::{Pool, TcpStreamPoolBuilder}; mod misc; mod pool; diff --git a/capybara-core/src/transport/tcp/pool.rs b/capybara-core/src/transport/tcp/pool.rs index 7b45e0a..fe91cbb 100644 --- a/capybara-core/src/transport/tcp/pool.rs +++ b/capybara-core/src/transport/tcp/pool.rs @@ -15,9 +15,9 @@ use crate::transport::Address; use super::{misc, TcpStreamBuilder}; -pub(crate) type Pool = managed::Pool; +pub type Pool = managed::Pool; -pub(crate) struct TcpStreamPoolBuilder { +pub struct TcpStreamPoolBuilder { addr: Address, max_size: usize, timeout: Option, @@ -30,12 +30,12 @@ impl TcpStreamPoolBuilder { pub(crate) const BUFF_SIZE: usize = 8192; pub(crate) const MAX_SIZE: usize = 128; - pub(crate) fn with_addr(addr: SocketAddr) -> Self { + pub fn with_addr(addr: SocketAddr) -> Self { let addr = Address::Direct(addr); Self::new(addr) } - pub(crate) fn with_domain(domain: D, port: u16) -> Self + pub fn with_domain(domain: D, port: u16) -> Self where D: AsRef, { @@ -56,32 +56,32 @@ impl TcpStreamPoolBuilder { } } - pub(crate) fn max_size(mut self, size: usize) -> Self { + pub fn max_size(mut self, size: usize) -> Self { self.max_size = size; self } - pub(crate) fn timeout(mut self, timeout: Duration) -> Self { + pub fn timeout(mut self, timeout: Duration) -> Self { self.timeout.replace(timeout); self } - pub(crate) fn buff_size(mut self, buff_size: usize) -> Self { + pub fn buff_size(mut self, buff_size: usize) -> Self { self.buff_size = buff_size; self } - pub(crate) fn idle_time(mut self, lifetime: Duration) -> Self { + pub fn idle_time(mut self, lifetime: Duration) -> Self { self.idle_time.replace(lifetime); self } - pub(crate) fn resolver(mut self, resolver: Arc) -> Self { + pub fn resolver(mut self, resolver: Arc) -> Self { self.resolver.replace(resolver); self } - pub(crate) async fn build(self, closer: Arc) -> Result { + pub async fn build(self, closer: Arc) -> Result { let Self { addr, max_size, @@ -171,7 +171,7 @@ impl TcpStreamPoolBuilder { } } -pub(crate) struct Manager { +pub struct Manager { addr: Address, resolver: Arc, buff_size: usize, diff --git a/capybara-core/src/transport/tls/pool.rs b/capybara-core/src/transport/tls/pool.rs index 5452a90..8ebcdc1 100644 --- a/capybara-core/src/transport/tls/pool.rs +++ b/capybara-core/src/transport/tls/pool.rs @@ -19,7 +19,7 @@ use crate::{resolver, CapybaraError}; pub type TlsStream = tokio_rustls::client::TlsStream; -pub(crate) type Pool = managed::Pool; +pub type Pool = managed::Pool; pub(crate) struct TlsStreamPoolBuilder { addr: Address, @@ -203,7 +203,7 @@ fn is_health(stream: &TlsStream) -> crate::Result<()> { tcp::is_health(c) } -pub(crate) struct Manager { +pub struct Manager { addr: Address, resolver: Arc, buff_size: usize, diff --git a/capybara-core/src/upstream/misc.rs b/capybara-core/src/upstream/misc.rs index 5f9f775..a79a3aa 100644 --- a/capybara-core/src/upstream/misc.rs +++ b/capybara-core/src/upstream/misc.rs @@ -58,6 +58,9 @@ pub(crate) async fn establish(upstream: &UpstreamKey, buff_size: usize) -> Resul let stream = c.connect(Clone::clone(sni), stream).await?; ClientStream::Tls(stream) } + UpstreamKey::Tag(tag) => { + todo!("establish with tag is not supported yet") + } }; debug!("establish {} ok: {}", upstream, &stream); diff --git a/capybara-core/src/upstream/mod.rs b/capybara-core/src/upstream/mod.rs index e674fc5..e2e6edf 100644 --- a/capybara-core/src/upstream/mod.rs +++ b/capybara-core/src/upstream/mod.rs @@ -1,5 +1,8 @@ pub(crate) use misc::{establish, ClientStream}; -pub(crate) use upstreams::{Pool, Upstreams}; +pub use pools::{Pool, Pools}; +pub use round_robin::RoundRobinPools; +pub(crate) use upstreams::Upstreams; +pub use weighted::WeightedPools; mod misc; mod pools; diff --git a/capybara-core/src/upstream/pools.rs b/capybara-core/src/upstream/pools.rs index c97f7fd..69a6bc9 100644 --- a/capybara-core/src/upstream/pools.rs +++ b/capybara-core/src/upstream/pools.rs @@ -2,11 +2,15 @@ use std::sync::Arc; use async_trait::async_trait; +use crate::transport::{tcp, tls}; use crate::Result; -use super::upstreams::Pool; +pub enum Pool { + Tcp(tcp::Pool), + Tls(tls::Pool), +} #[async_trait] -pub(crate) trait Pools: Send + Sync + 'static { +pub trait Pools: Send + Sync + 'static { async fn next(&self, seed: u64) -> Result>; } diff --git a/capybara-core/src/upstream/round_robin.rs b/capybara-core/src/upstream/round_robin.rs index 846be2d..651057f 100644 --- a/capybara-core/src/upstream/round_robin.rs +++ b/capybara-core/src/upstream/round_robin.rs @@ -8,7 +8,7 @@ use crate::{CapybaraError, Result}; use super::pools::Pools; -struct RoundRobinPools { +pub struct RoundRobinPools { pools: Vec>, seq: AtomicU32, } diff --git a/capybara-core/src/upstream/upstreams.rs b/capybara-core/src/upstream/upstreams.rs index 29cbcad..e041b3e 100644 --- a/capybara-core/src/upstream/upstreams.rs +++ b/capybara-core/src/upstream/upstreams.rs @@ -6,63 +6,93 @@ use hashbrown::HashMap; use tokio::sync::Notify; use tokio::sync::RwLock; +use crate::cachestr::Cachestr; use crate::proto::UpstreamKey; use crate::resolver::{Resolver, DEFAULT_RESOLVER}; use crate::transport::{tcp, tls}; +use crate::upstream::Pools; -pub(crate) enum Pool { - Tcp(tcp::Pool), - Tls(tls::Pool), -} +use super::pools::Pool; pub(crate) struct UpstreamsBuilder { - inner: Upstreams, + closer: Arc, + resolver: Option>, + pools: HashMap>, } impl UpstreamsBuilder { pub(crate) fn resolver(mut self, resolver: Arc) -> Self { - self.inner.resolver.replace(resolver); + self.resolver.replace(resolver); + self + } + + pub(crate) fn keyed(mut self, key: Cachestr, value: Arc) -> Self { + self.pools.insert(key, value); self } pub(crate) fn build(self) -> Upstreams { - self.inner + let Self { + closer, + resolver, + pools, + } = self; + let mut kv: HashMap, UpstreamValue> = Default::default(); + + for (k, v) in pools { + kv.insert(UpstreamKey::Tag(k).into(), UpstreamValue::Pools(v)); + } + + Upstreams { + resolver, + closer, + inner: Arc::new(RwLock::new(kv)), + } } } +enum UpstreamValue { + Direct(Arc), + Pools(Arc), +} + #[derive(Clone)] pub(crate) struct Upstreams { closer: Arc, resolver: Option>, - inner: Arc, Arc>>>, + inner: Arc, UpstreamValue>>>, } impl Upstreams { pub(crate) fn builder(closer: Arc) -> UpstreamsBuilder { UpstreamsBuilder { - inner: Self { - inner: Default::default(), - resolver: None, - closer, - }, + pools: Default::default(), + closer, + resolver: None, } } - pub(crate) async fn get(&self, k: Arc) -> Result> { + pub(crate) async fn get(&self, k: Arc, seed: u64) -> Result> { { let r = self.inner.read().await; if let Some(exist) = r.get(&k) { - return Ok(Clone::clone(exist)); + return match exist { + UpstreamValue::Direct(it) => Ok(Clone::clone(it)), + UpstreamValue::Pools(pools) => pools.next(seed).await.map_err(|e| e.into()), + }; } } let mut w = self.inner.write().await; match w.entry(k) { - Entry::Occupied(ent) => Ok(Clone::clone(ent.get())), + Entry::Occupied(ent) => match ent.get() { + UpstreamValue::Direct(it) => Ok(Clone::clone(it)), + UpstreamValue::Pools(pools) => pools.next(seed).await.map_err(|e| e.into()), + }, Entry::Vacant(ent) => { let pool = self.build_pool(ent.key()).await?; - ent.insert(Clone::clone(&pool)); + ent.insert(UpstreamValue::Direct(Clone::clone(&pool))); Ok(pool) } } @@ -110,6 +140,9 @@ impl Upstreams { .await?; Pool::Tls(p) } + UpstreamKey::Tag(tag) => { + bail!("no upstream tag '{}' found", tag.as_ref()); + } }; Ok(Arc::new(pool)) diff --git a/capybara-core/src/upstream/weighted.rs b/capybara-core/src/upstream/weighted.rs index 552d3b3..a141552 100644 --- a/capybara-core/src/upstream/weighted.rs +++ b/capybara-core/src/upstream/weighted.rs @@ -6,10 +6,9 @@ use capybara_util::WeightedResource; use crate::{CapybaraError, Result}; -use super::pools::Pools; -use super::upstreams::Pool; +use super::pools::{Pool, Pools}; -struct WeightedPools(WeightedResource>); +pub struct WeightedPools(WeightedResource>); #[async_trait] impl Pools for WeightedPools { diff --git a/capybara-etc/Cargo.toml b/capybara-etc/Cargo.toml new file mode 100644 index 0000000..cd5d9f8 --- /dev/null +++ b/capybara-etc/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "capybara-etc" +version = "0.0.0" +edition = "2021" + +[dev-dependencies] +pretty_env_logger = "0.5.0" + +[dependencies] +log = "0.4" +serde = { version = "1", features = ["derive"] } +serde_yaml = "0.9" diff --git a/capybara/src/config.rs b/capybara-etc/src/config.rs similarity index 94% rename from capybara/src/config.rs rename to capybara-etc/src/config.rs index f1dab52..2b164cc 100644 --- a/capybara/src/config.rs +++ b/capybara-etc/src/config.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; +pub type Properties = HashMap; + #[derive(Default, Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Config { pub listeners: HashMap, @@ -20,14 +22,14 @@ pub struct ListenerConfig { pub struct ProtocolConfig { pub name: String, #[serde(default)] - pub props: capybara_core::pipeline::PipelineConf, + pub props: Properties, } #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct PipelineConfig { pub name: String, #[serde(default)] - pub props: capybara_core::pipeline::PipelineConf, + pub props: Properties, } #[derive(Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] @@ -86,6 +88,8 @@ pub struct PoolConfig { #[cfg(test)] mod tests { + use log::info; + use super::*; fn init() { diff --git a/capybara-etc/src/lib.rs b/capybara-etc/src/lib.rs new file mode 100644 index 0000000..f6c297a --- /dev/null +++ b/capybara-etc/src/lib.rs @@ -0,0 +1,12 @@ +#![allow(clippy::type_complexity)] +#![allow(clippy::from_over_into)] +#![allow(clippy::module_inception)] +#![allow(clippy::upper_case_acronyms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +pub use config::*; + +mod config; diff --git a/capybara-util/src/lib.rs b/capybara-util/src/lib.rs index 47128f6..ce90299 100644 --- a/capybara-util/src/lib.rs +++ b/capybara-util/src/lib.rs @@ -1,3 +1,12 @@ +#![allow(clippy::type_complexity)] +#![allow(clippy::from_over_into)] +#![allow(clippy::module_inception)] +#![allow(clippy::upper_case_acronyms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + use std::net::IpAddr; use once_cell::sync::Lazy; diff --git a/capybara/Cargo.toml b/capybara/Cargo.toml index 7885ef8..0251c09 100644 --- a/capybara/Cargo.toml +++ b/capybara/Cargo.toml @@ -6,10 +6,10 @@ edition = "2021" [dev-dependencies] pretty_env_logger = "0.5.0" -[dependencies.capybara-core] -path = "../capybara-core" - [dependencies] +capybara-core = { path = "../capybara-core" } +capybara-etc = { path = "../capybara-etc" } +capybara-util = { path = "../capybara-util" } mimalloc = { version = "0.1.42", default-features = false } log = "0.4.21" anyhow = "1.0.86" diff --git a/capybara/src/bootstrap/runtime.rs b/capybara/src/bootstrap/runtime.rs index 5a4f242..618b468 100644 --- a/capybara/src/bootstrap/runtime.rs +++ b/capybara/src/bootstrap/runtime.rs @@ -1,14 +1,17 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use hashbrown::HashMap; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc, Notify, RwLock}; use capybara_core::proto::{Listener, Signal}; use capybara_core::protocol::http::HttpListener; +use capybara_core::transport::tcp::TcpStreamPoolBuilder; +use capybara_core::{CapybaraError, Pool, Pools, RoundRobinPools, WeightedPools}; +use capybara_etc::{BalanceStrategy, Config, ListenerConfig, UpstreamConfig}; +use capybara_util::WeightedResource; -use crate::config::{Config, ListenerConfig}; use crate::provider::{ConfigProvider, StaticFileWatcher}; use super::config::BootstrapConf; @@ -21,7 +24,9 @@ enum ConfigOperation { #[derive(Default)] struct Dispatcher { + closer: Arc, listeners: HashMap, Arc)>, + upstreams: HashMap>, } impl Dispatcher { @@ -31,6 +36,59 @@ impl Dispatcher { } } + async fn dispatch_upstream(&mut self, op: ConfigOperation) { + match op { + ConfigOperation::Remove(k) => { + self.remove_listener(&k).await.ok(); + } + ConfigOperation::Create(k, v) => { + self.create_upstream(k, v).await.ok(); + } + ConfigOperation::Update(_k, _v) => { + todo!("update listener") + } + } + } + + async fn create_upstream(&mut self, k: String, v: UpstreamConfig) -> anyhow::Result<()> { + let p: Arc = match v.balancer { + BalanceStrategy::Weighted => { + let mut b = WeightedResource::builder(); + + for endpoint in &v.endpoints { + let weight = endpoint.weight.unwrap_or(10); + let p = { + let b = to_tcp_stream_pool_builder(&endpoint.addr)?; + let p = b.build(Clone::clone(&self.closer)).await?; + Pool::Tcp(p) + }; + b = b.push(weight, p.into()); + } + Arc::new(WeightedPools::from(b.build())) + } + BalanceStrategy::IpHash => { + todo!() + } + BalanceStrategy::RoundRobin => { + let mut pools: Vec> = vec![]; + + for endpoint in &v.endpoints { + let next = { + let b = to_tcp_stream_pool_builder(&endpoint.addr)?; + let p = b.build(Clone::clone(&self.closer)).await?; + Pool::Tcp(p) + }; + pools.push(next.into()); + } + Arc::new(RoundRobinPools::from(pools)) + } + }; + + self.upstreams.insert(k, p); + + Ok(()) + } + async fn dispatch_listener(&mut self, op: ConfigOperation) { match op { ConfigOperation::Remove(k) => { @@ -62,6 +120,11 @@ impl Dispatcher { for p in &c.pipelines { b = b.pipeline(&p.name, &p.props); } + + for (k, v) in &self.upstreams { + b = b.upstream(k, Clone::clone(v)); + } + Arc::new(b.build()?) } other => { @@ -86,6 +149,25 @@ impl Dispatcher { } } +#[inline] +fn to_tcp_stream_pool_builder(addr: &str) -> anyhow::Result { + let mut sp = addr.split(':'); + if let Some(left) = sp.next() { + if let Some(right) = sp.next() { + if let Ok(port) = right.parse::() { + if sp.next().is_none() { + return Ok(match left.parse::() { + Ok(ip) => TcpStreamPoolBuilder::with_addr(SocketAddr::new(ip, port)), + Err(_) => TcpStreamPoolBuilder::with_domain(left, port), + }); + } + } + } + } + + bail!(CapybaraError::InvalidUpstream(addr.to_string().into())); +} + pub(crate) struct Bootstrap { bc: BootstrapConf, c: Arc>, @@ -108,6 +190,41 @@ impl Bootstrap { while let Some(next) = c_rx.recv().await { let mut prev = c.write().await; + for k in prev.upstreams.keys() { + if !next.upstreams.contains_key(k) { + let mut d = dispatcher.write().await; + d.dispatch_upstream(ConfigOperation::Remove(Clone::clone(k))) + .await; + } + } + + // CREATE: $not_exist(prev) && $exit(next) + // UPDATE: $exist(prev) && $exist(next) + for (k, v) in &next.upstreams { + match prev.upstreams.get(k) { + None => { + let op = ConfigOperation::Create(Clone::clone(k), Clone::clone(v)); + { + let mut d = dispatcher.write().await; + d.dispatch_upstream(op).await; + } + prev.upstreams.insert(Clone::clone(k), Clone::clone(v)); + } + Some(exist) if exist != v => { + { + let mut d = dispatcher.write().await; + d.dispatch_upstream(ConfigOperation::Update( + Clone::clone(k), + Clone::clone(v), + )) + .await; + } + prev.upstreams.insert(Clone::clone(k), Clone::clone(v)); + } + _ => (), + } + } + // REMOVE: $exist(prev) && $not_exist(next) for k in prev.listeners.keys() { if !next.listeners.contains_key(k) { diff --git a/capybara/src/main.rs b/capybara/src/main.rs index 037ab5c..39acebe 100644 --- a/capybara/src/main.rs +++ b/capybara/src/main.rs @@ -1,7 +1,7 @@ -#![allow(dead_code)] +// #![allow(dead_code)] // #![allow(unused_imports)] -#![allow(unused_variables)] -#![allow(unused_assignments)] +// #![allow(unused_variables)] +// #![allow(unused_assignments)] #![allow(clippy::type_complexity)] #![allow(clippy::from_over_into)] #![allow(clippy::module_inception)] @@ -29,7 +29,6 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; mod bootstrap; mod cmd; -mod config; mod provider; #[derive(Parser)] diff --git a/capybara/src/provider/static_file.rs b/capybara/src/provider/static_file.rs index c6c21a3..f1ac93c 100644 --- a/capybara/src/provider/static_file.rs +++ b/capybara/src/provider/static_file.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use async_trait::async_trait; use tokio::sync::mpsc::UnboundedSender; -use crate::config::Config; +use capybara_etc::Config; use super::ConfigProvider; diff --git a/justfile b/justfile index 8ae61fc..3696cd1 100644 --- a/justfile +++ b/justfile @@ -1,6 +1,11 @@ version := "0.0.1" revision := `git rev-parse --short HEAD` +alias r := run + +run: + cargo run --package capybara --bin capybara -- run -c testdata/capybara.yaml + run-example EXAMPLE: cargo run --example {{EXAMPLE}} diff --git a/testdata/config.yaml b/testdata/config.yaml index a64bffe..696e89b 100644 --- a/testdata/config.yaml +++ b/testdata/config.yaml @@ -8,23 +8,21 @@ listeners: - name: capybara.pipelines.http.lua props: content: | + cnts = 0 + function handle_request_headers(ctx,headers) local u = headers:get('x-upstream-key') if u ~= nil and u ~= '' then ctx:set_upstream(u) else - ctx:respond({ - status=502, - headers={ - ['Content-Type']='application/json; charset=UTF-8' - }, - body=json:encode({err='NO_ROUTE_FOUND',msg='Please set x-upstream-key!'}) - }) + ctx:set_upstream('upstream://httpbin') end end function handle_status_line(ctx,status_line) + cnts = cnts + 1 ctx:replace_header('X-Powered-By','capybara') + ctx:replace_header('X-Capybara-Requests', tostring(cnts)) end upstreams: @@ -35,5 +33,5 @@ upstreams: endpoints: - addr: httpbin.org:80 weight: 50 - - addr: httpbingo.org:80 + - addr: postman-echo.com:80 weight: 50