Skip to content

Commit

Permalink
[Consensus Observer] Reset pending data on subscription changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 21, 2024
1 parent 88068af commit 2cb5307
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 22 deletions.
2 changes: 1 addition & 1 deletion config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct ConsensusObserverConfig {

/// Interval (in milliseconds) to garbage collect peer state
pub garbage_collection_interval_ms: u64,
/// Maximum number of pending blocks to keep in memory
/// Maximum number of blocks to keep in memory (e.g., pending blocks, ordered blocks, etc.)
pub max_num_pending_blocks: u64,
/// Maximum timeout (in milliseconds) for active subscriptions
pub max_subscription_timeout_ms: u64,
Expand Down
74 changes: 59 additions & 15 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ impl ConsensusObserver {
debug!(LogSchema::new(LogEntry::ConsensusObserver)
.message("Checking consensus observer progress!"));

// If we're in state sync mode, we should wait for state sync to complete
if self.in_state_sync_mode() {
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Waiting for state sync to reach target: {:?}!",
self.root.lock().commit_info()
))
);
return;
}

// Get the peer ID of the currently active subscription (if any)
let active_subscription_peer = self
.active_observer_subscription
Expand Down Expand Up @@ -192,8 +203,12 @@ impl ConsensusObserver {
self.create_new_observer_subscription(active_subscription_peer)
.await;

// If we successfully created a new subscription, update the subscription creation metrics
// If we successfully created a new subscription, clear the state and update the metrics
if let Some(active_subscription) = &self.active_observer_subscription {
// Clear the block state
self.clear_pending_block_state().await;

// Update the subscription creation metrics
self.update_subscription_creation_metrics(
active_subscription.get_peer_network_id(),
);
Expand Down Expand Up @@ -223,8 +238,7 @@ impl ConsensusObserver {
// Verify the subscription has not timed out
active_subscription.check_subscription_timeout()?;

// Verify that the DB is continuing to sync and commit new data.
// Note: we should only do this if we're not waiting for state sync.
// Verify that the DB is continuing to sync and commit new data
active_subscription.check_syncing_progress()?;

// Verify that the subscription peer is optimal
Expand All @@ -239,6 +253,30 @@ impl ConsensusObserver {
Ok(())
}

/// Clears the pending block state (this is useful for changing
/// subscriptions, where we want to wipe all state and restart).
async fn clear_pending_block_state(&self) {
// Clear the payload store
self.block_payload_store.clear_all_payloads();

// Clear the pending blocks
self.pending_block_store.clear_missing_blocks();

// Clear the ordered blocks
self.pending_ordered_blocks.clear_all_pending_blocks();

// Reset the execution pipeline for the root
let root = self.root.lock().clone();
if let Err(error) = self.execution_client.reset(&root).await {
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to reset the execution pipeline for the root! Error: {:?}",
error
))
);
}
}

/// Creates and returns a commit callback (to be called after the execution pipeline)
fn create_commit_callback(&self) -> StateComputerCommitCallBackType {
// Clone the root, pending blocks and payload store
Expand Down Expand Up @@ -375,6 +413,14 @@ impl ConsensusObserver {

/// Finalizes the ordered block by sending it to the execution pipeline
async fn finalize_ordered_block(&mut self, ordered_block: OrderedBlock) {
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding ordered blocks to the execution pipeline: {}",
ordered_block.proof_block_info()
))
);

// Send the ordered block to the execution pipeline
if let Err(error) = self
.execution_client
.finalize_order(
Expand Down Expand Up @@ -457,6 +503,11 @@ impl ConsensusObserver {
}
}

/// Returns true iff we are waiting for state sync to complete
fn in_state_sync_mode(&self) -> bool {
self.sync_handle.is_some()
}

