diff --git a/Cargo.lock b/Cargo.lock index 920a9d45..c4bcc4ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5153,6 +5153,7 @@ dependencies = [ "hex", "humantime", "humantime-serde", + "itertools 0.13.0", "libp2p", "malachite-common", "malachite-config", diff --git a/Cargo.toml b/Cargo.toml index d3baff76..12452c1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ threadpool = "1.8.1" rand = "0.8.5" humantime-serde = "1.1.1" humantime = "2.1.0" +itertools = "0.13.0" [build-dependencies] tonic-build = "0.9.2" diff --git a/src/consensus/consensus.rs b/src/consensus/consensus.rs index d331650f..150a98ae 100644 --- a/src/consensus/consensus.rs +++ b/src/consensus/consensus.rs @@ -14,7 +14,7 @@ use malachite_consensus::{Effect, ProposedValue, Resume, SignedConsensusMsg}; use malachite_metrics::Metrics; use crate::consensus::timers::{TimeoutElapsed, TimerScheduler}; -use crate::consensus::validator::{self, ShardValidator}; +use crate::consensus::validator::ShardValidator; use crate::core::types::{ Height, ShardId, SnapchainContext, SnapchainShard, SnapchainValidator, SnapchainValidatorContext, diff --git a/src/consensus/proposer.rs b/src/consensus/proposer.rs index 49ce10f2..269a44b6 100644 --- a/src/consensus/proposer.rs +++ b/src/consensus/proposer.rs @@ -294,7 +294,7 @@ impl BlockProposer { async fn publish_new_block(&self, block: Block) { match self.block_tx.send(block.clone()).await { Err(err) => { - error!("Erorr publishing new block {:#?}", err) + error!("Error publishing new block {:?}", err.to_string()); } Ok(_) => {} } diff --git a/src/core/message.rs b/src/core/message.rs index bb324b3b..736f6971 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -1,4 +1,5 @@ use crate::proto::msg as message; +use crate::proto::snapchain; impl message::Message { pub fn is_type(&self, message_type: message::MessageType) -> bool { @@ -20,4 +21,20 @@ impl message::Message { 0 } } + + pub fn hex_hash(&self) -> String { + hex::encode(&self.hash) + } +} + +impl snapchain::ValidatorMessage { + pub fn fid(&self) -> u32 { + if let Some(fname) = &self.fname_transfer { + return fname.fid as u32; + } + if let Some(event) = &self.on_chain_event { + return event.fid as u32; + } + 0 + } } diff --git a/src/network/server.rs b/src/network/server.rs index 22f85746..ef9eb06e 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -4,7 +4,7 @@ use crate::core::error::HubError; use crate::proto::msg as message; use crate::proto::rpc::snapchain_service_server::SnapchainService; use crate::proto::rpc::{BlocksRequest, BlocksResponse, ShardChunksRequest, ShardChunksResponse}; -use crate::storage::store::engine::Message; +use crate::storage::store::engine::MempoolMessage; use crate::storage::store::shard::ShardStore; use crate::storage::store::BlockStore; use hex::ToHex; @@ -13,7 +13,7 @@ use tonic::{Request, Response, Status}; use tracing::info; pub struct MySnapchainService { - message_tx: mpsc::Sender, + message_tx: mpsc::Sender, block_store: BlockStore, shard_stores: HashMap, } @@ -22,7 +22,7 @@ impl MySnapchainService { pub fn new( block_store: BlockStore, shard_stores: HashMap, - message_tx: mpsc::Sender, + message_tx: mpsc::Sender, ) -> Self { Self { block_store, @@ -43,7 +43,7 @@ impl SnapchainService for MySnapchainService { let message = request.into_inner(); self.message_tx - .send(Message::UserMessage(message.clone())) + .send(MempoolMessage::UserMessage(message.clone())) .await .unwrap(); // Do we need clone here? I think yes? diff --git a/src/node/snapchain_node.rs b/src/node/snapchain_node.rs index 3d52da62..00a6d319 100644 --- a/src/node/snapchain_node.rs +++ b/src/node/snapchain_node.rs @@ -6,10 +6,9 @@ use crate::core::types::{ SnapchainValidatorSet, }; use crate::network::gossip::GossipEvent; -use crate::proto::msg as message; use crate::proto::snapchain::{Block, ShardChunk}; use crate::storage::db::RocksDB; -use crate::storage::store::engine::{BlockEngine, Message, ShardEngine}; +use crate::storage::store::engine::{BlockEngine, MempoolMessage, ShardEngine}; use crate::storage::store::shard::ShardStore; use crate::storage::store::BlockStore; use libp2p::identity::ed25519::Keypair; @@ -24,7 +23,7 @@ const MAX_SHARDS: u32 = 3; pub struct SnapchainNode { pub consensus_actors: BTreeMap>>, - pub messages_tx_by_shard: HashMap>, + pub messages_tx_by_shard: HashMap>, pub shard_stores: HashMap, pub address: Address, } @@ -45,7 +44,7 @@ impl SnapchainNode { let (shard_decision_tx, shard_decision_rx) = mpsc::channel::(100); - let mut shard_messages: HashMap> = HashMap::new(); + let mut shard_messages: HashMap> = HashMap::new(); let mut shard_stores: HashMap = HashMap::new(); diff --git a/src/proto/blocks.proto b/src/proto/blocks.proto index d8978a6c..541b7feb 100644 --- a/src/proto/blocks.proto +++ b/src/proto/blocks.proto @@ -122,7 +122,7 @@ message Transaction { // Fname transfers message FnameTransfer { - + uint64 fid = 1; } // Validator initiated prunes/revokes etc diff --git a/src/storage/store/account/store.rs b/src/storage/store/account/store.rs index 3ee11ddf..6e7551af 100644 --- a/src/storage/store/account/store.rs +++ b/src/storage/store/account/store.rs @@ -12,7 +12,6 @@ use crate::{ proto::msg::{link_body::Target, message_data::Body, Message, MessageType}, storage::db::{RocksDB, RocksDbTransactionBatch}, }; -use prost::Message as _; use std::clone::Clone; use std::string::ToString; use std::sync::{Arc, Mutex}; diff --git a/src/storage/store/engine.rs b/src/storage/store/engine.rs index 231858a5..4b386cce 100644 --- a/src/storage/store/engine.rs +++ b/src/storage/store/engine.rs @@ -1,10 +1,9 @@ -use super::account::{OnchainEventStorageError, OnchainEventsPage}; +use super::account::OnchainEventStorageError; use super::shard::ShardStore; use crate::core::error::HubError; -use crate::core::types::{proto, Height}; +use crate::core::types::Height; use crate::proto::onchain_event::{OnChainEvent, OnChainEventType}; -use crate::proto::{msg as message, snapchain}; -use crate::storage::db; +use crate::proto::{hub_event, msg as message, snapchain}; use crate::storage::db::{PageOptions, RocksDB, RocksDbTransactionBatch}; use crate::storage::store::account::{ CastStore, MessagesPage, OnchainEventStore, Store, StoreEventHandler, @@ -12,13 +11,13 @@ use crate::storage::store::account::{ use crate::storage::store::BlockStore; use crate::storage::trie; use crate::storage::trie::merkle_trie; +use itertools::Itertools; use message::MessageType; use snapchain::{Block, ShardChunk, Transaction}; -use std::iter; use std::sync::Arc; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{error, warn}; +use tracing::{error, info, warn}; #[derive(Error, Debug)] enum EngineError { @@ -34,6 +33,9 @@ enum EngineError { #[error("unsupported message type")] UnsupportedMessageType(MessageType), + #[error("unsupported event")] + UnsupportedEvent, + #[error("merkle trie root hash mismatch")] HashMismatch, @@ -57,11 +59,20 @@ impl EngineError { } #[derive(Clone)] -pub enum Message { +pub enum MempoolMessage { UserMessage(message::Message), ValidatorMessage(snapchain::ValidatorMessage), } +impl MempoolMessage { + pub fn fid(&self) -> u32 { + match self { + MempoolMessage::UserMessage(msg) => msg.fid(), + MempoolMessage::ValidatorMessage(msg) => msg.fid(), + } + } +} + // Shard state root and the transactions #[derive(Clone)] pub struct ShardStateChange { @@ -73,8 +84,8 @@ pub struct ShardStateChange { pub struct ShardEngine { shard_id: u32, shard_store: ShardStore, - messages_rx: mpsc::Receiver, - messages_tx: mpsc::Sender, + messages_rx: mpsc::Receiver, + messages_tx: mpsc::Sender, trie: merkle_trie::MerkleTrie, cast_store: Store, pub db: Arc, @@ -109,7 +120,7 @@ impl ShardEngine { let onchain_event_store = OnchainEventStore::new(shard_store.db.clone(), event_handler.clone()); - let (messages_tx, messages_rx) = mpsc::channel::(10_000); + let (messages_tx, messages_rx) = mpsc::channel::(10_000); ShardEngine { shard_id, shard_store, @@ -122,7 +133,7 @@ impl ShardEngine { } } - pub fn messages_tx(&self) -> mpsc::Sender { + pub fn messages_tx(&self) -> mpsc::Sender { self.messages_tx.clone() } @@ -145,75 +156,65 @@ impl ShardEngine { } } - let mut merged_user_messages: Vec = vec![]; - let mut merged_system_messages: Vec = vec![]; - - for msg in &messages { - match msg { - Message::ValidatorMessage(msg) => { - if let Some(onchain_event) = &msg.on_chain_event { - match self - .onchain_event_store - .merge_onchain_event(onchain_event.clone(), txn_batch) - { - Ok(_) => { - merged_system_messages.push(msg.clone()); - // TODO(aditi): Insert into trie. - } - Err(err) => { - error!("Unable to merge onchain event: {}", err) - } - } - } - } - Message::UserMessage(msg) => { - let data = msg.data.as_ref().ok_or(EngineError::NoMessageData)?; - let msg_type = MessageType::try_from(data.r#type) - .or(Err(EngineError::InvalidMessageType))?; - match msg_type { - MessageType::CastAdd => { - self.cast_store - .merge(&msg, txn_batch) - .map_err(EngineError::new_store_error(msg.hash.clone()))?; - merged_user_messages.push(msg.clone()); - self.trie - .insert(&*self.db, txn_batch, vec![msg.hash.clone()])?; - } - unhandled_type => { - return Err(EngineError::UnsupportedMessageType(unhandled_type)); - } - } - } - } + let mut snapchain_txns = self.create_transactions_from_mempool(messages); + for snapchain_txn in &mut snapchain_txns { + self.replay_snapchain_txn(&snapchain_txn, txn_batch)?; + snapchain_txn.account_root = self.trie.root_hash()?; // TODO: This should use the account root and not the shard root } - // TODO: Group by fid so we only have a single txn per block per fid - let mut transactions = vec![]; - let snap_txn = snapchain::Transaction { - fid: 1234, //TODO - account_root: vec![5, 5, 6, 6], //TODO - system_messages: merged_system_messages, //TODO - user_messages: merged_user_messages, - }; - transactions.push(snap_txn); - let new_root_hash = self.trie.root_hash()?; - let result = ShardStateChange { shard_id, new_state_root: new_root_hash.clone(), - transactions, + transactions: snapchain_txns, }; Ok(result) } + // Groups messages by fid and creates a transaction for each fid + fn create_transactions_from_mempool( + &mut self, + mut messages: Vec, + ) -> Vec { + let mut transactions = vec![]; + + let grouped_messages = messages.iter().into_group_map_by(|msg| msg.fid()); + let unique_fids = grouped_messages.keys().len(); + for (fid, messages) in grouped_messages { + let mut transaction = Transaction { + fid: fid as u64, + account_root: vec![], // Starts empty, will be updated after replay + system_messages: vec![], + user_messages: vec![], + }; + for msg in messages { + match msg { + MempoolMessage::ValidatorMessage(msg) => { + transaction.system_messages.push(msg.clone()); + } + MempoolMessage::UserMessage(msg) => { + transaction.user_messages.push(msg.clone()); + } + } + } + transactions.push(transaction); + } + info!( + transactions = transactions.len(), + messages = messages.len(), + fids = unique_fids, + "Created transactions from mempool" + ); + transactions + } + pub fn propose_state_change(&mut self, shard: u32) -> ShardStateChange { let mut txn = RocksDbTransactionBatch::new(); let result = self.prepare_proposal(&mut txn, shard).unwrap(); //TODO: don't unwrap() - // TODO: use drop trait? - self.trie.reload(&*self.db).unwrap(); + // TODO: this should probably operate automatically via drop trait + self.trie.reload(&self.db).unwrap(); result } @@ -224,48 +225,130 @@ impl ShardEngine { transactions: &[Transaction], shard_root: &[u8], ) -> Result<(), EngineError> { - let mut merged_messages: Vec = vec![]; - - for snap_txn in transactions { - for msg in &snap_txn.user_messages { - let data = msg.data.as_ref().ok_or(EngineError::NoMessageData)?; - let msg_type = - MessageType::try_from(data.r#type).or(Err(EngineError::InvalidMessageType))?; + for snapchain_txn in transactions { + self.replay_snapchain_txn(snapchain_txn, txn_batch)?; + } - match msg_type { - MessageType::CastAdd => { - self.cast_store - .merge(msg, txn_batch) - .map_err(EngineError::new_store_error(msg.hash.clone()))?; + let root1 = self.trie.root_hash()?; - merged_messages.push(msg.clone()); + if &root1 != shard_root { + return Err(EngineError::HashMismatch); + } - self.trie - .insert(&*self.db, txn_batch, vec![msg.hash.clone()])?; - } + Ok(()) + } - unhandled_type => { - return Err(EngineError::UnsupportedMessageType(unhandled_type)); - } + fn replay_snapchain_txn( + &mut self, + snapchain_txn: &Transaction, + txn_batch: &mut RocksDbTransactionBatch, + ) -> Result<(), EngineError> { + let total_user_messages = snapchain_txn.user_messages.len(); + let total_system_messages = snapchain_txn.system_messages.len(); + let mut user_messages_count = 0; + let mut system_messages_count = 0; + + for msg in &snapchain_txn.user_messages { + // Errors are validated based on the shard root + let result = self.merge_message(msg, txn_batch); + match result { + Ok(event) => { + self.update_trie(event, txn_batch)?; + user_messages_count += 1; + } + Err(err) => { + warn!( + fid = msg.fid(), + hash = msg.hex_hash(), + "Error merging message: {:?}", + err + ); } } + } - for msg in &snap_txn.system_messages { - if let Some(onchain_event) = &msg.on_chain_event { - self.onchain_event_store - .merge_onchain_event(onchain_event.clone(), txn_batch)?; + for msg in &snapchain_txn.system_messages { + if let Some(onchain_event) = &msg.on_chain_event { + let event = self + .onchain_event_store + .merge_onchain_event(onchain_event.clone(), txn_batch); - // TODO(aditi): Insert into the trie + match event { + Ok(hub_event) => { + self.update_trie(hub_event, txn_batch)?; + system_messages_count += 1; + } + Err(err) => { + warn!("Error merging onchain event: {:?}", err); + } } } } - let root1 = self.trie.root_hash()?; + info!( + fid = snapchain_txn.fid, + num_user_messages = total_user_messages, + num_system_messages = total_system_messages, + user_messages_merged = user_messages_count, + system_messages_merged = system_messages_count, + "Replayed transaction" + ); - if &root1 != shard_root { - return Err(EngineError::HashMismatch); - } + // TODO: This should return the account root + Ok(()) + } + fn merge_message( + &mut self, + msg: &message::Message, + txn_batch: &mut RocksDbTransactionBatch, + ) -> Result { + let data = msg.data.as_ref().ok_or(EngineError::NoMessageData)?; + let mt = MessageType::try_from(data.r#type).or(Err(EngineError::InvalidMessageType))?; + + let event = match mt { + MessageType::CastAdd => self + .cast_store + .merge(msg, txn_batch) + .map_err(EngineError::new_store_error(msg.hash.clone())), + unhandled_type => { + return Err(EngineError::UnsupportedMessageType(unhandled_type)); + } + }?; + + Ok(event) + } + + fn update_trie( + &mut self, + event: hub_event::HubEvent, + txn_batch: &mut RocksDbTransactionBatch, + ) -> Result<(), EngineError> { + match event.body { + Some(hub_event::hub_event::Body::MergeMessageBody(merge)) => { + if let Some(msg) = merge.message { + self.trie + .insert(&self.db, txn_batch, vec![msg.hash.clone()])?; + } + for deleted_message in merge.deleted_messages { + self.trie + .delete(&self.db, txn_batch, vec![deleted_message.hash.clone()])?; + } + } + Some(hub_event::hub_event::Body::MergeOnChainEventBody(merge)) => { + if let Some(onchain_event) = merge.on_chain_event { + self.trie.insert( + &self.db, + txn_batch, + vec![onchain_event.transaction_hash.clone()], + )?; + } + } + _ => { + // TODO: This fallback case should not exist, every event should be handled + return Err(EngineError::UnsupportedEvent); + } + } Ok(()) } @@ -298,7 +381,7 @@ impl ShardEngine { } self.db.commit(txn).unwrap(); - self.trie.reload(&*self.shard_store.db).unwrap(); + self.trie.reload(&self.db).unwrap(); match self.shard_store.put_shard_chunk(shard_chunk) { Err(err) => { @@ -308,6 +391,15 @@ impl ShardEngine { } } + pub(crate) fn sync_id_exists(&mut self, sync_id: &Vec) -> bool { + self.trie + .exists(&self.db, sync_id.as_ref()) + .unwrap_or_else(|err| { + error!("Error checking if sync id exists: {:?}", err); + false + }) + } + pub fn get_confirmed_height(&self) -> Height { match self.shard_store.max_block_number() { Ok(block_num) => Height::new(self.shard_id, block_num), diff --git a/src/storage/store/engine_tests.rs b/src/storage/store/engine_tests.rs index 07a77009..aa97e01c 100644 --- a/src/storage/store/engine_tests.rs +++ b/src/storage/store/engine_tests.rs @@ -5,7 +5,7 @@ mod tests { use crate::proto::onchain_event::{OnChainEvent, OnChainEventType}; use crate::proto::snapchain::{Height, ShardChunk, ShardHeader, Transaction}; use crate::storage::db; - use crate::storage::store::engine::{Message, ShardEngine, ShardStateChange}; + use crate::storage::store::engine::{MempoolMessage, ShardEngine, ShardStateChange}; use crate::storage::store::shard::ShardStore; use crate::utils::cli; use prost::Message as _; @@ -51,14 +51,7 @@ mod tests { shard_index, block_number, }); - - //TODO: don't assume 1 transaction - chunk.transactions[0].user_messages = change.transactions[0] - .user_messages - .iter() - .cloned() - .collect(); - + chunk.transactions = change.transactions.clone(); chunk } @@ -88,7 +81,7 @@ mod tests { block_number: 0, block_hash: vec![], block_timestamp: 0, - transaction_hash: vec![], + transaction_hash: [1; 10].to_vec(), log_index: 0, fid: 1, tx_index: 0, @@ -120,8 +113,7 @@ mod tests { let state_change = engine.propose_state_change(1); assert_eq!(1, state_change.shard_id); - assert_eq!(state_change.transactions.len(), 1); - assert_eq!(0, state_change.transactions[0].user_messages.len()); + assert_eq!(state_change.transactions.len(), 0); assert_eq!( "237b11d0dd9e78994ef2f141c7f170d48bb51d34", to_hex(&state_change.new_state_root) @@ -180,7 +172,7 @@ mod tests { let messages_tx = engine.messages_tx(); messages_tx - .send(Message::UserMessage(msg1.clone())) + .send(MempoolMessage::UserMessage(msg1.clone())) .await .unwrap(); let state_change = engine.propose_state_change(1); @@ -196,6 +188,9 @@ mod tests { let events = HubEvent::get_events(engine.db.clone(), 0, None, None).unwrap(); assert_eq!(0, events.events.len()); + // And it's not inserted into the trie + assert_eq!(engine.sync_id_exists(&msg1.hash), false); + let valid = engine.validate_state_change(&state_change); assert!(valid); @@ -224,6 +219,9 @@ mod tests { to_hex(&msg1.hash), to_hex(&generated_event.message.unwrap().hash) ); + + // The message exists in the trie + assert_eq!(engine.sync_id_exists(&msg1.hash), true); } #[tokio::test] @@ -253,7 +251,7 @@ mod tests { { messages_tx - .send(Message::UserMessage(msg1.clone())) + .send(MempoolMessage::UserMessage(msg1.clone())) .await .unwrap(); let state_change = engine.propose_state_change(1); @@ -282,7 +280,7 @@ mod tests { { messages_tx - .send(Message::UserMessage(msg2.clone())) + .send(MempoolMessage::UserMessage(msg2.clone())) .await .unwrap(); let state_change = engine.propose_state_change(1); @@ -323,11 +321,11 @@ mod tests { { messages_tx - .send(Message::UserMessage(msg1.clone())) + .send(MempoolMessage::UserMessage(msg1.clone())) .await .unwrap(); messages_tx - .send(Message::UserMessage(msg2.clone())) + .send(MempoolMessage::UserMessage(msg2.clone())) .await .unwrap(); let state_change = engine.propose_state_change(1); @@ -364,7 +362,7 @@ mod tests { let (mut engine, _tmpdir) = new_engine(); let messages_tx = engine.messages_tx(); messages_tx - .send(Message::ValidatorMessage( + .send(MempoolMessage::ValidatorMessage( crate::proto::snapchain::ValidatorMessage { on_chain_event: Some(onchain_event), fname_transfer: None, diff --git a/tests/consensus_test.rs b/tests/consensus_test.rs index 96d1cb4d..3f8b1e2f 100644 --- a/tests/consensus_test.rs +++ b/tests/consensus_test.rs @@ -78,12 +78,16 @@ impl NodeForTest { let node_id = node.id(); let assert_valid_block = move |block: &Block| { let header = block.header.as_ref().unwrap(); - let message_count = block.shard_chunks[0].transactions[0].user_messages.len(); + let transactions_count: usize = block + .shard_chunks + .iter() + .map(|c| c.transactions.len()) + .sum(); info!( hash = hex::encode(&block.hash), height = header.height.as_ref().map(|h| h.block_number), id = node_id, - message_count, + transactions = transactions_count, "decided block", ); assert_eq!(block.shard_chunks.len(), num_shards as usize); @@ -312,9 +316,11 @@ async fn test_basic_consensus() { hash.extend_from_slice(&i.to_be_bytes()); // just for now messages_tx1 - .send(snapchain::storage::store::engine::Message::UserMessage( - cli::compose_message(321, format!("Cast {}", i).as_str(), None, None), - )) + .send( + snapchain::storage::store::engine::MempoolMessage::UserMessage( + cli::compose_message(321, format!("Cast {}", i).as_str(), None, None), + ), + ) .await .unwrap(); i += 1;