From 8465291330e9d2401e8d1c8c794ef49101033d7b Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Mon, 22 Apr 2024 19:42:46 +0200 Subject: [PATCH 1/2] Allow API listener to bind to multiple addresses --- crates/corro-agent/src/agent/run_root.rs | 3 +- crates/corro-agent/src/agent/setup.rs | 7 ++++ crates/corro-agent/src/agent/util.rs | 44 ++++++++++++++++-------- crates/corro-types/src/config.rs | 9 +++++ 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index 1f73bf66..8dc12e45 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -39,6 +39,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul gossip_server_endpoint, transport, api_listener, + extra_api_listeners, tripwire, lock_registry, rx_bcast, @@ -96,7 +97,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul &tripwire, subs_bcast_cache, &subs_manager, - api_listener, + (api_listener, extra_api_listeners), ) .await?; diff --git a/crates/corro-agent/src/agent/setup.rs b/crates/corro-agent/src/agent/setup.rs index a41adbff..b0ff5a43 100644 --- a/crates/corro-agent/src/agent/setup.rs +++ b/crates/corro-agent/src/agent/setup.rs @@ -48,6 +48,7 @@ pub struct AgentOptions { pub gossip_server_endpoint: quinn::Endpoint, pub transport: Transport, pub api_listener: TcpListener, + pub extra_api_listeners: Vec, pub rx_bcast: CorroReceiver, pub rx_apply: CorroReceiver<(ActorId, Version)>, pub rx_clear_buf: CorroReceiver<(ActorId, RangeInclusive)>, @@ -135,6 +136,11 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let api_listener = TcpListener::bind(conf.api.bind_addr).await?; let api_addr = api_listener.local_addr()?; + let mut extra_api_listeners = vec![]; + for addr in &conf.api.extra_bind_addrs { + extra_api_listeners.push(TcpListener::bind(addr).await?); + } + let clock = Arc::new( uhlc::HLCBuilder::default() .with_id(actor_id.try_into().unwrap()) @@ -167,6 +173,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age gossip_server_endpoint, transport, api_listener, + extra_api_listeners, lock_registry, rx_bcast, rx_apply, diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index 66f3c482..d8329478 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -5,16 +5,6 @@ //! a similar API facade, handle the same kinds of data, etc) should //! be pulled out of this file in future. -use std::{ - cmp, - collections::{BTreeMap, BTreeSet, HashSet}, - convert::Infallible, - net::SocketAddr, - ops::RangeInclusive, - sync::{atomic::AtomicI64, Arc}, - time::{Duration, Instant}, -}; - use crate::{ agent::{handlers, CountedExecutor, MAX_SYNC_BACKOFF, TO_CLEAR_COUNT}, api::public::{ @@ -34,6 +24,15 @@ use corro_types::{ config::AuthzConfig, pubsub::SubsManager, }; +use std::{ + cmp, + collections::{BTreeMap, BTreeSet, HashSet}, + convert::Infallible, + net::SocketAddr, + ops::RangeInclusive, + sync::{atomic::AtomicI64, Arc}, + time::{Duration, Instant}, +}; use axum::{ error_handling::HandleErrorLayer, @@ -384,7 +383,7 @@ pub async fn setup_http_api_handler( tripwire: &Tripwire, subs_bcast_cache: BcastCache, subs_manager: &SubsManager, - api_listener: TcpListener, + (api_listener, extra_listeners): (TcpListener, Vec), ) -> eyre::Result<()> { let api = Router::new() // transactions @@ -485,9 +484,24 @@ pub async fn setup_http_api_handler( .layer(DefaultBodyLimit::disable()) .layer(TraceLayer::new_for_http()); + spawn_server_on_bind(api_listener, api.clone(), &tripwire)?; + + for extra_addr in extra_listeners { + spawn_server_on_bind(extra_addr, api.clone(), &tripwire)?; + } + + Ok(()) +} + +fn spawn_server_on_bind( + api_listener: TcpListener, + api: Router, + tripwire: &Tripwire, +) -> eyre::Result<()> { let api_addr = api_listener.local_addr()?; - info!("Starting public API server on tcp/{api_addr}"); + info!("Starting API listener on tcp/{api_addr}"); let mut incoming = AddrIncoming::from_listener(api_listener)?; + incoming.set_nodelay(true); spawn_counted( axum::Server::builder(incoming) @@ -784,7 +798,7 @@ pub fn store_empty_changeset( let deleted: Vec> = conn .prepare_cached( " - DELETE FROM __corro_bookkeeping + DELETE FROM __corro_bookkeeping WHERE actor_id = :actor_id AND start_version >= COALESCE(( @@ -800,7 +814,7 @@ pub fn store_empty_changeset( ( -- start_version is between start and end of range AND no end_version ( start_version BETWEEN :start AND :end AND end_version IS NULL ) OR - + -- start_version and end_version are within the range ( start_version >= :start AND end_version <= :end ) OR @@ -1389,7 +1403,7 @@ pub fn process_incomplete_version( " DELETE FROM __corro_seq_bookkeeping WHERE site_id = :actor_id AND version = :version AND - ( + ( -- start_seq and end_seq are within the range ( start_seq >= :start AND end_seq <= :end ) OR diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index 89158b3d..5fe78f54 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -116,6 +116,8 @@ impl DbConfig { pub struct ApiConfig { #[serde(alias = "addr")] pub bind_addr: SocketAddr, + #[serde(alias = "extra_addrs")] + pub extra_bind_addrs: Vec, #[serde(alias = "authz", default)] pub authorization: Option, #[serde(default)] @@ -281,6 +283,7 @@ pub struct ConfigBuilder { pub db_path: Option, gossip_addr: Option, api_addr: Option, + extra_api_addrs: Vec, external_addr: Option, admin_path: Option, prometheus_addr: Option, @@ -309,6 +312,11 @@ impl ConfigBuilder { self } + pub fn extra_api_addr(mut self, addr: SocketAddr) -> Self { + self.extra_api_addrs.push(addr); + self + } + pub fn external_addr(mut self, addr: SocketAddr) -> Self { self.external_addr = Some(addr); self @@ -373,6 +381,7 @@ impl ConfigBuilder { }, api: ApiConfig { bind_addr: self.api_addr.ok_or(ConfigBuilderError::ApiAddrRequired)?, + extra_bind_addrs: self.extra_api_addrs, authorization: None, pg: None, }, From d272a6e9e91358f3b8bc71be881672e2e304ef6b Mon Sep 17 00:00:00 2001 From: Katharina Fey Date: Tue, 23 Apr 2024 15:36:36 +0200 Subject: [PATCH 2/2] Combine API binds into a single configuration value --- crates/corro-agent/src/agent/run_root.rs | 3 +-- crates/corro-agent/src/agent/setup.rs | 9 +------- crates/corro-agent/src/agent/util.rs | 16 +------------ crates/corro-types/src/config.rs | 29 +++++++++--------------- crates/corrosion/src/command/agent.rs | 3 ++- crates/corrosion/src/main.rs | 2 +- 6 files changed, 17 insertions(+), 45 deletions(-) diff --git a/crates/corro-agent/src/agent/run_root.rs b/crates/corro-agent/src/agent/run_root.rs index 8dc12e45..1f73bf66 100644 --- a/crates/corro-agent/src/agent/run_root.rs +++ b/crates/corro-agent/src/agent/run_root.rs @@ -39,7 +39,6 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul gossip_server_endpoint, transport, api_listener, - extra_api_listeners, tripwire, lock_registry, rx_bcast, @@ -97,7 +96,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul &tripwire, subs_bcast_cache, &subs_manager, - (api_listener, extra_api_listeners), + api_listener, ) .await?; diff --git a/crates/corro-agent/src/agent/setup.rs b/crates/corro-agent/src/agent/setup.rs index b0ff5a43..331c081f 100644 --- a/crates/corro-agent/src/agent/setup.rs +++ b/crates/corro-agent/src/agent/setup.rs @@ -48,7 +48,6 @@ pub struct AgentOptions { pub gossip_server_endpoint: quinn::Endpoint, pub transport: Transport, pub api_listener: TcpListener, - pub extra_api_listeners: Vec, pub rx_bcast: CorroReceiver, pub rx_apply: CorroReceiver<(ActorId, Version)>, pub rx_clear_buf: CorroReceiver<(ActorId, RangeInclusive)>, @@ -133,14 +132,9 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age let transport = Transport::new(&conf.gossip, rtt_tx).await?; - let api_listener = TcpListener::bind(conf.api.bind_addr).await?; + let api_listener = TcpListener::bind(conf.api.bind_addr.as_slice()).await?; let api_addr = api_listener.local_addr()?; - let mut extra_api_listeners = vec![]; - for addr in &conf.api.extra_bind_addrs { - extra_api_listeners.push(TcpListener::bind(addr).await?); - } - let clock = Arc::new( uhlc::HLCBuilder::default() .with_id(actor_id.try_into().unwrap()) @@ -173,7 +167,6 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age gossip_server_endpoint, transport, api_listener, - extra_api_listeners, lock_registry, rx_bcast, rx_apply, diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index d8329478..cd0bd7dd 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -383,7 +383,7 @@ pub async fn setup_http_api_handler( tripwire: &Tripwire, subs_bcast_cache: BcastCache, subs_manager: &SubsManager, - (api_listener, extra_listeners): (TcpListener, Vec), + api_listener: TcpListener, ) -> eyre::Result<()> { let api = Router::new() // transactions @@ -484,20 +484,6 @@ pub async fn setup_http_api_handler( .layer(DefaultBodyLimit::disable()) .layer(TraceLayer::new_for_http()); - spawn_server_on_bind(api_listener, api.clone(), &tripwire)?; - - for extra_addr in extra_listeners { - spawn_server_on_bind(extra_addr, api.clone(), &tripwire)?; - } - - Ok(()) -} - -fn spawn_server_on_bind( - api_listener: TcpListener, - api: Router, - tripwire: &Tripwire, -) -> eyre::Result<()> { let api_addr = api_listener.local_addr()?; info!("Starting API listener on tcp/{api_addr}"); let mut incoming = AddrIncoming::from_listener(api_listener)?; diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index 5fe78f54..f88d6e87 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -2,6 +2,7 @@ use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use camino::Utf8PathBuf; use serde::{Deserialize, Serialize}; +use serde_with::{formats::PreferOne, serde_as, OneOrMany}; pub const DEFAULT_GOSSIP_PORT: u16 = 4001; const DEFAULT_GOSSIP_IDLE_TIMEOUT: u32 = 30; @@ -112,12 +113,12 @@ impl DbConfig { } } +#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ApiConfig { #[serde(alias = "addr")] - pub bind_addr: SocketAddr, - #[serde(alias = "extra_addrs")] - pub extra_bind_addrs: Vec, + #[serde_as(deserialize_as = "OneOrMany<_, PreferOne>")] + pub bind_addr: Vec, #[serde(alias = "authz", default)] pub authorization: Option, #[serde(default)] @@ -282,8 +283,7 @@ impl Config { pub struct ConfigBuilder { pub db_path: Option, gossip_addr: Option, - api_addr: Option, - extra_api_addrs: Vec, + api_addr: Vec, external_addr: Option, admin_path: Option, prometheus_addr: Option, @@ -308,17 +308,7 @@ impl ConfigBuilder { } pub fn api_addr(mut self, addr: SocketAddr) -> Self { - self.api_addr = Some(addr); - self - } - - pub fn extra_api_addr(mut self, addr: SocketAddr) -> Self { - self.extra_api_addrs.push(addr); - self - } - - pub fn external_addr(mut self, addr: SocketAddr) -> Self { - self.external_addr = Some(addr); + self.api_addr.push(addr); self } @@ -372,6 +362,10 @@ impl ConfigBuilder { open_telemetry: None, }; + if self.api_addr.is_empty() { + return Err(ConfigBuilderError::ApiAddrRequired); + } + Ok(Config { db: DbConfig { path: db_path, @@ -380,8 +374,7 @@ impl ConfigBuilder { clear_overwritten_secs: None, }, api: ApiConfig { - bind_addr: self.api_addr.ok_or(ConfigBuilderError::ApiAddrRequired)?, - extra_bind_addrs: self.extra_api_addrs, + bind_addr: self.api_addr, authorization: None, pg: None, }, diff --git a/crates/corrosion/src/command/agent.rs b/crates/corrosion/src/command/agent.rs index 436373f1..78878283 100644 --- a/crates/corrosion/src/command/agent.rs +++ b/crates/corrosion/src/command/agent.rs @@ -59,7 +59,8 @@ pub async fn run(config: Config, config_path: &Utf8PathBuf) -> eyre::Result<()> )?; if !config.db.schema_paths.is_empty() { - let client = corro_client::CorrosionApiClient::new(config.api.bind_addr); + let client = + corro_client::CorrosionApiClient::new(config.api.bind_addr.first().unwrap().clone()); match client .schema_from_paths(config.db.schema_paths.as_slice()) .await diff --git a/crates/corrosion/src/main.rs b/crates/corrosion/src/main.rs index b69b98e6..5d84e1b1 100644 --- a/crates/corrosion/src/main.rs +++ b/crates/corrosion/src/main.rs @@ -577,7 +577,7 @@ impl Cli { Ok(if let Some(api_addr) = self.api_addr { api_addr } else { - self.config()?.api.bind_addr + self.config()?.api.bind_addr.first().unwrap().clone() }) }