Skip to content

Commit

Permalink
clean up proof backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Jan 9, 2025
1 parent 160771d commit 6c50309
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions consensus/src/quorum_store/batch_proof_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ use std::{
time::{Duration, Instant},
};

enum BatchStatus {
Unordered,
Ordered,
Committed,
}

/// QueueItem represents an item in the ProofBatchQueue.
/// It stores the transaction summaries and proof associated with the
/// batch.
Expand All @@ -41,17 +47,19 @@ struct QueueItem {
proof: Option<ProofOfStore>,
/// The time when the proof is inserted into this item.
proof_insertion_time: Option<Instant>,
status: BatchStatus,
}

impl QueueItem {
fn is_committed(&self) -> bool {
self.proof.is_none() && self.proof_insertion_time.is_none() && self.txn_summaries.is_none()
matches!(self.status, BatchStatus::Committed)
}

fn mark_committed(&mut self) {
self.proof = None;
self.proof_insertion_time = None;
self.txn_summaries = None;
self.status = BatchStatus::Committed;
}
}

Expand Down Expand Up @@ -122,7 +130,7 @@ impl BatchProofQueue {
}

#[inline]
fn inc_remaining_proofs(&mut self, author: &PeerId, num_txns: u64) {
fn inc_unordered_proofs(&mut self, author: &PeerId, num_txns: u64) {
self.remaining_txns_with_duplicates += num_txns;
self.remaining_proofs += 1;
if *author == self.my_peer_id {
Expand All @@ -132,7 +140,7 @@ impl BatchProofQueue {
}

#[inline]
fn dec_remaining_proofs(&mut self, author: &PeerId, num_txns: u64) {
fn dec_unordered_proofs(&mut self, author: &PeerId, num_txns: u64) {
self.remaining_txns_with_duplicates -= num_txns;
self.remaining_proofs -= 1;
if *author == self.my_peer_id {
Expand Down Expand Up @@ -262,6 +270,7 @@ impl BatchProofQueue {
proof: Some(proof),
proof_insertion_time: Some(Instant::now()),
txn_summaries: None,
status: BatchStatus::Unordered,
});
},
}
Expand All @@ -271,7 +280,7 @@ impl BatchProofQueue {
} else {
counters::inc_remote_pos_count(bucket);
}
self.inc_remaining_proofs(&author, num_txns);
self.inc_unordered_proofs(&author, num_txns);

sample!(
SampleRate::Duration(Duration::from_millis(500)),
Expand Down Expand Up @@ -331,6 +340,7 @@ impl BatchProofQueue {
proof: None,
proof_insertion_time: None,
txn_summaries: Some(txn_summaries),
status: BatchStatus::Unordered,
});
},
}
Expand Down Expand Up @@ -775,7 +785,7 @@ impl BatchProofQueue {
if let Some(batch) = queue.remove(key) {
let item = self
.items
.get(&key.batch_key)
.get_mut(&key.batch_key)
.expect("Entry for unexpired batch must exist");
if item.proof.is_some() {
// not committed proof that is expired
Expand All @@ -794,7 +804,9 @@ impl BatchProofQueue {
};
}
}
self.dec_remaining_proofs(&batch.author(), batch.num_txns());
if matches!(item.status, BatchStatus::Unordered) {
self.dec_unordered_proofs(&batch.author(), batch.num_txns());
}
counters::GARBAGE_COLLECTED_IN_PROOF_QUEUE_COUNTER
.with_label_values(&["expired_proof"])
.inc();
Expand Down Expand Up @@ -914,7 +926,13 @@ impl BatchProofQueue {
}

for batch in Self::get_batches_from_block(&block) {
if let Some(item) = self.items.get(&BatchKey::from_info(&batch)) {
let mut to_decrement = false;
if let Some(item) = self.items.get_mut(&BatchKey::from_info(&batch)) {
if matches!(item.status, BatchStatus::Unordered) {
item.status = BatchStatus::Ordered;
to_decrement = true;
}

if let Some(txn_summaries) = item.txn_summaries.as_ref() {
for txn_summary in txn_summaries {
// Insert into ordered_txns_cache
Expand All @@ -941,6 +959,9 @@ impl BatchProofQueue {
}
}
}
if to_decrement {
self.dec_unordered_proofs(&batch.author(), batch.num_txns());
}
}

self.ordered_txns_cache.latest_block_round = block.round();
Expand All @@ -956,7 +977,7 @@ impl BatchProofQueue {

for batch in batches.into_iter() {
let batch_key = BatchKey::from_info(&batch);
if let Some(item) = self.items.get(&batch_key) {
if let Some(item) = self.items.get_mut(&batch_key) {
if let Some(ref proof) = item.proof {
let insertion_time = item
.proof_insertion_time
Expand All @@ -965,7 +986,7 @@ impl BatchProofQueue {
proof.gas_bucket_start(),
insertion_time.elapsed().as_secs_f64(),
);
self.dec_remaining_proofs(&batch.author(), batch.num_txns());
item.status = BatchStatus::Committed;
counters::GARBAGE_COLLECTED_IN_PROOF_QUEUE_COUNTER
.with_label_values(&["committed_proof"])
.inc();
Expand Down Expand Up @@ -996,6 +1017,7 @@ impl BatchProofQueue {
txn_summaries: None,
proof: None,
proof_insertion_time: None,
status: BatchStatus::Committed,
});
}
}
Expand Down

0 comments on commit 6c50309

Please sign in to comment.