Skip to content

Commit

Permalink
Remove included messages from mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
FrederikBolding committed Jan 8, 2025
1 parent 5875217 commit 9945334
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use snapchain::network::server::MyHubService;
use snapchain::node::snapchain_node::SnapchainNode;
use snapchain::proto::admin_service_server::AdminServiceServer;
use snapchain::proto::hub_service_server::HubServiceServer;
use snapchain::proto::Block;
use snapchain::storage::db::RocksDB;
use snapchain::storage::store::engine::MempoolMessage;
use snapchain::storage::store::BlockStore;
Expand Down Expand Up @@ -96,6 +97,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let (system_tx, mut system_rx) = mpsc::channel::<SystemMessage>(100);
let (mempool_tx, mut mempool_rx) = mpsc::channel::<MempoolMessage>(100);
let (block_tx, mut block_rx) = mpsc::channel::<Block>(100);

let gossip_result = SnapchainGossip::create(
keypair.clone(),
Expand Down Expand Up @@ -130,7 +132,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
app_config.mempool.clone(),
Some(app_config.rpc_address.clone()),
gossip_tx.clone(),
None,
Some(block_tx.clone()),
block_store.clone(),
app_config.rocksdb_dir.clone(),
statsd_client.clone(),
Expand Down Expand Up @@ -339,6 +341,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
}
Some(block) = block_rx.recv() => {
node.handle_block(block).await;
}
}
}
}
27 changes: 26 additions & 1 deletion src/node/snapchain_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use informalsystems_malachitebft_metrics::Metrics;
use libp2p::identity::ed25519::Keypair;
use ractor::ActorRef;
use std::collections::{BTreeMap, HashMap};
use tokio::sync::mpsc;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::warn;

const MAX_SHARDS: u32 = 64;
Expand All @@ -31,6 +32,7 @@ pub struct SnapchainNode {
pub shard_stores: HashMap<u32, Stores>,
pub shard_senders: HashMap<u32, Senders>,
pub address: Address,
shard_mempools: HashMap<u32, Arc<RwLock<Mempool>>>,
}

impl SnapchainNode {
Expand All @@ -54,6 +56,7 @@ impl SnapchainNode {

let mut shard_senders: HashMap<u32, Senders> = HashMap::new();
let mut shard_stores: HashMap<u32, Stores> = HashMap::new();
let mut shard_mempools: HashMap<u32, Arc<RwLock<Mempool>>> = HashMap::new();

// Create the shard validators
for shard_id in config.shard_ids {
Expand Down Expand Up @@ -99,6 +102,7 @@ impl SnapchainNode {

shard_senders.insert(shard_id, engine.get_senders());
shard_stores.insert(shard_id, engine.get_stores());
shard_mempools.insert(shard_id, engine.mempool());

let shard_proposer = ShardProposer::new(
validator_address.clone(),
Expand Down Expand Up @@ -190,6 +194,7 @@ impl SnapchainNode {
address: validator_address,
shard_senders,
shard_stores,
shard_mempools,
}
}

Expand Down Expand Up @@ -224,4 +229,24 @@ impl SnapchainNode {
warn!("No actor found for shard, could not forward message");
}
}

pub async fn handle_block(&self, block: Block) {
for chunk in block.shard_chunks {
let header = chunk.header.expect("Expects chunk to have a header");
let height = header.height.expect("Expects header to have a height");
let mempool = self
.shard_mempools
.get(&height.shard_index)
.expect("Expects mempool to exist for shard");
let mut mempool_write = mempool.write().await;
for transaction in chunk.transactions {
for user_message in transaction.user_messages {
mempool_write.remove(user_message.hex_hash());
}
for system_message in transaction.system_messages {
mempool_write.remove(system_message.hex_hash());
}
}
}
}
}

0 comments on commit 9945334

Please sign in to comment.