Skip to content

Commit

Permalink
[dag] broadcast nodes within window till all validators ack
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jan 24, 2024
1 parent 89ba377 commit a472f03
Showing 1 changed file with 53 additions and 11 deletions.
64 changes: 53 additions & 11 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ use futures::{
future::{AbortHandle, Abortable},
FutureExt,
};
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{
collections::{vec_deque, HashSet, VecDeque},
sync::Arc,
time::Duration,
};
use tokio_retry::strategy::ExponentialBackoff;

pub(crate) struct DagDriver {
Expand All @@ -49,7 +53,7 @@ pub(crate) struct DagDriver {
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
rb_abort_handle: Mutex<Option<(AbortHandle, u64)>>,
rb_handles: Mutex<BoundedVecDeque<(AbortHandle, u64)>>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<dyn TFetchRequester>,
Expand Down Expand Up @@ -93,7 +97,7 @@ impl DagDriver {
payload_client,
reliable_broadcast,
time_service,
rb_abort_handle: Mutex::new(None),
rb_handles: Mutex::new(BoundedVecDeque::new(window_size_config as usize)),
storage,
order_rule,
fetch_requester,
Expand Down Expand Up @@ -154,11 +158,12 @@ impl DagDriver {
(
highest_strong_links_round,
// unwrap is for round 0
dag_reader.get_strong_links_for_round(
highest_strong_links_round,
&self.epoch_state.verifier,
)
.unwrap_or_default(),
dag_reader
.get_strong_links_for_round(
highest_strong_links_round,
&self.epoch_state.verifier,
)
.unwrap_or_default(),
)
};

Expand Down Expand Up @@ -318,10 +323,11 @@ impl DagDriver {
};
tokio::spawn(Abortable::new(task, abort_registration));
if let Some((prev_handle, prev_round_timestamp)) = self
.rb_abort_handle
.rb_handles
.lock()
.replace((abort_handle, timestamp))
.push_back((abort_handle, timestamp))
{
// TODO: this observation is inaccurate.
observe_round(prev_round_timestamp, RoundStage::Finished);
prev_handle.abort();
}
Expand Down Expand Up @@ -357,8 +363,44 @@ impl RpcHandler for DagDriver {

impl Drop for DagDriver {
fn drop(&mut self) {
if let Some((handle, _)) = self.rb_abort_handle.lock().as_ref() {
let abort_handles = self.rb_handles.lock();
for (handle, _) in abort_handles.iter() {
handle.abort()
}
}
}

struct BoundedVecDeque<T> {
inner: VecDeque<T>,
capacity: usize,
}

impl<T> BoundedVecDeque<T> {
fn new(capacity: usize) -> Self {
assert!(capacity > 0);
Self {
inner: VecDeque::with_capacity(capacity),
capacity,
}
}

fn is_full(&self) -> bool {
self.inner.len() == self.capacity
}

fn push_back(&mut self, item: T) -> Option<T> {
let oldest = if self.is_full() {
self.inner.pop_front()
} else {
None
};

self.inner.push_back(item);
assert!(self.inner.len() <= self.capacity);
oldest
}

fn iter(&self) -> vec_deque::Iter<'_, T> {
self.inner.iter()
}
}

0 comments on commit a472f03

Please sign in to comment.