Skip to content

Commit

Permalink
[dag] use DropGuard to drop handles
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Feb 9, 2024
1 parent 44a8f58 commit 634e70a
Showing 1 changed file with 4 additions and 14 deletions.
18 changes: 4 additions & 14 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use aptos_consensus_types::common::{Author, Payload, PayloadFilter};
use aptos_crypto::hash::CryptoHash;
use aptos_infallible::Mutex;
use aptos_logger::{debug, error};
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_reliable_broadcast::{DropGuard, ReliableBroadcast};
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{block_info::Round, epoch_state::EpochState};
use aptos_validator_transaction_pool as vtxn_pool;
Expand All @@ -53,7 +53,7 @@ pub(crate) struct DagDriver {
payload_client: Arc<dyn PayloadClient>,
reliable_broadcast: Arc<ReliableBroadcast<DAGMessage, ExponentialBackoff, DAGRpcResult>>,
time_service: TimeService,
rb_handles: Mutex<BoundedVecDeque<(AbortHandle, u64)>>,
rb_handles: Mutex<BoundedVecDeque<(DropGuard, u64)>>,
storage: Arc<dyn DAGStorage>,
order_rule: Mutex<OrderRule>,
fetch_requester: Arc<dyn TFetchRequester>,
Expand Down Expand Up @@ -323,14 +323,13 @@ impl DagDriver {
debug!("Finish reliable broadcast for round {}", round);
};
tokio::spawn(Abortable::new(task, abort_registration));
if let Some((prev_handle, prev_round_timestamp)) = self
if let Some((_handle, prev_round_timestamp)) = self
.rb_handles
.lock()
.push_back((abort_handle, timestamp))
.push_back((DropGuard::new(abort_handle), timestamp))
{
// TODO: this observation is inaccurate.
observe_round(prev_round_timestamp, RoundStage::Finished);
prev_handle.abort();
}
}
}
Expand Down Expand Up @@ -361,15 +360,6 @@ impl RpcHandler for DagDriver {
}
}

impl Drop for DagDriver {
fn drop(&mut self) {
let abort_handles = self.rb_handles.lock();
for (handle, _) in abort_handles.iter() {
handle.abort()
}
}
}

struct BoundedVecDeque<T> {
inner: VecDeque<T>,
capacity: usize,
Expand Down

0 comments on commit 634e70a

Please sign in to comment.