/// Processes the block payload message
async fn process_block_payload_message(&mut self, block_payload: BlockPayload) {
// Get the block round and epoch
Expand Down Expand Up @@ -580,8 +631,8 @@ impl ConsensusObserver {
.update_commit_decision(commit_decision);

// If we are not in sync mode, forward the commit decision to the execution pipeline
if self.sync_handle.is_none() {
debug!(
if !self.in_state_sync_mode() {
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding commit decision to the execution pipeline: {}",
commit_decision.proof_block_info()
Expand Down Expand Up @@ -734,15 +785,7 @@ impl ConsensusObserver {
.insert_ordered_block(ordered_block.clone(), verified_ordered_proof);

// If we verified the proof, and we're not in sync mode, finalize the ordered blocks
if verified_ordered_proof && self.sync_handle.is_none() {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding blocks to the execution pipeline: {}",
ordered_block.proof_block_info()
))
);

// Finalize the ordered block
if verified_ordered_proof && !self.in_state_sync_mode() {
self.finalize_ordered_block(ordered_block).await;
}
} else {
Expand Down Expand Up @@ -815,8 +858,9 @@ impl ConsensusObserver {
self.wait_for_epoch_start().await;

// Verify the pending blocks for the new epoch
let new_epoch_state = self.get_epoch_state();
self.pending_ordered_blocks
.verify_pending_blocks(&current_epoch_state);
.verify_pending_blocks(&new_epoch_state);
}

// Reset and drop the sync handle
Expand Down
48 changes: 43 additions & 5 deletions consensus/src/consensus_observer/ordered_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ impl PendingOrderedBlocks {
}
}

/// Clears all pending blocks
pub fn clear_all_pending_blocks(&self) {
self.pending_blocks.lock().clear();
}

/// Returns a copy of the verified pending blocks
pub fn get_all_verified_pending_blocks(
&self,
Expand Down Expand Up @@ -256,7 +261,40 @@ mod test {
};

#[test]
pub fn test_get_last_pending_block() {
fn test_clear_all_pending_blocks() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

// Insert several verified blocks for the current epoch
let current_epoch = 0;
let num_verified_blocks = 10;
create_and_add_pending_blocks(
&pending_ordered_blocks,
num_verified_blocks,
current_epoch,
true,
);

// Insert several unverified blocks for the next epoch
let next_epoch = current_epoch + 1;
let num_unverified_blocks = 20;
create_and_add_pending_blocks(
&pending_ordered_blocks,
num_unverified_blocks,
next_epoch,
false,
);

// Clear all pending blocks
pending_ordered_blocks.clear_all_pending_blocks();

// Check all the pending blocks were removed
let num_pending_blocks = pending_ordered_blocks.pending_blocks.lock().len();
assert_eq!(num_pending_blocks, 0);
}

#[test]
fn test_get_last_pending_block() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down Expand Up @@ -313,7 +351,7 @@ mod test {
}

#[test]
pub fn test_get_verified_pending_block() {
fn test_get_verified_pending_block() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down Expand Up @@ -369,7 +407,7 @@ mod test {
}

#[test]
pub fn test_insert_ordered_block_limit() {
fn test_insert_ordered_block_limit() {
// Create a consensus observer config with a maximum of 10 pending blocks
let max_num_pending_blocks = 10;
let consensus_observer_config = ConsensusObserverConfig {
Expand Down Expand Up @@ -418,7 +456,7 @@ mod test {
}

#[test]
pub fn test_remove_blocks_for_commit() {
fn test_remove_blocks_for_commit() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down Expand Up @@ -517,7 +555,7 @@ mod test {
}

#[test]
pub fn test_update_commit_decision() {
fn test_update_commit_decision() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down
31 changes: 31 additions & 0 deletions consensus/src/consensus_observer/payload_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ impl BlockPayloadStore {
})
}

/// Clears all the payloads from the block payload store
pub fn clear_all_payloads(&self) {
self.block_transaction_payloads.lock().clear();
}

/// Returns a reference to the block transaction payloads
pub fn get_block_payloads(&self) -> Arc<Mutex<HashMap<HashValue, BlockPayloadStatus>>> {
self.block_transaction_payloads.clone()
Expand Down Expand Up @@ -172,6 +177,32 @@ mod test {
assert!(!block_payload_store.all_payloads_exist(subset_pipelined_blocks));
}

#[test]
fn test_clear_all_payloads() {
// Create a new block payload store
let block_payload_store = BlockPayloadStore::new();

// Add some blocks to the payload store
let num_blocks_in_store = 100;
let pipelined_blocks =
create_and_add_blocks_to_store(block_payload_store.clone(), num_blocks_in_store);

// Check that all the payloads exist in the block payload store
assert!(block_payload_store.all_payloads_exist(&pipelined_blocks));

// Clear all the payloads from the block payload store
block_payload_store.clear_all_payloads();

// Check that all the payloads exist in the block payload store
assert!(!block_payload_store.all_payloads_exist(&pipelined_blocks));

// Check that the block payload store is empty
assert!(block_payload_store
.block_transaction_payloads
.lock()
.is_empty());
}

#[test]
fn test_all_payloads_exist_requested() {
// Create a new block payload store
Expand Down
43 changes: 42 additions & 1 deletion consensus/src/consensus_observer/pending_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ impl PendingBlockStore {
}
}

/// Clears all missing blocks from the store
pub fn clear_missing_blocks(&self) {
self.blocks_without_payloads.lock().clear();
}

/// Inserts a block (without payloads) into the store
pub fn insert_pending_block(&self, ordered_block: OrderedBlock) {
// Get the epoch and round of the first block
Expand Down Expand Up @@ -189,6 +194,42 @@ mod test {
};
use rand::Rng;

#[test]
fn test_clear_missing_blocks() {
// Create a new pending block store
let max_num_pending_blocks = 10;
let consensus_observer_config = ConsensusObserverConfig {
max_num_pending_blocks: max_num_pending_blocks as u64,
..ConsensusObserverConfig::default()
};
let pending_block_store = PendingBlockStore::new(consensus_observer_config);

// Insert the maximum number of blocks into the store
let current_epoch = 0;
let starting_round = 0;
let missing_blocks = create_and_add_pending_blocks(
&pending_block_store,
max_num_pending_blocks,
current_epoch,
starting_round,
5,
);

// Verify that the store is not empty
verify_pending_blocks(
&pending_block_store,
max_num_pending_blocks,
&missing_blocks,
);

// Clear the missing blocks from the store
pending_block_store.clear_missing_blocks();

// Verify that the store is now empty
let blocks_without_payloads = pending_block_store.blocks_without_payloads.lock();
assert!(blocks_without_payloads.is_empty());
}

#[test]
fn test_insert_pending_block() {
// Create a new pending block store
Expand Down Expand Up @@ -400,7 +441,7 @@ mod test {
#[test]
fn test_remove_ready_block_multiple_blocks_missing() {
// Create a new pending block store
let max_num_pending_blocks = 4;
let max_num_pending_blocks = 10;
let consensus_observer_config = ConsensusObserverConfig {
max_num_pending_blocks: max_num_pending_blocks as u64,
..ConsensusObserverConfig::default()
Expand Down

0 comments on commit 2cb5307

Please sign in to comment.