diff --git a/Cargo.lock b/Cargo.lock index d18801fe..afd5bf59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,6 +323,28 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.76", +] + [[package]] name = "async-trait" version = "0.1.82" @@ -868,7 +890,7 @@ dependencies = [ [[package]] name = "example-harness" -version = "0.1.78" +version = "0.1.80" dependencies = [ "raftify", ] @@ -1074,7 +1096,7 @@ dependencies = [ [[package]] name = "harness" -version = "0.1.78" +version = "0.1.80" dependencies = [ "actix-rt", "actix-web", @@ -1527,7 +1549,7 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memstore-dynamic-members" -version = "0.1.78" +version = "0.1.80" dependencies = [ "actix-rt", "actix-web", @@ -1551,7 +1573,7 @@ dependencies = [ [[package]] name = "memstore-example-harness" -version = "0.1.78" +version = "0.1.80" dependencies = [ "actix-rt", "actix-web", @@ -1568,7 +1590,7 @@ dependencies = [ [[package]] name = "memstore-static-members" -version = "0.1.78" +version = "0.1.80" dependencies = [ "actix-rt", "actix-web", @@ -1986,7 +2008,7 @@ dependencies = [ [[package]] name = "raftify" -version = "0.1.78" +version = "0.1.80" dependencies = [ "async-trait", "bincode", @@ -2013,14 +2035,15 @@ dependencies = [ [[package]] name = "raftify_cli" -version = "0.1.1" +version = "0.1.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89fca418beb04ff1d175598c386db1d8bc40b20b0f2a1e645fe9d5bb0789a7d6" +checksum = "d90e7b2a6ada58910273956addaad5100d76a3e888f55e3831d3d20bd8f3cb40" dependencies = [ "built", "clap 4.5.18", "log", "raftify", + "rocksdb", "serde", "serde_json", "slog", @@ -2112,6 +2135,21 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rocksdb" version = "0.19.0" @@ -2156,6 +2194,37 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -2174,6 +2243,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "semver" version = "1.0.23" @@ -2361,6 +2440,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "strsim" version = "0.8.0" @@ -2597,6 +2682,16 @@ dependencies = [ "syn 2.0.76", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.16" @@ -2670,6 +2765,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.21.7", @@ -2684,7 +2780,9 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-pemfile", "tokio", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -2820,6 +2918,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.2" diff --git a/Cargo.toml b/Cargo.toml index d099eaed..32152885 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ default-members = [ exclude = ["raft-rs", "raftify-cli"] [workspace.package] -version = "0.1.78" +version = "0.1.80" authors = ["Lablup Inc."] edition = "2021" description = "Experimental High level Raft framework" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a9536bd0..ea24eee5 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -10,3 +10,7 @@ keywords.workspace = true [dependencies] raftify.workspace = true + +[features] +default = [] +tls = [] diff --git a/examples/memstore/Cargo.toml b/examples/memstore/Cargo.toml index bf25a234..a30840db 100644 --- a/examples/memstore/Cargo.toml +++ b/examples/memstore/Cargo.toml @@ -26,11 +26,12 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.40", features = ["full"] } color-backtrace = "0.6.1" -raftify = { version = "0.1.78" } -raftify_cli = { version = "0.1.1" } +raftify = { version = "0.1.80" } +raftify_cli = { version = "0.1.80" } [features] default = ["heed_storage"] inmemory_storage = [] heed_storage = [] rocksdb_storage = [] +tls = [] diff --git a/examples/memstore/dynamic-members/src/main.rs b/examples/memstore/dynamic-members/src/main.rs index 6c60c9de..67295ae4 100644 --- a/examples/memstore/dynamic-members/src/main.rs +++ b/examples/memstore/dynamic-members/src/main.rs @@ -70,19 +70,17 @@ async fn main() -> std::result::Result<(), Box> { Some(peer_addr) => { log::info!("Running in Follower mode"); - let ticket = Raft::request_id(options.raft_addr.clone(), peer_addr.clone()) + let ticket = Raft::request_id(options.raft_addr.clone(), peer_addr.clone(), None) .await .unwrap(); let node_id = ticket.reserved_id; - let mut cfg = build_config(node_id); - - cfg.initial_peers = Some(ticket.peers.clone().into()); + let cfg = build_config(node_id, Some(ticket.peers.clone().into())); #[cfg(feature = "inmemory_storage")] let log_storage = MemStorage::create(); #[cfg(feature = "heed_storage")] - let log_storage = HeedStorage::create(&cfg.log_dir, &cfg.clone(), logger.clone()) + let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone()) .expect("Failed to create storage"); #[cfg(feature = "rocksdb_storage")] @@ -121,13 +119,13 @@ async fn main() -> std::result::Result<(), Box> { } let leader_node_id = 1; - let cfg = build_config(leader_node_id); + let cfg = build_config(leader_node_id, None); #[cfg(feature = "inmemory_storage")] let log_storage = MemStorage::create(); #[cfg(feature = "heed_storage")] - let log_storage = HeedStorage::create(&cfg.log_dir, &cfg.clone(), logger.clone()) + let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone()) .expect("Failed to create storage"); #[cfg(feature = "rocksdb_storage")] diff --git a/examples/memstore/src/client/main.rs b/examples/memstore/src/client/main.rs index 39e8e6c2..66ab35b6 100644 --- a/examples/memstore/src/client/main.rs +++ b/examples/memstore/src/client/main.rs @@ -7,7 +7,7 @@ use memstore_example_harness::state_machine::LogEntry; #[actix_rt::main] async fn main() { println!("---Message propose---"); - let mut leader_client = create_client(&"127.0.0.1:60061").await.unwrap(); + let mut leader_client = create_client(&"127.0.0.1:60061", None).await.unwrap(); leader_client .propose(raft_service::ProposeArgs { diff --git a/examples/memstore/static-members/Cargo.toml b/examples/memstore/static-members/Cargo.toml index b0d0d847..98282929 100644 --- a/examples/memstore/static-members/Cargo.toml +++ b/examples/memstore/static-members/Cargo.toml @@ -33,4 +33,3 @@ default = ["heed_storage"] inmemory_storage = [] heed_storage = [] rocksdb_storage = [] - diff --git a/examples/memstore/static-members/src/main.rs b/examples/memstore/static-members/src/main.rs index a6337d4c..2413f111 100644 --- a/examples/memstore/static-members/src/main.rs +++ b/examples/memstore/static-members/src/main.rs @@ -71,14 +71,13 @@ async fn main() -> std::result::Result<(), Box> { .get_node_id_by_addr(options.raft_addr.clone()) .unwrap(); - let mut cfg = build_config(node_id); - cfg.initial_peers = Some(initial_peers.clone()); + let cfg = build_config(node_id, Some(initial_peers.clone())); #[cfg(feature = "inmemory_storage")] let log_storage = MemStorage::create(); #[cfg(feature = "heed_storage")] - let log_storage = HeedStorage::create(&cfg.log_dir, &cfg.clone(), logger.clone()) + let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone()) .expect("Failed to create storage"); #[cfg(feature = "rocksdb_storage")] diff --git a/examples/src/config.rs b/examples/src/config.rs index 128b7b8b..4f2732d1 100644 --- a/examples/src/config.rs +++ b/examples/src/config.rs @@ -1,8 +1,11 @@ -use raftify::{Config, RaftConfig}; +use raftify::{Config, ConfigBuilder, Peers, RaftConfig}; + +#[cfg(feature = "tls")] +use raftify::TlsConfig; use crate::utils::{ensure_directory_exist, get_storage_path}; -pub fn build_config(node_id: u64) -> Config { +pub fn build_config(node_id: u64, initial_peers: Option) -> Config { let raft_config = RaftConfig { id: node_id, election_tick: 10, @@ -14,14 +17,32 @@ pub fn build_config(node_id: u64) -> Config { let storage_pth = get_storage_path("./logs", node_id); ensure_directory_exist(&storage_pth).expect("Failed to create storage directory"); - Config { - bootstrap_from_snapshot: false, - tick_interval: 0.2, - log_dir: storage_pth.clone(), - save_compacted_logs: true, - compacted_log_dir: storage_pth, - compacted_log_size_threshold: 1024 * 1024 * 1024, - raft_config, - ..Default::default() + #[allow(unused_mut)] + let mut config_builder = ConfigBuilder::new() + .log_dir("./logs".to_owned()) + .save_compacted_logs(true) + .compacted_log_dir("./logs".to_owned()) + .compacted_log_size_threshold(1024 * 1024 * 1024) + .raft_config(raft_config) + .tick_interval(0.2); + + #[cfg(feature = "tls")] + { + let client_tls_config = TlsConfig { + cert_path: None, + key_path: None, + ca_cert_path: Some("./ca_cert.pem".to_string()), + domain_name: Some("localhost".to_string()), + }; + + config_builder = config_builder.global_client_tls_config(client_tls_config); } + + let config_builder = if let Some(peers) = initial_peers { + config_builder.initial_peers(peers) + } else { + config_builder + }; + + config_builder.build() } diff --git a/harness/src/config.rs b/harness/src/config.rs index af042e26..52d11f38 100644 --- a/harness/src/config.rs +++ b/harness/src/config.rs @@ -1,8 +1,8 @@ -use raftify::{Config, RaftConfig}; +use raftify::{Config, ConfigBuilder, Peers, RaftConfig}; use crate::utils::{ensure_directory_exist, get_storage_path}; -pub fn build_config(node_id: u64) -> Config { +pub fn build_config(node_id: u64, initial_peers: Option) -> Config { let raft_config = RaftConfig { id: node_id, election_tick: 10, @@ -14,12 +14,18 @@ pub fn build_config(node_id: u64) -> Config { let storage_path = get_storage_path("./logs", node_id); ensure_directory_exist(&storage_path).expect("Failed to create storage directory"); - Config { - log_dir: storage_path.clone(), - save_compacted_logs: true, - compacted_log_dir: storage_path, - compacted_log_size_threshold: 1024 * 1024 * 1024, - raft_config, - ..Default::default() - } + let config_builder = ConfigBuilder::new() + .log_dir(storage_path.clone()) + .save_compacted_logs(true) + .compacted_log_dir(storage_path) + .compacted_log_size_threshold(1024 * 1024 * 1024) + .raft_config(raft_config); + + let config_builder = if let Some(peers) = initial_peers { + config_builder.initial_peers(peers) + } else { + config_builder + }; + + config_builder.build() } diff --git a/harness/src/raft.rs b/harness/src/raft.rs index 5873854d..31ef0280 100644 --- a/harness/src/raft.rs +++ b/harness/src/raft.rs @@ -51,16 +51,18 @@ fn run_raft( should_be_leader: bool, ) -> Result>> { let peer = peers.get(node_id).unwrap(); - let mut cfg = build_config(*node_id); - cfg.initial_peers = if should_be_leader { - None - } else { - Some(peers.clone()) - }; + let cfg = build_config( + *node_id, + if should_be_leader { + None + } else { + Some(peers.clone()) + }, + ); let store = HashStore::new(); let logger = build_logger(); - let storage_pth = get_storage_path(cfg.log_dir.as_str(), *node_id); + let storage_pth = get_storage_path(cfg.get_log_dir(), *node_id); ensure_directory_exist(storage_pth.as_str())?; let storage = HeedStorage::create( @@ -141,9 +143,9 @@ pub async fn spawn_extra_node( slog: build_logger(), }); - let cfg = build_config(node_id); + let cfg = build_config(node_id, None); let store = HashStore::new(); - let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id); + let storage_pth = get_storage_path(cfg.get_log_dir(), node_id); ensure_directory_exist(storage_pth.as_str())?; let storage = HeedStorage::create(&storage_pth, &cfg, logger.clone())?; @@ -167,16 +169,16 @@ pub async fn spawn_and_join_extra_node( let logger = Arc::new(Slogger { slog: build_logger(), }); - let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned()) + let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned(), None) .await .unwrap(); let node_id = join_ticket.reserved_id; - let mut cfg = build_config(node_id); - cfg.initial_peers = Some(join_ticket.peers.clone().into()); + let cfg = build_config(node_id, Some(join_ticket.peers.clone().into())); + let store = HashStore::new(); - let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id); + let storage_pth = get_storage_path(cfg.get_log_dir(), node_id); ensure_directory_exist(storage_pth.as_str())?; let storage = HeedStorage::create(&storage_pth, &cfg, logger.clone())?; @@ -202,7 +204,7 @@ pub async fn spawn_and_join_extra_node( pub async fn join_nodes(rafts: Vec<&Raft>, raft_addrs: Vec<&str>, peer_addr: &str) { let mut tickets = vec![]; for (raft, raft_addr) in rafts.iter().zip(raft_addrs.into_iter()) { - let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned()) + let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned(), None) .await .unwrap(); diff --git a/raftify-cli/Cargo.lock b/raftify-cli/Cargo.lock index c682b77f..8b782020 100644 --- a/raftify-cli/Cargo.lock +++ b/raftify-cli/Cargo.lock @@ -102,6 +102,28 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "async-trait" version = "0.1.83" @@ -194,6 +216,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 1.0.109", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -236,6 +278,17 @@ version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cargo-lock" version = "8.0.3" @@ -254,9 +307,20 @@ version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" dependencies = [ + "jobserver", + "libc", "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -277,6 +341,17 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.18" @@ -518,6 +593,12 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.26" @@ -775,6 +856,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "jopemachine-raft" version = "0.7.10" @@ -810,12 +900,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.159" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +[[package]] +name = "libloading" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" +dependencies = [ + "cfg-if", + "windows-targets", +] + [[package]] name = "libredox" version = "0.1.3" @@ -826,6 +932,32 @@ dependencies = [ "libc", ] +[[package]] +name = "librocksdb-sys" +version = "0.8.3+7.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b255ff04123fcc176162f56ed0c9cd42d8f357cf55b3fabeb60f7413741b3" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -877,6 +1009,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -904,6 +1042,16 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -967,6 +1115,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1057,6 +1211,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "powerfmt" version = "0.2.0" @@ -1209,22 +1369,22 @@ dependencies = [ [[package]] name = "raftify" -version = "0.1.78" +version = "0.1.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f42c008e7e6a5b6431062fac27c4b61e7a78a7e87feeffba5d44d80cfdd657d8" +checksum = "623a229ebdc881f3f6d3f6c091db8271319f0405a4ce312ca9b1e38ace2000b3" dependencies = [ "async-trait", "bincode", "built", "bytes", "chrono", - "clap", "heed", "heed-traits", "jopemachine-raft", "log", "parking_lot", "prost", + "rocksdb", "serde", "serde_json", "slog", @@ -1237,12 +1397,13 @@ dependencies = [ [[package]] name = "raftify_cli" -version = "0.1.1" +version = "0.1.80" dependencies = [ "built", "clap", "log", "raftify", + "rocksdb", "serde", "serde_json", "slog", @@ -1328,12 +1489,43 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + +[[package]] +name = "rocksdb" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9562ea1d70c0cc63a34a22d977753b50cca91cc6b6527750463bd5dd8697bc" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" version = "0.38.37" @@ -1347,6 +1539,37 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -1365,6 +1588,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "semver" version = "1.0.23" @@ -1520,6 +1753,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "strsim" version = "0.11.1" @@ -1708,6 +1947,16 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.16" @@ -1747,6 +1996,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ + "async-stream", "async-trait", "axum", "base64", @@ -1761,7 +2011,9 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-pemfile", "tokio", + "tokio-rustls", "tokio-stream", "tower", "tower-layer", @@ -1872,6 +2124,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.2" @@ -1889,6 +2147,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "want" version = "0.3.1" @@ -2104,3 +2368,13 @@ dependencies = [ "quote", "syn 2.0.77", ] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/raftify-cli/Cargo.toml b/raftify-cli/Cargo.toml index a6aeab69..3d26e83d 100644 --- a/raftify-cli/Cargo.toml +++ b/raftify-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "raftify_cli" -version = "0.1.1" +version = "0.1.80" edition = "2021" description = "Raftify CLI tool" license = "MIT/Apache-2.0" @@ -12,7 +12,7 @@ serde_json = "1.0" slog = "2" built = "0.5" clap = { version = "4.5.18", features = ["derive"] } -raftify = { version = "0.1.78", features = ["heed_storage", "inmemory_storage", "rocksdb_storage"] } +raftify = { version = "=0.1.80", features = ["heed_storage", "inmemory_storage", "rocksdb_storage"] } rocksdb = "0.19.0" [lib] diff --git a/raftify-cli/src/commands/debug.rs b/raftify-cli/src/commands/debug.rs index 47a316bb..0c04bc61 100644 --- a/raftify-cli/src/commands/debug.rs +++ b/raftify-cli/src/commands/debug.rs @@ -3,26 +3,21 @@ use serde_json::Value; use std::{collections::HashMap, fs, path::Path, sync::Arc}; use raftify::{ - create_client, - raft::{ + create_client, raft::{ formatter::{format_entry, format_snapshot}, logger::Slogger, Storage, - }, - raft_node::utils::format_debugging_info, - raft_service, Config, Result, StableStorage, - HeedStorage, StorageType, + }, raft_node::utils::format_debugging_info, raft_service, Config, ConfigBuilder, HeedStorage, Result, StableStorage, StorageType }; pub fn debug_persisted(path: &str, logger: slog::Logger) -> Result<()> { - let config = Config { - log_dir: path.to_string(), - ..Default::default() - }; + let config = ConfigBuilder::new() + .log_dir(path.to_string()) + .build(); let storage = match LogStorage::STORAGE_TYPE { StorageType::Heed => HeedStorage::create( - config.log_dir.as_str(), + config.get_log_dir(), &config, Arc::new(Slogger { slog: logger.clone(), @@ -60,7 +55,7 @@ pub fn debug_persisted(path: &str, logger: slog::Logg Ok(()) } -pub fn debug_persitsted_all(path_str: &str, logger: slog::Logger) -> Result<()> { +pub fn debug_persisted_all(path_str: &str, logger: slog::Logger) -> Result<()> { let path = match fs::canonicalize(Path::new(&path_str)) { Ok(absolute_path) => absolute_path, Err(e) => { @@ -102,7 +97,8 @@ pub fn debug_persitsted_all(path_str: &str, logger: s } pub async fn debug_node(addr: &str) -> Result<()> { - let mut client = create_client(&addr).await?; + // TODO: Support TLS configuration + let mut client = create_client(&addr, None).await?; let response = client.debug_node(raft_service::Empty {}).await?; let json = response.into_inner().result_json; let parsed: HashMap = serde_json::from_str(&json).unwrap(); diff --git a/raftify/Cargo.toml b/raftify/Cargo.toml index 47299abd..43dc25b5 100644 --- a/raftify/Cargo.toml +++ b/raftify/Cargo.toml @@ -23,7 +23,7 @@ slog = "2" slog-stdlog = "4" thiserror = "1.0" tokio = { version = "1.40", features = ["full"] } -tonic = "0.9.2" +tonic = { version = "0.9.2" } built = "0.5" chrono = "0.4.38" heed = { version = "0.20.5", optional = true } @@ -31,10 +31,11 @@ heed-traits = { version = "0.20", optional = true } rocksdb = { version = "0.19.0", optional = true } [features] -default = ["heed_storage"] +default = ["heed_storage", "tls"] inmemory_storage = [] heed_storage =["heed", "heed-traits"] rocksdb_storage = ["rocksdb"] +tls = ["tonic/tls"] [dev-dependencies] tempfile = "3" diff --git a/raftify/src/config.rs b/raftify/src/config.rs index e5606f71..3e5142e6 100644 --- a/raftify/src/config.rs +++ b/raftify/src/config.rs @@ -1,57 +1,135 @@ use std::fmt; +use serde::{Deserialize, Serialize}; + use crate::{error::Error, raft::Config as RaftConfig, InitialRole, Peers, Result}; +// NOTE: This TLS Config is a type commonly used in both RaftServer and RaftClient. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct TlsConfig { + pub cert_path: Option, + pub key_path: Option, + pub ca_cert_path: Option, + pub domain_name: Option, +} + #[derive(Clone)] pub struct Config { - pub raft_config: RaftConfig, - pub log_dir: String, - - pub bootstrap_from_snapshot: bool, - pub save_compacted_logs: bool, - pub compacted_log_dir: String, - pub compacted_log_size_threshold: u64, - - pub tick_interval: f32, - pub lmdb_map_size: u64, - pub cluster_id: String, - pub conf_change_request_timeout: f32, + pub(crate) raft_config: RaftConfig, + pub(crate) log_dir: String, + pub(crate) save_compacted_logs: bool, + pub(crate) compacted_log_dir: String, + pub(crate) compacted_log_size_threshold: u64, + pub(crate) tick_interval: f32, + pub(crate) lmdb_map_size: u64, + pub(crate) bootstrap_from_snapshot: bool, + pub(crate) cluster_id: String, + pub(crate) conf_change_request_timeout: f32, + pub(crate) initial_peers: Option, + pub(crate) snapshot_interval: Option, + pub(crate) client_tls_config: Option, + pub(crate) server_tls_config: Option, +} - pub initial_peers: Option, - pub snapshot_interval: Option, +pub struct ConfigBuilder { + config: Config, + global_client_tls_config: Option, } -impl Config { - #[allow(clippy::too_many_arguments)] - pub fn new( - log_dir: String, - save_compacted_logs: bool, - compacted_log_dir: String, - compacted_log_size_threshold: u64, - raft_config: RaftConfig, - tick_interval: f32, - lmdb_map_size: u64, - cluster_id: String, - bootstrap_from_snapshot: bool, - conf_change_request_timeout: f32, - initial_peers: Option, - snapshot_interval: Option, - ) -> Self { +impl ConfigBuilder { + pub fn new() -> Self { Self { - raft_config, - log_dir, - save_compacted_logs, - compacted_log_dir, - compacted_log_size_threshold, - snapshot_interval, - tick_interval, - lmdb_map_size, - initial_peers, - cluster_id, - conf_change_request_timeout, - bootstrap_from_snapshot, + config: Config::default(), + global_client_tls_config: None, } } + + pub fn log_dir(mut self, log_dir: String) -> Self { + self.config.log_dir = log_dir; + self + } + + pub fn save_compacted_logs(mut self, save: bool) -> Self { + self.config.save_compacted_logs = save; + self + } + + pub fn compacted_log_dir(mut self, dir: String) -> Self { + self.config.compacted_log_dir = dir; + self + } + + pub fn compacted_log_size_threshold(mut self, threshold: u64) -> Self { + self.config.compacted_log_size_threshold = threshold; + self + } + + pub fn raft_config(mut self, raft_config: RaftConfig) -> Self { + self.config.raft_config = raft_config; + self + } + + pub fn tick_interval(mut self, interval: f32) -> Self { + self.config.tick_interval = interval; + self + } + + pub fn lmdb_map_size(mut self, size: u64) -> Self { + self.config.lmdb_map_size = size; + self + } + + pub fn cluster_id(mut self, cluster_id: String) -> Self { + self.config.cluster_id = cluster_id; + self + } + + pub fn conf_change_request_timeout(mut self, timeout: f32) -> Self { + self.config.conf_change_request_timeout = timeout; + self + } + + pub fn initial_peers(mut self, peers: Peers) -> Self { + self.config.initial_peers = Some(peers); + self + } + + pub fn snapshot_interval(mut self, interval: f32) -> Self { + self.config.snapshot_interval = Some(interval); + self + } + + pub fn server_tls_config(mut self, config: TlsConfig) -> Self { + self.config.server_tls_config = Some(config); + self + } + + pub fn bootstrap_from_snapshot(mut self, v: bool) -> Self { + self.config.bootstrap_from_snapshot = v; + self + } + + pub fn global_client_tls_config(mut self, config: TlsConfig) -> Self { + self.global_client_tls_config = Some(config); + self + } + + pub fn build(mut self) -> Config { + self.config.client_tls_config = self.global_client_tls_config.clone(); + + if let Some(mut initial_peers) = self.config.initial_peers.clone() { + if let Some(self_peer) = initial_peers.get_mut(&self.config.raft_config.id) { + if self_peer.client_tls_config.is_none() { + self_peer.client_tls_config = self.global_client_tls_config.clone(); + } else { + self.config.client_tls_config = self_peer.client_tls_config.clone(); + } + } + self.config.initial_peers = Some(initial_peers); + } + + self.config + } } impl Config { @@ -63,7 +141,7 @@ impl Config { .unwrap() .inner .into_iter() - .filter(|(_, peer)| peer.role == InitialRole::Leader) + .filter(|(_, peer)| peer.initial_role == InitialRole::Leader) .map(|(key, _)| key) .collect::>(); @@ -77,6 +155,10 @@ impl Config { self.raft_config.validate()?; Ok(()) } + + pub fn get_log_dir(&self) -> &str { + &self.log_dir + } } impl Default for Config { @@ -94,6 +176,8 @@ impl Default for Config { initial_peers: None, snapshot_interval: None, bootstrap_from_snapshot: false, + client_tls_config: None, + server_tls_config: None, } } } @@ -132,6 +216,8 @@ impl fmt::Debug for Config { lmdb_map_size: {lmdb_map_size}, \ cluster_id: {cluster_id}, \ conf_change_request_timeout: {conf_change_request_timeout}, \ + client_tls_config: {client_tls_config:?}, \ + server_tls_config: {server_tls_config:?}, \ }}", id = self.raft_config.id, election_tick = self.raft_config.election_tick, @@ -160,6 +246,8 @@ impl fmt::Debug for Config { cluster_id = self.cluster_id, conf_change_request_timeout = self.conf_change_request_timeout, bootstrap_from_snapshot = self.bootstrap_from_snapshot, + client_tls_config = self.client_tls_config, + server_tls_config = self.server_tls_config, ) } } diff --git a/raftify/src/error.rs b/raftify/src/error.rs index 5d36a5d8..ef535393 100644 --- a/raftify/src/error.rs +++ b/raftify/src/error.rs @@ -45,11 +45,11 @@ pub enum Error { #[derive(Debug, ThisError)] pub enum SendMessageError { - #[error("Failed to connect to node {0}")] + #[error("Failed to connect to node. {0}")] ConnectionError(String), #[error("Node {0} not found from the peers")] PeerNotFound(String), - #[error("Failed to send message to node {0}")] + #[error("Failed to send message. {0}")] TransmissionError(String), } diff --git a/raftify/src/lib.rs b/raftify/src/lib.rs index ba7bdca4..e64edf80 100644 --- a/raftify/src/lib.rs +++ b/raftify/src/lib.rs @@ -25,7 +25,7 @@ pub use { pub use crate::{ cluster_join_ticket::ClusterJoinTicket, - config::Config, + config::{Config, ConfigBuilder, TlsConfig}, error::{Error, Result}, log_entry::AbstractLogEntry, peer::Peer, diff --git a/raftify/src/peer.rs b/raftify/src/peer.rs index b4e02f46..4049c4a1 100644 --- a/raftify/src/peer.rs +++ b/raftify/src/peer.rs @@ -2,14 +2,15 @@ use serde::{Deserialize, Serialize}; use std::net::{SocketAddr, ToSocketAddrs}; use tonic::transport::Channel; -use crate::InitialRole; +use crate::{config::TlsConfig, InitialRole}; use super::{create_client, error::Result, raft_service::raft_service_client::RaftServiceClient}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Peer { pub addr: SocketAddr, - pub role: InitialRole, + pub initial_role: InitialRole, + pub client_tls_config: Option, #[serde(skip_serializing, skip_deserializing)] pub client: Option>, } @@ -25,17 +26,22 @@ pub struct Peer { // } impl Peer { - pub fn new(addr: A, initial_role: InitialRole) -> Self { + pub fn new( + addr: A, + initial_role: InitialRole, + client_tls_config: Option, + ) -> Self { let addr = addr.to_socket_addrs().unwrap().next().unwrap(); Peer { addr, - role: initial_role, + client_tls_config, + initial_role, client: None, } } pub async fn connect(&mut self) -> Result<()> { - let client = create_client(&self.addr).await?; + let client = create_client(&self.addr, self.client_tls_config.clone()).await?; self.client = Some(client); Ok(()) } diff --git a/raftify/src/peers.rs b/raftify/src/peers.rs index f9e905e6..21c11d2e 100644 --- a/raftify/src/peers.rs +++ b/raftify/src/peers.rs @@ -5,11 +5,12 @@ use std::{ }; use super::Peer; -use crate::{error::Result, raft_service, InitialRole}; +use crate::{config::TlsConfig, error::Result, raft_service, InitialRole}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Peers { pub inner: HashMap, + pub client_tls_config: Option, } impl Default for Peers { @@ -53,22 +54,37 @@ impl From> for Peers { fn from(map: HashMap) -> Self { let inner = map .into_iter() - .map(|(k, addr)| (k, Peer::new(addr, InitialRole::Voter))) + .map(|(k, addr)| (k, Peer::new(addr, InitialRole::Voter, None))) .collect(); - Peers { inner } + // TODO: Support TLS config here + Peers { + inner, + client_tls_config: None, + } } } impl Peers { - pub fn new(self_id: u64, self_addr: A) -> Self { + pub fn new( + self_id: u64, + self_addr: A, + client_tls_config: Option, + ) -> Self { let mut inner = HashMap::new(); - inner.insert(self_id, Peer::new(self_addr, InitialRole::Voter)); - Self { inner } + inner.insert( + self_id, + Peer::new(self_addr, InitialRole::Voter, client_tls_config.clone()), + ); + Self { + inner, + client_tls_config, + } } pub fn with_empty() -> Self { Self { inner: HashMap::new(), + client_tls_config: None, } } @@ -119,7 +135,7 @@ impl Peers { ) { let addr = addr.to_socket_addrs().unwrap().next().unwrap(); let initial_role = initial_role.unwrap_or(InitialRole::Voter); - let peer = Peer::new(addr, initial_role); + let peer = Peer::new(addr, initial_role, self.client_tls_config.clone()); self.inner.insert(id, peer); } @@ -177,7 +193,7 @@ mod tests { #[test] fn test_peers_serial_reserve_peer() { - let mut peers = Peers::new(1, "127.0.0.1:8081"); + let mut peers = Peers::new(1, "127.0.0.1:8081", None); let next_id = peers.reserve_id(); peers.add_peer(next_id, "127.0.0.1:8082", None); assert_eq!(next_id, 2); diff --git a/raftify/src/raft_bootstrapper.rs b/raftify/src/raft_bootstrapper.rs index 8815b435..2a6f758a 100644 --- a/raftify/src/raft_bootstrapper.rs +++ b/raftify/src/raft_bootstrapper.rs @@ -1,6 +1,6 @@ use crate::{ - raft::logger::Logger, request::server_request_message::ServerRequestMsg, ClusterJoinTicket, - InitialRole, Peers, StableStorage, + config::TlsConfig, raft::logger::Logger, request::server_request_message::ServerRequestMsg, + ClusterJoinTicket, InitialRole, Peers, StableStorage, }; use bincode::deserialize; use std::{net::ToSocketAddrs, ops::Deref, sync::Arc}; @@ -75,7 +75,7 @@ impl< .unwrap() .inner .into_iter() - .filter(|(_, peer)| peer.role == InitialRole::Leader) + .filter(|(_, peer)| peer.initial_role == InitialRole::Leader) .map(|(key, _)| key) .collect::>(); @@ -171,6 +171,7 @@ impl< pub async fn request_id( raft_addr: A, peer_addr: String, + tls_config: Option, ) -> Result { let raft_addr = raft_addr .to_socket_addrs() @@ -179,7 +180,7 @@ impl< .unwrap() .to_string(); - let mut client = create_client(&peer_addr).await?; + let mut client = create_client(&peer_addr, tls_config).await?; let response = client .request_id(raft_service::RequestIdArgs { raft_addr: raft_addr.to_string(), diff --git a/raftify/src/raft_client.rs b/raftify/src/raft_client.rs index d0697a39..0df55345 100644 --- a/raftify/src/raft_client.rs +++ b/raftify/src/raft_client.rs @@ -1,23 +1,63 @@ -use bytes::Bytes; use std::net::ToSocketAddrs; use tonic::transport::{Channel, Error as TonicError}; +#[cfg(feature = "tls")] +use tonic::transport::{Certificate, ClientTlsConfig, Identity}; + +use crate::config::TlsConfig; + use super::RaftServiceClient; -// TODO: Support https schema pub async fn create_client( addr: A, + client_tls_config: Option, ) -> Result, TonicError> { - let addr = addr - .to_socket_addrs() - .expect("Invalid socket address format") - .next() - .unwrap(); - let addr = format!("http://{}", addr); - let addr = Bytes::copy_from_slice(addr.as_bytes()); - - let channel = Channel::from_shared(addr).unwrap().connect().await?; + let addr = addr.to_socket_addrs().unwrap().next().unwrap(); + + let scheme = if client_tls_config.is_some() { + "https" + } else { + "http" + }; + + let addr = format!("{}://{}", scheme, addr); + + #[allow(unused_mut)] + let mut endpoint = Channel::from_shared(addr).unwrap(); + + #[cfg(feature = "tls")] + if let Some(tls_cfg) = client_tls_config { + let ca_cert_path = tls_cfg.ca_cert_path.as_ref().unwrap(); + let ca_cert = tokio::fs::read(ca_cert_path).await.unwrap(); + let ca_cert = Certificate::from_pem(ca_cert); + + let domain_name = tls_cfg + .domain_name + .expect("Domain name is required for TLS"); + + let mut client_tls_config = ClientTlsConfig::new() + .ca_certificate(ca_cert) + .domain_name(domain_name); + + // mTLS + if let (Some(cert_path), Some(key_path)) = + (tls_cfg.cert_path.clone(), tls_cfg.key_path.clone()) + { + let cert = tokio::fs::read(cert_path).await.unwrap(); + let key = tokio::fs::read(key_path).await.unwrap(); + let identity = Identity::from_pem(cert, key); + client_tls_config = client_tls_config.identity(identity); + } + + endpoint = endpoint.tls_config(client_tls_config)?; + } + + let channel = endpoint.connect().await?; let client = RaftServiceClient::new(channel); Ok(client) } + +// TODO: Implement test_create_client +#[cfg(test)] +mod test {} diff --git a/raftify/src/raft_node/mod.rs b/raftify/src/raft_node/mod.rs index b91cb1ce..8c12f794 100644 --- a/raftify/src/raft_node/mod.rs +++ b/raftify/src/raft_node/mod.rs @@ -51,7 +51,7 @@ use crate::{ }, utils::{membership::to_confchange_v2, oneshot_mutex::OneShotMutex}, AbstractLogEntry, AbstractStateMachine, ClusterJoinTicket, Config, Error, InitialRole, Peers, - RaftServiceClient, StableStorage, + StableStorage, }; #[derive(Clone)] @@ -65,6 +65,7 @@ pub struct RaftNode< inner: Arc>>, // RaftNode.(method_call) >>> RaftNodeCore.run tx_local: mpsc::Sender>, + config: Config, } impl< @@ -92,7 +93,7 @@ impl< should_be_leader, log_storage, fsm, - config, + config.clone(), raft_addr, logger, tx_server, @@ -103,6 +104,7 @@ impl< .map(|core| Self { inner: Arc::new(OneShotMutex::new(core)), tx_local: tx_local.clone(), + config, }) } @@ -259,7 +261,8 @@ impl< ResponseResult::Success => (), ResponseResult::Error(e) => return Err(e), ResponseResult::WrongLeader { leader_addr, .. } => { - let mut client = create_client(leader_addr).await?; + let mut client = + create_client(leader_addr, self.config.client_tls_config.clone()).await?; client .propose(Request::new(ProposeArgs { msg: proposal })) .await?; @@ -288,7 +291,10 @@ impl< match resp { LocalResponseMsg::ConfigChange { result } => match result { ConfChangeResponseResult::WrongLeader { leader_addr, .. } => { - let mut client = create_client(leader_addr).await.unwrap(); + let mut client = + create_client(leader_addr, self.config.client_tls_config.clone()) + .await + .unwrap(); let conf_change: ConfChangeRequest = conf_change; let conf_change: raft_service::ChangeConfigArgs = conf_change.into(); @@ -541,16 +547,19 @@ impl< let conf_state = snapshot.mut_metadata().mut_conf_state(); - let peers = config - .initial_peers - .clone() - .unwrap_or(Peers::new(node_id, raft_addr)); + let peers = config.initial_peers.clone().unwrap_or(Peers::new( + node_id, + raft_addr, + config.client_tls_config.clone(), + )); let voters = peers .clone() .inner .into_iter() - .filter(|(_, peer)| peer.role == InitialRole::Voter || peer.role == InitialRole::Leader) + .filter(|(_, peer)| { + peer.initial_role == InitialRole::Voter || peer.initial_role == InitialRole::Leader + }) .map(|(key, _)| key) .collect::>(); @@ -558,7 +567,7 @@ impl< .clone() .inner .into_iter() - .filter(|(_, peer)| peer.role == InitialRole::Learner) + .filter(|(_, peer)| peer.initial_role == InitialRole::Learner) .map(|(key, _)| key) .collect::>(); @@ -665,15 +674,17 @@ impl< logger: Arc, ) { let node_id = message.get_to(); - let mut ok = std::result::Result::<(), SendMessageError>::Ok(()); let client = match peers.lock().await.get_mut(&node_id) { Some(peer) => { if peer.client.is_none() { if let Err(e) = peer.connect().await { - logger.debug(format!("Connection error: {:?}", e).as_str()); - ok = Err(SendMessageError::ConnectionError(node_id.to_string())); + ok = Err(SendMessageError::ConnectionError(format!( + "(to node {}). Error: {:?}", + node_id, + e.to_string() + ))); } } peer.client.clone() @@ -687,8 +698,11 @@ impl< if let Some(mut client) = client { let message = Request::new(message.clone()); if let Err(e) = client.send_message(message).await { - logger.trace(&format!("Message transmission error: {:?}", e)); - ok = Err(SendMessageError::TransmissionError(node_id.to_string())); + ok = Err(SendMessageError::TransmissionError(format!( + "(to node {}). Error: {}", + node_id, + e.to_string() + ))); } } @@ -756,7 +770,9 @@ impl< let cc_v2: ConfChangeRequest = cc_v2.clone().into(); let cc_v2: raft_service::ChangeConfigArgs = cc_v2.into(); - let mut leader_client = RaftServiceClient::connect(format!("http://{}", peer_addr)).await?; + let mut leader_client = + create_client(peer_addr, self.config.client_tls_config.clone()).await?; + let response = leader_client .change_config(cc_v2.clone()) .await? diff --git a/raftify/src/raft_server.rs b/raftify/src/raft_server.rs index f3718b2c..fe9901f0 100644 --- a/raftify/src/raft_server.rs +++ b/raftify/src/raft_server.rs @@ -13,6 +13,9 @@ use tokio::{ }; use tonic::{transport::Server, Request, Response, Status}; +#[cfg(feature = "tls")] +use tonic::transport::{Certificate, Identity, ServerTlsConfig}; + use super::{ macro_utils::function_name, raft_service::{ @@ -79,7 +82,33 @@ impl< rx_quit_signal.await.ok(); }; - Server::builder() + let mut server_builder = Server::builder(); + + #[cfg(feature = "tls")] + if let Some(tls_cfg) = &self.config.server_tls_config { + logger.debug("TLS enabled."); + let cert_path = tls_cfg + .cert_path + .as_ref() + .expect("Server requires cert_path"); + let cert = tokio::fs::read(cert_path).await?; + let key_path = tls_cfg.key_path.as_ref().expect("Server requires key_path"); + let key = tokio::fs::read(key_path).await?; + let identity = Identity::from_pem(cert, key); + + let mut tls_config = ServerTlsConfig::new().identity(identity); + + // mTLS + if let Some(ca_cert_path) = &tls_cfg.ca_cert_path { + let ca_cert = tokio::fs::read(ca_cert_path).await?; + let ca_cert = Certificate::from_pem(ca_cert); + tls_config = tls_config.client_ca_root(ca_cert); + } + + server_builder = server_builder.tls_config(tls_config)?; + } + + server_builder .add_service(RaftServiceServer::new(self)) .serve_with_shutdown(raft_addr, quit_signal) .await?; @@ -147,7 +176,10 @@ impl< })) } RequestIdResponseResult::WrongLeader { leader_addr, .. } => { - let mut client = create_client(leader_addr).await.unwrap(); + let mut client = + create_client(leader_addr, self.config.client_tls_config.clone()) + .await + .unwrap(); let reply = client.request_id(request_args).await?.into_inner(); Ok(Response::new(reply)) @@ -215,7 +247,10 @@ impl< raft_service::ChangeConfigResultType::ChangeConfigWrongLeader as i32; - let mut client = create_client(leader_addr).await.unwrap(); + let mut client = + create_client(leader_addr, self.config.client_tls_config.clone()) + .await + .unwrap(); reply = client.change_config(request_args).await?.into_inner(); } }, @@ -295,7 +330,10 @@ impl< } ResponseResult::WrongLeader { leader_addr, .. } => { // TODO: Handle this kind of errors - let mut client = create_client(leader_addr).await.unwrap(); + let mut client = + create_client(leader_addr, self.config.client_tls_config.clone()) + .await + .unwrap(); let _ = client .propose(ProposeArgs { msg: request_args.msg, diff --git a/raftify/src/storage/heed_storage/mod.rs b/raftify/src/storage/heed_storage/mod.rs index c8e876df..3b66dbbb 100644 --- a/raftify/src/storage/heed_storage/mod.rs +++ b/raftify/src/storage/heed_storage/mod.rs @@ -553,7 +553,7 @@ mod test { logger::Slogger, Config as RaftConfig, Error as RaftError, GetEntriesContext, Storage, StorageError, }; - use crate::{Config, HeedStorage, StableStorage}; + use crate::{Config, ConfigBuilder, HeedStorage, StableStorage}; use prost::Message; fn new_entry(index: u64, term: u64) -> Entry { @@ -582,14 +582,13 @@ mod test { ..Default::default() }; - Config { - log_dir: test_dir_pth.to_owned(), - save_compacted_logs: false, - compacted_log_dir: test_dir_pth.to_owned(), - compacted_log_size_threshold: 1024 * 1024 * 1024, - raft_config, - ..Default::default() - } + ConfigBuilder::new() + .log_dir(test_dir_pth.to_owned()) + .save_compacted_logs(false) + .compacted_log_dir(test_dir_pth.to_owned()) + .compacted_log_size_threshold(1024 * 1024 * 1024) + .raft_config(raft_config) + .build() } pub fn build_logger() -> slog::Logger {