From 8d2b69d55fa47a9bb94c275921d26bd59a350b05 Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Tue, 11 Jun 2024 14:24:25 +0200 Subject: [PATCH 1/3] corro-client: switch from system's to trust-dns resolver System's resolver returns at most 3 addresses, but we really need all of them to make sure all clients use the same Corrosion servers. --- Cargo.lock | 273 +++++++++++++++++++++++++++++++-- crates/corro-client/Cargo.toml | 1 + crates/corro-client/src/lib.rs | 105 ++++++++----- 3 files changed, 327 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f6efa4d..ad3df51f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,7 +161,7 @@ dependencies = [ "proc-macro2", "quote", "syn 1.0.109", - "synstructure", + "synstructure 0.12.6", ] [[package]] @@ -858,6 +858,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "trust-dns-resolver", "uuid", ] @@ -1553,9 +1554,9 @@ dependencies = [ [[package]] name = "form_urlencoded" -version = "1.1.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] @@ -1961,6 +1962,124 @@ dependencies = [ "cxx-build", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1980,12 +2099,14 @@ dependencies = [ [[package]] name = "idna" -version = "0.3.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "icu_normalizer", + "icu_properties", + "smallvec", + "utf8_iter", ] [[package]] @@ -2227,6 +2348,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d59d8c75012853d2e872fb56bc8a2e53718e2cafe1a4c823143141c6d90c322f" +[[package]] +name = "litemap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" + [[package]] name = "lock_api" version = "0.4.11" @@ -2764,9 +2891,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.2.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pgwire" @@ -3666,9 +3793,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" dependencies = [ "serde", ] @@ -3809,6 +3936,12 @@ dependencies = [ "log", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -3928,6 +4061,17 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "tempfile" version = "3.5.0" @@ -4063,6 +4207,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -4582,12 +4736,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "url" -version = "2.3.1" +version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56" dependencies = [ "form_urlencoded", - "idna 0.3.0", + "idna 1.0.0", "percent-encoding", ] @@ -4603,6 +4757,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4958,6 +5124,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "x509-certificate" version = "0.21.0" @@ -5009,6 +5187,30 @@ dependencies = [ "time", ] +[[package]] +name = "yoke" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", + "synstructure 0.13.1", +] + [[package]] name = "zerocopy" version = "0.7.26" @@ -5029,12 +5231,55 @@ dependencies = [ "syn 2.0.32", ] +[[package]] +name = "zerofrom" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", + "synstructure 0.13.1", +] + [[package]] name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +[[package]] +name = "zerovec" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.32", +] + [[package]] name = "zstd" version = "0.13.0" diff --git a/crates/corro-client/Cargo.toml b/crates/corro-client/Cargo.toml index 6731c024..9e8a328c 100644 --- a/crates/corro-client/Cargo.toml +++ b/crates/corro-client/Cargo.toml @@ -22,5 +22,6 @@ tracing = { workspace = true } uuid = { workspace = true } sqlite-pool = { path = "../sqlite-pool" } serde = { workspace = true } +trust-dns-resolver = { workspace = true } [features] diff --git a/crates/corro-client/src/lib.rs b/crates/corro-client/src/lib.rs index 6380ae2e..b3719b69 100644 --- a/crates/corro-client/src/lib.rs +++ b/crates/corro-client/src/lib.rs @@ -5,7 +5,6 @@ use http::uri::PathAndQuery; use hyper::{client::HttpConnector, http::HeaderName, Body, StatusCode}; use serde::de::DeserializeOwned; use std::{ - io, net::SocketAddr, ops::Deref, path::Path, @@ -13,11 +12,13 @@ use std::{ time::{self, Instant}, }; use sub::{QueryStream, SubscriptionStream}; -use tokio::{ - net::{lookup_host, ToSocketAddrs}, - sync::{RwLock, RwLockReadGuard}, -}; +use tokio::sync::{RwLock, RwLockReadGuard}; use tracing::{debug, info, warn}; +use trust_dns_resolver::{ + error::ResolveError, + name_server::{GenericConnection, GenericConnectionProvider, TokioRuntime}, + AsyncResolver, +}; use uuid::Uuid; #[derive(Clone)] @@ -347,12 +348,12 @@ impl Deref for CorrosionClient { } #[derive(Clone)] -pub struct CorrosionPooledClient { - inner: Arc>>, +pub struct CorrosionPooledClient { + inner: Arc>, } -struct PooledClientInner { - picker: AddrPicker, +struct PooledClientInner { + picker: AddrPicker, // For how long to stick with a chosen server stickiness_timeout: time::Duration, @@ -366,11 +367,15 @@ struct PooledClientInner { generation: u64, } -impl CorrosionPooledClient { - pub fn new(addrs: Vec, stickiness_timeout: time::Duration) -> Self { +impl CorrosionPooledClient { + pub fn new( + addrs: Vec, + stickiness_timeout: time::Duration, + resolver: AsyncResolver>, + ) -> Self { Self { inner: Arc::new(RwLock::new(PooledClientInner { - picker: AddrPicker::new(addrs), + picker: AddrPicker::new(addrs, resolver), stickiness_timeout, client: None, @@ -520,9 +525,11 @@ impl CorrosionPooledClient { } } -struct AddrPicker { +struct AddrPicker { + // Resolver used to resolve the addresses + resolver: AsyncResolver>, // List of addresses/hostname to try in order - addrs: Vec, + addrs: Vec, // Next address/hostname to try next_addr: usize, @@ -532,9 +539,13 @@ struct AddrPicker { next_resolved_addr: usize, } -impl AddrPicker { - fn new(addrs: Vec) -> AddrPicker { +impl AddrPicker { + fn new( + addrs: Vec, + resolver: AsyncResolver>, + ) -> AddrPicker { Self { + resolver, addrs, next_addr: 0, @@ -552,13 +563,31 @@ impl AddrPicker { .map(|v| v.len()) .unwrap_or_default() { - let addr_or_hostname = self.addrs.get(self.next_addr).ok_or(io::Error::new( - io::ErrorKind::Other, - "No addresses available", - ))?; + let host_port = self + .addrs + .get(self.next_addr) + .ok_or(ResolveError::from("No addresses available"))?; self.next_addr = (self.next_addr + 1) % self.addrs.len(); - let mut addrs = lookup_host(addr_or_hostname).await?.collect::>(); + // split host port + let mut host_port = host_port.splitn(2, ':'); + let host = host_port.next().ok_or(ResolveError::from( + "Invalid Corrosion server address (missing hostname)", + ))?; + let port = host_port + .next() + .and_then(|p| p.parse().ok()) + .ok_or(ResolveError::from( + "Invalid Corrosion server address (missing port)", + ))?; + + let mut addrs = self + .resolver + .lookup_ip(host) + .await? + .iter() + .map(|addr| (addr, port).into()) + .collect::>(); // Sort so all the nodes try the addresses in the same order addrs.sort(); @@ -577,7 +606,7 @@ impl AddrPicker { Ok(addr) } else { - Err(io::Error::new(io::ErrorKind::Other, "DNS didn't return any addresses").into()) + Err(ResolveError::from("DNS didn't return any addresses").into()) } } @@ -591,7 +620,7 @@ impl AddrPicker { #[derive(Debug, thiserror::Error)] pub enum Error { #[error(transparent)] - Dns(#[from] io::Error), + Dns(#[from] ResolveError), #[error(transparent)] Hyper(#[from] hyper::Error), #[error(transparent)] @@ -629,6 +658,7 @@ mod tests { time::Duration, }; use tokio::{net::TcpListener, pin, sync::broadcast}; + use trust_dns_resolver::AsyncResolver; use uuid::Uuid; struct Server { @@ -698,7 +728,7 @@ mod tests { } } - async fn gen_servers(num: usize) -> (Vec, Vec) { + async fn gen_servers(num: usize) -> (Vec, Vec) { let mut servers = Vec::new(); for _ in 0..num { @@ -707,7 +737,7 @@ mod tests { // sort the way the client is supposed to try them servers.sort_by(|a, b| a.addr.partial_cmp(&b.addr).unwrap()); - let addrs = servers.iter().map(|s| s.addr).collect(); + let addrs = servers.iter().map(|s| s.addr.to_string()).collect(); (servers, addrs) } @@ -717,8 +747,8 @@ mod tests { let statement = "".into(); let (servers, addresses) = gen_servers(1).await; - let client = - CorrosionPooledClient::new(vec![addresses.as_slice()], Duration::from_nanos(1)); + let resolver = AsyncResolver::tokio_from_system_conf().unwrap(); + let client = CorrosionPooledClient::new(addresses, Duration::from_nanos(1), resolver); let sub = client .subscribe_typed::(&statement, false, None) .await @@ -734,8 +764,6 @@ mod tests { assert!(matches!(res, Result::Err(Error::Hyper(_)))); // But the new one should succeed - let client = - CorrosionPooledClient::new(vec![addresses.as_slice()], Duration::from_nanos(1)); let sub = client .subscribe_typed::(&statement, false, None) .await @@ -748,8 +776,8 @@ mod tests { let statement = "".into(); let (servers, addresses) = gen_servers(3).await; - let client = - CorrosionPooledClient::new(vec![addresses.as_slice()], Duration::from_nanos(1)); + let resolver = AsyncResolver::tokio_from_system_conf().unwrap(); + let client = CorrosionPooledClient::new(addresses, Duration::from_nanos(1), resolver); // Refuse connections on the first server servers[0].refuse_new_conns(true); @@ -793,8 +821,8 @@ mod tests { let statement = "".into(); let (servers, addresses) = gen_servers(3).await; - let client = - CorrosionPooledClient::new(vec![addresses.as_slice()], Duration::from_millis(50)); + let resolver = AsyncResolver::tokio_from_system_conf().unwrap(); + let client = CorrosionPooledClient::new(addresses, Duration::from_millis(50), resolver); // Refuse connections on the first server servers[0].refuse_new_conns(true); @@ -835,15 +863,16 @@ mod tests { } #[tokio::test] - async fn test_fallback_pool() { + async fn test_more_servers() { let statement = "".into(); let (pool1_servers, pool1_addresses) = gen_servers(2).await; let (pool2_servers, pool2_addresses) = gen_servers(2).await; - let client = CorrosionPooledClient::new( - vec![pool1_addresses.as_slice(), pool2_addresses.as_slice()], - Duration::from_nanos(1), - ); + let mut addresses = pool1_addresses; + addresses.extend_from_slice(&pool2_addresses); + + let resolver = AsyncResolver::tokio_from_system_conf().unwrap(); + let client = CorrosionPooledClient::new(addresses, Duration::from_nanos(1), resolver); // Refuse connections on all servers for i in 0..2 { From bcca417f90ad51dcf9e3a7c3e3ea96f77ba4416e Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Wed, 12 Jun 2024 10:40:47 +0200 Subject: [PATCH 2/3] corro-client: fix ipv6 address:port parsing --- crates/corro-client/src/lib.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/crates/corro-client/src/lib.rs b/crates/corro-client/src/lib.rs index b3719b69..fbeb7a75 100644 --- a/crates/corro-client/src/lib.rs +++ b/crates/corro-client/src/lib.rs @@ -570,16 +570,10 @@ impl AddrPicker { self.next_addr = (self.next_addr + 1) % self.addrs.len(); // split host port - let mut host_port = host_port.splitn(2, ':'); - let host = host_port.next().ok_or(ResolveError::from( - "Invalid Corrosion server address (missing hostname)", - ))?; - let port = host_port - .next() - .and_then(|p| p.parse().ok()) - .ok_or(ResolveError::from( - "Invalid Corrosion server address (missing port)", - ))?; + let (host, port) = host_port + .rsplit_once(':') + .and_then(|(host, port)| Some((host, port.parse().ok()?))) + .ok_or(ResolveError::from("Invalid Corrosion server address"))?; let mut addrs = self .resolver From 5d6257b5ac541b518128d33e2e0c1e9c4fe598f3 Mon Sep 17 00:00:00 2001 From: Pavel Borzenkov Date: Wed, 12 Jun 2024 12:13:10 +0200 Subject: [PATCH 3/3] corro-client: don't pass IP addresses to AsyncResolver Looks like it doesn't understand IPv6 addresses written as '[a:b:c:etc]' --- crates/corro-client/src/lib.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/crates/corro-client/src/lib.rs b/crates/corro-client/src/lib.rs index fbeb7a75..b67e64a7 100644 --- a/crates/corro-client/src/lib.rs +++ b/crates/corro-client/src/lib.rs @@ -569,19 +569,22 @@ impl AddrPicker { .ok_or(ResolveError::from("No addresses available"))?; self.next_addr = (self.next_addr + 1) % self.addrs.len(); - // split host port - let (host, port) = host_port - .rsplit_once(':') - .and_then(|(host, port)| Some((host, port.parse().ok()?))) - .ok_or(ResolveError::from("Invalid Corrosion server address"))?; - - let mut addrs = self - .resolver - .lookup_ip(host) - .await? - .iter() - .map(|addr| (addr, port).into()) - .collect::>(); + let mut addrs = if let Ok(addr) = host_port.parse() { + vec![addr] + } else { + // split host port + let (host, port) = host_port + .rsplit_once(':') + .and_then(|(host, port)| Some((host, port.parse().ok()?))) + .ok_or(ResolveError::from("Invalid Corrosion server address"))?; + + self.resolver + .lookup_ip(host) + .await? + .iter() + .map(|addr| (addr, port).into()) + .collect::>() + }; // Sort so all the nodes try the addresses in the same order addrs.sort();