From 190f4a024b6dca783d0b224deb4c7d93641b0c03 Mon Sep 17 00:00:00 2001 From: DevOrbitlabs Date: Wed, 15 Jan 2025 15:36:21 +0700 Subject: [PATCH] feat(code/consensus): Add support for full nodes (#750) * feat(code/consensus): Add support for full nodes * fix lint * Update code/crates/core-consensus/src/state.rs Co-authored-by: Romain Ruetschi Signed-off-by: DevOrbitlabs * update * update --------- Signed-off-by: DevOrbitlabs Co-authored-by: Romain Ruetschi --- .../core-consensus/src/handle/driver.rs | 76 ++++---- code/crates/core-consensus/src/handle/vote.rs | 25 +-- code/crates/core-consensus/src/state.rs | 7 + code/crates/starknet/test/src/lib.rs | 20 ++- code/crates/starknet/test/src/tests.rs | 1 + .../starknet/test/src/tests/full_nodes.rs | 165 ++++++++++++++++++ 6 files changed, 247 insertions(+), 47 deletions(-) create mode 100644 code/crates/starknet/test/src/tests/full_nodes.rs diff --git a/code/crates/core-consensus/src/handle/driver.rs b/code/crates/core-consensus/src/handle/driver.rs index 94e5ba7f4..a5085b71d 100644 --- a/code/crates/core-consensus/src/handle/driver.rs +++ b/code/crates/core-consensus/src/handle/driver.rs @@ -194,36 +194,39 @@ where "Proposing value" ); - let signed_proposal = sign_proposal(co, proposal).await?; - - if signed_proposal.pol_round().is_defined() { - perform!( - co, - Effect::RestreamValue( - signed_proposal.height(), - signed_proposal.round(), - signed_proposal.pol_round(), - signed_proposal.validator_address().clone(), - signed_proposal.value().id(), - Default::default() - ) - ); + // Only sign and publish if we're in the validator set + if state.is_validator() { + let signed_proposal = sign_proposal(co, proposal).await?; + + if signed_proposal.pol_round().is_defined() { + perform!( + co, + Effect::RestreamValue( + signed_proposal.height(), + signed_proposal.round(), + signed_proposal.pol_round(), + signed_proposal.validator_address().clone(), + signed_proposal.value().id(), + Default::default() + ) + ); + } + + on_proposal(co, state, metrics, signed_proposal.clone()).await?; + + // Proposal messages should not be broadcasted if they are implicit, + // instead they should be inferred from the block parts. + if state.params.value_payload.include_proposal() { + perform!( + co, + Effect::Publish( + SignedConsensusMsg::Proposal(signed_proposal), + Default::default() + ) + ); + }; } - on_proposal(co, state, metrics, signed_proposal.clone()).await?; - - // Proposal messages should not be broadcasted if they are implicit, - // instead they should be inferred from the block parts. - if state.params.value_payload.include_proposal() { - perform!( - co, - Effect::Publish( - SignedConsensusMsg::Proposal(signed_proposal), - Default::default() - ) - ); - }; - Ok(()) } @@ -235,15 +238,18 @@ where "Voting", ); - let extended_vote = extend_vote(vote, state); - let signed_vote = sign_vote(co, extended_vote).await?; + // Only sign and publish if we're in the validator set + if state.is_validator() { + let extended_vote = extend_vote(vote, state); + let signed_vote = sign_vote(co, extended_vote).await?; - on_vote(co, state, metrics, signed_vote.clone()).await?; + on_vote(co, state, metrics, signed_vote.clone()).await?; - perform!( - co, - Effect::Publish(SignedConsensusMsg::Vote(signed_vote), Default::default()) - ); + perform!( + co, + Effect::Publish(SignedConsensusMsg::Vote(signed_vote), Default::default()) + ); + } Ok(()) } diff --git a/code/crates/core-consensus/src/handle/vote.rs b/code/crates/core-consensus/src/handle/vote.rs index 0a48cfe8a..34ab6f97c 100644 --- a/code/crates/core-consensus/src/handle/vote.rs +++ b/code/crates/core-consensus/src/handle/vote.rs @@ -75,18 +75,21 @@ where debug_assert_eq!(consensus_height, vote_height); - // Append the vote to the Write-ahead Log - perform!( - co, - Effect::WalAppendMessage( - SignedConsensusMsg::Vote(signed_vote.clone()), - Default::default() - ) - ); + // Only append to WAL and store precommits if we're in the validator set + if state.is_validator() { + // Append the vote to the Write-ahead Log + perform!( + co, + Effect::WalAppendMessage( + SignedConsensusMsg::Vote(signed_vote.clone()), + Default::default() + ) + ); - // Store the non-nil Precommits. - if signed_vote.vote_type() == VoteType::Precommit && signed_vote.value().is_val() { - state.store_signed_precommit(signed_vote.clone()); + // Store the non-nil Precommits. + if signed_vote.vote_type() == VoteType::Precommit && signed_vote.value().is_val() { + state.store_signed_precommit(signed_vote.clone()); + } } apply_driver_input(co, state, metrics, DriverInput::Vote(signed_vote)).await?; diff --git a/code/crates/core-consensus/src/state.rs b/code/crates/core-consensus/src/state.rs index 5875d1c00..1814f5dac 100644 --- a/code/crates/core-consensus/src/state.rs +++ b/code/crates/core-consensus/src/state.rs @@ -220,4 +220,11 @@ where ); } } + + /// Check if we are a validator node, i.e. we are present in the current validator set. + pub fn is_validator(&self) -> bool { + self.validator_set() + .get_by_address(self.address()) + .is_some() + } } diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index c855eced8..856352eda 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -286,6 +286,19 @@ impl TestNode { self.steps.push(Step::Success); self } + + pub fn full_node(&mut self) -> &mut Self { + self.voting_power = 0; + // Ensure full nodes never participate in consensus + self.on_vote(|_vote, _state| { + panic!("Full nodes should never vote"); + }); + + self.on_proposed_value(|_proposal, _state| { + panic!("Full nodes should never propose values"); + }); + self + } } fn unique_id() -> usize { @@ -341,6 +354,7 @@ where S: Send + Sync + 'static, { pub fn new(nodes: Vec>) -> Self { + // Only include nodes with non-zero voting power in the validator set let vals_and_keys = make_validators(voting_powers(&nodes)); let (validators, private_keys): (Vec<_>, Vec<_>) = vals_and_keys.into_iter().unzip(); let validator_set = ValidatorSet::new(validators); @@ -740,7 +754,11 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig { } fn voting_powers(nodes: &[TestNode]) -> Vec { - nodes.iter().map(|node| node.voting_power).collect() + nodes + .iter() + .filter(|node| node.voting_power > 0) + .map(|node| node.voting_power) + .collect() } pub fn make_validators(voting_powers: Vec) -> Vec<(Validator, PrivateKey)> { diff --git a/code/crates/starknet/test/src/tests.rs b/code/crates/starknet/test/src/tests.rs index d586ce044..eafcd18f5 100644 --- a/code/crates/starknet/test/src/tests.rs +++ b/code/crates/starknet/test/src/tests.rs @@ -1,3 +1,4 @@ +pub mod full_nodes; pub mod n3f0; pub mod n3f0_consensus_mode; pub mod n3f0_pubsub_protocol; diff --git a/code/crates/starknet/test/src/tests/full_nodes.rs b/code/crates/starknet/test/src/tests/full_nodes.rs new file mode 100644 index 000000000..b2c9408f3 --- /dev/null +++ b/code/crates/starknet/test/src/tests/full_nodes.rs @@ -0,0 +1,165 @@ +use std::time::Duration; + +use crate::{init_logging, TestBuilder}; + +#[tokio::test] +pub async fn basic_full_node() { + init_logging(module_path!()); + + const HEIGHT: u64 = 5; + + let mut test = TestBuilder::<()>::new(); + + // Add 3 validators with different voting powers + test.add_node() + .with_voting_power(10) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(30) + .start() + .wait_until(HEIGHT) + .success(); + + // Add 2 full nodes that should follow consensus but not participate + test.add_node() + .full_node() + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .full_node() + .start() + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(30)).await +} + +#[tokio::test] +pub async fn full_node_crash_and_sync() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Add validators + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + + // Add a full node that crashes and needs to sync + test.add_node() + .full_node() + .start() + .wait_until(3) + .crash() + .reset_db() + .restart_after(Duration::from_secs(5)) + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(60)).await +} + +#[tokio::test] +pub async fn late_starting_full_node() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Add validators that start immediately + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(20) + .start() + .wait_until(HEIGHT) + .success(); + + // Add a full node that starts late + test.add_node() + .full_node() + .start_after(1, Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(60)).await +} + +#[tokio::test] +pub async fn mixed_validator_and_full_node_failures() { + init_logging(module_path!()); + + const HEIGHT: u64 = 10; + + let mut test = TestBuilder::<()>::new(); + + // Add stable validators + test.add_node() + .with_voting_power(30) + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .with_voting_power(30) + .start() + .wait_until(HEIGHT) + .success(); + + // Add a validator that crashes + test.add_node() + .with_voting_power(20) + .start() + .wait_until(5) + .crash() + .restart_after(Duration::from_secs(10)) + .wait_until(HEIGHT) + .success(); + + // Add full nodes - one stable, one that crashes + test.add_node() + .full_node() + .start() + .wait_until(HEIGHT) + .success(); + test.add_node() + .full_node() + .start() + .wait_until(6) + .crash() + .restart_after(Duration::from_secs(15)) + .wait_until(HEIGHT) + .success(); + + test.build().run(Duration::from_secs(60)).await +}