From 012db41ad622e1ebb6477fbd3637d87996ebe5cd Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 8 Feb 2024 21:01:23 -0800 Subject: [PATCH] move BoundedVecDeque to aptos-collections --- Cargo.lock | 5 ++ Cargo.toml | 20 ++++-- consensus/Cargo.toml | 10 ++- consensus/src/dag/dag_driver.rs | 42 +----------- crates/aptos-collections/Cargo.toml | 15 +++++ .../src/bounded_vec_deque.rs | 67 +++++++++++++++++++ crates/aptos-collections/src/lib.rs | 3 + 7 files changed, 117 insertions(+), 45 deletions(-) create mode 100644 crates/aptos-collections/Cargo.toml create mode 100644 crates/aptos-collections/src/bounded_vec_deque.rs create mode 100644 crates/aptos-collections/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f4bff31a1d2ea..77d96186f631b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -762,6 +762,10 @@ dependencies = [ "clap_complete", ] +[[package]] +name = "aptos-collections" +version = "0.1.0" + [[package]] name = "aptos-comparison-testing" version = "0.1.0" @@ -867,6 +871,7 @@ dependencies = [ "aptos-bounded-executor", "aptos-cached-packages", "aptos-channels", + "aptos-collections", "aptos-config", "aptos-consensus-notifications", "aptos-consensus-types", diff --git a/Cargo.toml b/Cargo.toml index a38f2f4190375..b7d1760740736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ members = [ "crates/aptos-bcs-utils", "crates/aptos-bitvec", "crates/aptos-build-info", + "crates/aptos-collections", "crates/aptos-compression", "crates/aptos-crypto", "crates/aptos-crypto-derive", @@ -291,6 +292,7 @@ aptos-build-info = { path = "crates/aptos-build-info" } aptos-cached-packages = { path = "aptos-move/framework/cached-packages" } aptos-channels = { path = "crates/channel" } aptos-cli-common = { path = "crates/aptos-cli-common" } +aptos-collections = { path = "crates/aptos-collections" } aptos-compression = { path = "crates/aptos-compression" } aptos-consensus = { path = "consensus" } aptos-consensus-notifications = { path = "state-sync/inter-component/consensus-notifications" } @@ -486,7 +488,10 @@ claims = "0.7" clap = { version = "4.3.9", features = ["derive", "unstable-styles"] } clap-verbosity-flag = "2.1.1" clap_complete = "4.4.1" -cloud-storage = { version = "0.11.1", features = ["global-client", "rustls-tls"], default-features = false } +cloud-storage = { version = "0.11.1", features = [ + "global-client", + "rustls-tls", +], default-features = false } codespan-reporting = "0.11.1" concurrent-queue = "2.2.0" console-subscriber = "0.1.8" @@ -618,7 +623,11 @@ rand = "0.7.3" rand_core = "0.5.1" random_word = "0.3.0" rayon = "1.5.2" -redis = { version = "0.22.3", features = ["tokio-comp", "script", "connection-manager"] } +redis = { version = "0.22.3", features = [ + "tokio-comp", + "script", + "connection-manager", +] } redis-test = { version = "0.1.1", features = ["aio"] } regex = "1.9.3" reqwest = { version = "0.11.11", features = [ @@ -632,7 +641,7 @@ reqwest-retry = "0.2.1" ring = { version = "0.16.20", features = ["std"] } ripemd = "0.1.1" rocksdb = { version = "0.21.0", features = ["lz4"] } -rstack-self = { version = "0.3.0", features = ["dw"], default_features = false } +rstack-self = { version = "0.3.0", features = ["dw"], default_features = false } rstest = "0.15.0" rusty-fork = "0.3.0" rustversion = "1.0.14" @@ -646,7 +655,10 @@ siphasher = "0.3.10" serde = { version = "1.0.193", features = ["derive", "rc"] } serde-big-array = "0.5.1" serde_bytes = "0.11.6" -serde_json = { version = "1.0.81", features = ["preserve_order", "arbitrary_precision"] } # Note: arbitrary_precision is required to parse u256 in JSON +serde_json = { version = "1.0.81", features = [ + "preserve_order", + "arbitrary_precision", +] } # Note: arbitrary_precision is required to parse u256 in JSON serde_repr = "0.1" serde_merge = "0.1.3" serde-name = "0.1.1" diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index ed3bbfee77989..275ebe0ba527f 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -17,6 +17,7 @@ anyhow = { workspace = true } aptos-bitvec = { workspace = true } aptos-bounded-executor = { workspace = true } aptos-channels = { workspace = true } +aptos-collections = { workspace = true } aptos-config = { workspace = true } aptos-consensus-notifications = { workspace = true } aptos-consensus-types = { workspace = true } @@ -97,5 +98,12 @@ tempfile = { workspace = true } [features] default = [] -fuzzing = ["aptos-consensus-types/fuzzing", "aptos-config/fuzzing", "aptos-crypto/fuzzing", "aptos-mempool/fuzzing", "aptos-types/fuzzing", "aptos-safety-rules/testing"] +fuzzing = [ + "aptos-consensus-types/fuzzing", + "aptos-config/fuzzing", + "aptos-crypto/fuzzing", + "aptos-mempool/fuzzing", + "aptos-types/fuzzing", + "aptos-safety-rules/testing", +] failpoints = ["fail/failpoints"] diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index facae42d4d950..682602be07f5c 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -24,6 +24,7 @@ use crate::{ payload_client::PayloadClient, }; use anyhow::{bail, ensure}; +use aptos_collections::BoundedVecDeque; use aptos_config::config::DagPayloadConfig; use aptos_consensus_types::common::{Author, Payload, PayloadFilter}; use aptos_crypto::hash::CryptoHash; @@ -39,11 +40,7 @@ use futures::{ future::{AbortHandle, Abortable}, FutureExt, }; -use std::{ - collections::{vec_deque, HashSet, VecDeque}, - sync::Arc, - time::Duration, -}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use tokio_retry::strategy::ExponentialBackoff; pub(crate) struct DagDriver { @@ -359,38 +356,3 @@ impl RpcHandler for DagDriver { Ok(CertifiedAck::new(epoch)) } } - -struct BoundedVecDeque { - inner: VecDeque, - capacity: usize, -} - -impl BoundedVecDeque { - fn new(capacity: usize) -> Self { - assert!(capacity > 0); - Self { - inner: VecDeque::with_capacity(capacity), - capacity, - } - } - - fn is_full(&self) -> bool { - self.inner.len() == self.capacity - } - - fn push_back(&mut self, item: T) -> Option { - let oldest = if self.is_full() { - self.inner.pop_front() - } else { - None - }; - - self.inner.push_back(item); - assert!(self.inner.len() <= self.capacity); - oldest - } - - fn iter(&self) -> vec_deque::Iter<'_, T> { - self.inner.iter() - } -} diff --git a/crates/aptos-collections/Cargo.toml b/crates/aptos-collections/Cargo.toml new file mode 100644 index 0000000000000..69c953a35c639 --- /dev/null +++ b/crates/aptos-collections/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "aptos-collections" +description = "Aptos Collections Library" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] diff --git a/crates/aptos-collections/src/bounded_vec_deque.rs b/crates/aptos-collections/src/bounded_vec_deque.rs new file mode 100644 index 0000000000000..1fe22a6f831a1 --- /dev/null +++ b/crates/aptos-collections/src/bounded_vec_deque.rs @@ -0,0 +1,67 @@ +use std::collections::{vec_deque, VecDeque}; + +pub struct BoundedVecDeque { + inner: VecDeque, + capacity: usize, +} + +impl BoundedVecDeque { + pub fn new(capacity: usize) -> Self { + assert!(capacity > 0); + Self { + inner: VecDeque::with_capacity(capacity), + capacity, + } + } + + pub fn is_full(&self) -> bool { + self.inner.len() == self.capacity + } + + pub fn push_back(&mut self, item: T) -> Option { + let oldest = if self.is_full() { + self.inner.pop_front() + } else { + None + }; + + self.inner.push_back(item); + assert!(self.inner.len() <= self.capacity); + oldest + } + + pub fn iter(&self) -> vec_deque::Iter<'_, T> { + self.inner.iter() + } +} + +#[cfg(test)] +mod tests { + use super::BoundedVecDeque; + + #[test] + fn test_bounded_vec_deque_capacity() { + let capacity = 10; + let mut queue = BoundedVecDeque::new(capacity); + for i in 0..capacity { + queue.push_back(i); + } + + assert!(queue.is_full()); + + assert_eq!(queue.push_back(capacity), Some(0)); + } + + #[test] + fn test_bounded_vec_deque_iter() { + let capacity = 10; + let mut queue = BoundedVecDeque::new(capacity); + for i in 0..capacity { + queue.push_back(i); + } + + for (i, item) in queue.iter().enumerate() { + assert_eq!(i, *item); + } + } +} diff --git a/crates/aptos-collections/src/lib.rs b/crates/aptos-collections/src/lib.rs new file mode 100644 index 0000000000000..34073571396fb --- /dev/null +++ b/crates/aptos-collections/src/lib.rs @@ -0,0 +1,3 @@ +mod bounded_vec_deque; + +pub use bounded_vec_deque::BoundedVecDeque;