Skip to content

Commit

Permalink
[rand gen] add in mem db
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Dec 8, 2023
1 parent 98f595c commit 7d64633
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 14 deletions.
4 changes: 2 additions & 2 deletions consensus/consensus-types/src/randomness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use crate::{block::Block, common::Round};
use aptos_crypto::HashValue;
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
struct RandMetadataToSign {
epoch: u64,
round: Round,
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
pub struct RandMetadata {
metadata_to_sign: RandMetadataToSign,
// not used for signing
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/rand/rand_gen/aug_data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<D, Storage: AugDataStorage<D>> AugDataStore<D, Storage> {
(to_remove, to_keep)
}

pub fn new(epoch: u64, db: Arc<Storage>, config: RandConfig) -> Self {
pub fn new(epoch: u64, config: RandConfig, db: Arc<Storage>) -> Self {
let all_data = db.get_all_aug_data().unwrap_or_default();
let (to_remove, aug_data) = Self::filter_by_epoch(epoch, all_data.into_iter());
if let Err(e) = db.remove_aug_data(to_remove.into_iter()) {
Expand Down
21 changes: 19 additions & 2 deletions consensus/src/rand/rand_gen/rand_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::{
network::{IncomingRandGenRequest, NetworkSender},
pipeline::buffer_manager::{OrderedBlocks, ResetRequest},
rand::rand_gen::{
aug_data_store::AugDataStore,
network_messages::{RandMessage, RpcRequest},
rand_store::RandStore,
storage::interface::{AugDataStorage, RandStorage},
types::{AugmentedData, Proof, RandConfig, RandDecision, Share},
},
};
Expand All @@ -23,7 +25,7 @@ use tokio_retry::strategy::ExponentialBackoff;
pub type Sender<T> = UnboundedSender<T>;
pub type Receiver<T> = UnboundedReceiver<T>;

pub struct RandManager<S: Share, P: Proof<Share = S>, D: AugmentedData> {
pub struct RandManager<S: Share, P: Proof<Share = S>, D: AugmentedData, Storage> {
author: Author,
epoch_state: Arc<EpochState>,
stop: bool,
Expand All @@ -37,16 +39,26 @@ pub struct RandManager<S: Share, P: Proof<Share = S>, D: AugmentedData> {

// downstream channels
outgoing_blocks: Sender<OrderedBlocks>,
// local state
rand_store: RandStore<S, P, Storage>,
aug_data_store: AugDataStore<D, Storage>,
}

impl<S: Share, P: Proof<Share = S>, D: AugmentedData> RandManager<S, P, D> {
impl<
S: Share,
P: Proof<Share = S>,
D: AugmentedData,
Storage: RandStorage<S, P> + AugDataStorage<D>,
> RandManager<S, P, D, Storage>
{
pub fn new(
author: Author,
epoch_state: Arc<EpochState>,
signer: Arc<ValidatorSigner>,
config: RandConfig,
outgoing_blocks: Sender<OrderedBlocks>,
network_sender: Arc<NetworkSender>,
db: Arc<Storage>,
) -> Self {
let (rand_decision_tx, rand_decision_rx) = tokio::sync::mpsc::unbounded_channel();
let rb_backoff_policy = ExponentialBackoff::from_millis(2)
Expand All @@ -59,6 +71,8 @@ impl<S: Share, P: Proof<Share = S>, D: AugmentedData> RandManager<S, P, D> {
TimeService::real(),
Duration::from_secs(10),
));
let rand_store = RandStore::new(author, config.clone(), db.clone());
let aug_data_store = AugDataStore::new(epoch_state.epoch, config.clone(), db);

Self {
author,
Expand All @@ -71,6 +85,9 @@ impl<S: Share, P: Proof<Share = S>, D: AugmentedData> RandManager<S, P, D> {
rand_decision_tx,
rand_decision_rx,
outgoing_blocks,

rand_store,
aug_data_store,
}
}

Expand Down
22 changes: 15 additions & 7 deletions consensus/src/rand/rand_gen/rand_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
pipeline::buffer_manager::OrderedBlocks,
rand::rand_gen::{
block_queue::{BlockQueue, QueueItem},
storage::interface::RandStorage,
types::{Proof, RandConfig, RandDecision, RandShare, Share},
},
};
Expand All @@ -14,7 +15,7 @@ use aptos_consensus_types::{
randomness::RandMetadata,
};
use aptos_logger::error;
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

struct ShareAggregator<S> {
shares: HashMap<Author, RandShare<S>>,
Expand Down Expand Up @@ -159,20 +160,22 @@ impl<S: Share, P: Proof<Share = S>> RandItem<S, P> {
}
}

pub struct RandStore<S, P> {
pub struct RandStore<S, P, Storage> {
author: Author,
rand_config: RandConfig,
rand_map: HashMap<Round, RandItem<S, P>>,
block_queue: BlockQueue,
db: Arc<Storage>,
}

impl<S: Share, P: Proof<Share = S>> RandStore<S, P> {
pub fn new(author: Author, rand_config: RandConfig) -> Self {
impl<S: Share, P: Proof<Share = S>, Storage: RandStorage<S, P>> RandStore<S, P, Storage> {
pub fn new(author: Author, rand_config: RandConfig, db: Arc<Storage>) -> Self {
Self {
author,
rand_config,
rand_map: HashMap::new(),
block_queue: BlockQueue::new(),
db,
}
}

Expand Down Expand Up @@ -246,13 +249,14 @@ mod tests {
use crate::rand::rand_gen::{
block_queue::QueueItem,
rand_store::{RandItem, RandStore, ShareAggregator},
storage::in_memory::InMemRandDb,
test_utils::{
create_decision, create_ordered_blocks, create_share, create_share_for_round,
},
types::{MockProof, MockShare, RandConfig},
types::{MockAugData, MockProof, MockShare, RandConfig},
};
use aptos_consensus_types::{common::Author, randomness::RandMetadata};
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, str::FromStr, sync::Arc};

#[test]
fn test_share_aggregator() {
Expand Down Expand Up @@ -313,7 +317,11 @@ mod tests {
.collect();
let authors: Vec<Author> = weights.keys().cloned().collect();
let config = RandConfig::new(Author::ZERO, weights);
let mut rand_store = RandStore::new(Author::ZERO, config);
let mut rand_store = RandStore::new(
Author::ZERO,
config,
Arc::new(InMemRandDb::<MockShare, MockProof, MockAugData>::new()),
);

let rounds = vec![vec![1], vec![2, 3], vec![5, 8, 13]];
let blocks_1 = QueueItem::new(create_ordered_blocks(rounds[0].clone()));
Expand Down
116 changes: 116 additions & 0 deletions consensus/src/rand/rand_gen/storage/in_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::rand::rand_gen::{
storage::interface::{AugDataStorage, RandStorage},
types::{
AugData, AugDataId, AugmentedData, CertifiedAugData, Proof, RandDecision, RandShare, Share,
ShareId,
},
};
use aptos_consensus_types::randomness::RandMetadata;
use aptos_infallible::RwLock;
use std::collections::HashMap;

pub struct InMemRandDb<S, P, D> {
shares: RwLock<HashMap<ShareId, RandShare<S>>>,
decisions: RwLock<HashMap<RandMetadata, RandDecision<P>>>,
aug_data: RwLock<HashMap<AugDataId, AugData<D>>>,
certified_aug_data: RwLock<HashMap<AugDataId, CertifiedAugData<D>>>,
}

impl<S, P, D> InMemRandDb<S, P, D> {
pub fn new() -> Self {
Self {
shares: RwLock::new(HashMap::new()),
decisions: RwLock::new(HashMap::new()),
aug_data: RwLock::new(HashMap::new()),
certified_aug_data: RwLock::new(HashMap::new()),
}
}
}

impl<S: Share, P: Proof<Share = S>, D: AugmentedData> RandStorage<S, P> for InMemRandDb<S, P, D> {
fn save_share(&self, share: &RandShare<S>) -> anyhow::Result<()> {
self.shares
.write()
.insert(share.share_id().clone(), share.clone());
Ok(())
}

fn save_decision(&self, decision: &RandDecision<P>) -> anyhow::Result<()> {
self.decisions
.write()
.insert(decision.rand_metadata().clone(), decision.clone());
Ok(())
}

fn get_all_shares(&self) -> anyhow::Result<Vec<(ShareId, RandShare<S>)>> {
Ok(self.shares.read().clone().into_iter().collect())
}

fn get_all_decision(&self) -> anyhow::Result<Vec<(RandMetadata, RandDecision<P>)>> {
Ok(self.decisions.read().clone().into_iter().collect())
}

fn remove_shares(&self, shares: impl Iterator<Item = RandShare<S>>) -> anyhow::Result<()> {
for share in shares {
self.shares.write().remove(&share.share_id());
}
Ok(())
}

fn remove_decisions(
&self,
decisions: impl Iterator<Item = RandDecision<P>>,
) -> anyhow::Result<()> {
for decision in decisions {
self.decisions.write().remove(decision.rand_metadata());
}
Ok(())
}
}

impl<S: Share, P: Proof<Share = S>, D: AugmentedData> AugDataStorage<D> for InMemRandDb<S, P, D> {
fn save_aug_data(&self, aug_data: &AugData<D>) -> anyhow::Result<()> {
self.aug_data
.write()
.insert(aug_data.id(), aug_data.clone());
Ok(())
}

fn save_certified_aug_data(
&self,
certified_aug_data: &CertifiedAugData<D>,
) -> anyhow::Result<()> {
self.certified_aug_data
.write()
.insert(certified_aug_data.id(), certified_aug_data.clone());
Ok(())
}

fn get_all_aug_data(&self) -> anyhow::Result<Vec<(AugDataId, AugData<D>)>> {
Ok(self.aug_data.read().clone().into_iter().collect())
}

fn get_all_certified_aug_data(&self) -> anyhow::Result<Vec<(AugDataId, CertifiedAugData<D>)>> {
Ok(self.certified_aug_data.read().clone().into_iter().collect())
}

fn remove_aug_data(&self, aug_data: impl Iterator<Item = AugData<D>>) -> anyhow::Result<()> {
for data in aug_data {
self.aug_data.write().remove(&data.id());
}
Ok(())
}

fn remove_certified_aug_data(
&self,
certified_aug_data: impl Iterator<Item = CertifiedAugData<D>>,
) -> anyhow::Result<()> {
for data in certified_aug_data {
self.certified_aug_data.write().remove(&data.id());
}
Ok(())
}
}
1 change: 1 addition & 0 deletions consensus/src/rand/rand_gen/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0
pub mod db;
pub mod in_memory;
pub mod interface;
mod schema;
9 changes: 7 additions & 2 deletions consensus/src/rand/rand_gen/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub(super) struct MockShare;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(super) struct MockProof;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub(super) struct MockAugData;

impl Share for MockShare {
fn verify(
&self,
Expand Down Expand Up @@ -48,6 +51,8 @@ impl Proof for MockProof {
}
}

impl AugmentedData for MockAugData {}

pub trait Share:
Clone + Debug + PartialEq + Send + Sync + Serialize + DeserializeOwned + 'static
{
Expand All @@ -74,7 +79,7 @@ pub trait AugmentedData:
{
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ShareId {
epoch: u64,
round: Round,
Expand Down Expand Up @@ -173,7 +178,7 @@ impl<P> ShareAck<P> {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
pub struct AugDataId {
epoch: u64,
author: Author,
Expand Down

0 comments on commit 7d64633

Please sign in to comment.