diff --git a/Cargo.lock b/Cargo.lock index 535327bd7d..28307fda11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10744,6 +10744,7 @@ dependencies = [ "sysinfo", "test-helper", "thiserror", + "timeout-join-handler", "tokio", ] diff --git a/block-relayer/src/block_relayer.rs b/block-relayer/src/block_relayer.rs index d8d791051c..d8978076ae 100644 --- a/block-relayer/src/block_relayer.rs +++ b/block-relayer/src/block_relayer.rs @@ -203,7 +203,9 @@ impl BlockRelayer { ctx: &mut ServiceContext, ) -> Result<()> { let network = ctx.get_shared::()?; - let block_connector_service = ctx.service_ref::()?.clone(); + let block_connector_service = ctx + .service_ref::>()? + .clone(); let txpool = self.txpool.clone(); let metrics = self.metrics.clone(); let fut = async move { diff --git a/config/src/available_port.rs b/config/src/available_port.rs index 588b28ad81..f03bf1af60 100644 --- a/config/src/available_port.rs +++ b/config/src/available_port.rs @@ -57,7 +57,7 @@ fn get_ephemeral_port() -> ::std::io::Result { use std::net::{TcpListener, TcpStream}; // Request a random available port from the OS - let listener = TcpListener::bind(("localhost", 0))?; + let listener = TcpListener::bind(("127.0.0.1", 0))?; let addr = listener.local_addr()?; // Create and accept a connection (which we'll promptly drop) in order to force the port diff --git a/network/tests/network_node_test.rs b/network/tests/network_node_test.rs index e17b9e94ae..c70ef5af26 100644 --- a/network/tests/network_node_test.rs +++ b/network/tests/network_node_test.rs @@ -35,7 +35,7 @@ fn test_reconnected_peers() -> anyhow::Result<()> { // stop node2, node1's peers is empty node2.stop()?; - thread::sleep(Duration::from_secs(3)); + thread::sleep(Duration::from_secs(12)); loop { let network_state = block_on(async { node1_network.network_state().await })?; debug!("network_state: {:?}", network_state); diff --git a/node/src/node.rs b/node/src/node.rs index fd3e7fcf77..34b7cc20a7 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -51,7 +51,7 @@ use starcoin_sync::block_connector::{BlockConnectorService, ExecuteRequest, Rese use starcoin_sync::sync::SyncService; use starcoin_sync::txn_sync::TxnSyncService; use starcoin_sync::verified_rpc_client::VerifiedRpcClient; -use starcoin_txpool::TxPoolActorService; +use starcoin_txpool::{TxPoolActorService, TxPoolService}; use starcoin_types::system_events::{SystemShutdown, SystemStarted}; use starcoin_vm_runtime::metrics::VMMetrics; use std::sync::Arc; @@ -133,7 +133,9 @@ impl ServiceHandler for NodeService { .start_service_sync(GenerateBlockEventPacemaker::service_name()), ), NodeRequest::ResetNode(block_hash) => { - let connect_service = ctx.service_ref::()?.clone(); + let connect_service = ctx + .service_ref::>()? + .clone(); let fut = async move { info!("Prepare to reset node startup info to {}", block_hash); connect_service.send(ResetRequest { block_hash }).await? @@ -147,7 +149,9 @@ impl ServiceHandler for NodeService { .get_shared_sync::>() .expect("Storage must exist."); - let connect_service = ctx.service_ref::()?.clone(); + let connect_service = ctx + .service_ref::>()? + .clone(); let network = ctx.get_shared::()?; let fut = async move { info!("Prepare to re execute block {}", block_hash); @@ -347,7 +351,9 @@ impl NodeService { registry.register::().await?; - registry.register::().await?; + registry + .register::>() + .await?; registry.register::().await?; let block_relayer = registry.register::().await?; diff --git a/sync/Cargo.toml b/sync/Cargo.toml index fdff574ab8..70c429f5cf 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -42,6 +42,7 @@ stest = { workspace = true } stream-task = { workspace = true } sysinfo = { workspace = true } thiserror = { workspace = true } +timeout-join-handler = { workspace = true } [dev-dependencies] hex = { workspace = true } diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index d35d9e4757..b3ac0303d2 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -1,13 +1,18 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 +#[cfg(test)] +use super::CheckBlockConnectorHashValue; use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService}; use crate::sync::{CheckSyncEvent, SyncService}; -use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent}; +use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent}; +#[cfg(test)] +use anyhow::bail; use anyhow::{format_err, Result}; use network_api::PeerProvider; -use starcoin_chain_api::{ConnectBlockError, WriteableChainService}; +use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; +use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; use starcoin_logger::prelude::*; use starcoin_network::NetworkServiceRef; @@ -17,6 +22,9 @@ use starcoin_service_registry::{ use starcoin_storage::{BlockStore, Storage}; use starcoin_sync_api::PeerNewBlock; use starcoin_txpool::TxPoolService; +use starcoin_txpool_api::TxPoolSyncService; +#[cfg(test)] +use starcoin_txpool_mock_service::MockTxPoolService; use starcoin_types::block::ExecutedBlock; use starcoin_types::sync_status::SyncStatus; use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown}; @@ -26,15 +34,21 @@ use sysinfo::{DiskExt, System, SystemExt}; const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3; const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5; -pub struct BlockConnectorService { - chain_service: WriteBlockChainService, +pub struct BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ + chain_service: WriteBlockChainService, sync_status: Option, config: Arc, } -impl BlockConnectorService { +impl BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ pub fn new( - chain_service: WriteBlockChainService, + chain_service: WriteBlockChainService, config: Arc, ) -> Self { Self { @@ -51,6 +65,10 @@ impl BlockConnectorService { } } + pub fn chain_head_id(&self) -> HashValue { + self.chain_service.get_main().status().head.id() + } + pub fn check_disk_space(&mut self) -> Option> { if System::IS_SUPPORTED { let mut sys = System::new_all(); @@ -97,11 +115,17 @@ impl BlockConnectorService { } } -impl ServiceFactory for BlockConnectorService { - fn create(ctx: &mut ServiceContext) -> Result { +impl ServiceFactory + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ + fn create( + ctx: &mut ServiceContext>, + ) -> Result> { let config = ctx.get_shared::>()?; let bus = ctx.bus_ref().clone(); - let txpool = ctx.get_shared::()?; + let txpool = ctx.get_shared::()?; let storage = ctx.get_shared::>()?; let startup_info = storage .get_startup_info()? @@ -120,7 +144,10 @@ impl ServiceFactory for BlockConnectorService { } } -impl ActorService for BlockConnectorService { +impl ActorService for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn started(&mut self, ctx: &mut ServiceContext) -> Result<()> { //TODO figure out a more suitable value. ctx.set_mailbox_capacity(1024); @@ -141,11 +168,15 @@ impl ActorService for BlockConnectorService { } } -impl EventHandler for BlockConnectorService { +impl EventHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn handle_event( &mut self, _: BlockDiskCheckEvent, - ctx: &mut ServiceContext, + ctx: &mut ServiceContext>, ) { if let Some(res) = self.check_disk_space() { match res { @@ -161,23 +192,70 @@ impl EventHandler for BlockConnectorService { } } -impl EventHandler for BlockConnectorService { +impl EventHandler for BlockConnectorService { fn handle_event( &mut self, msg: BlockConnectedEvent, - _ctx: &mut ServiceContext, + ctx: &mut ServiceContext>, ) { //because this block has execute at sync task, so just try connect to select head chain. //TODO refactor connect and execute let block = msg.block; - if let Err(e) = self.chain_service.try_connect(block) { - error!("Process connected block error: {:?}", e); + let feedback = msg.feedback; + + match msg.action { + crate::tasks::BlockConnectAction::ConnectNewBlock => { + if let Err(e) = self.chain_service.try_connect(block) { + error!("Process connected new block from sync error: {:?}", e); + } + } + crate::tasks::BlockConnectAction::ConnectExecutedBlock => { + if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) { + error!("Process connected executed block from sync error: {:?}", e); + } + } } + + feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent)); } } -impl EventHandler for BlockConnectorService { +#[cfg(test)] +impl EventHandler for BlockConnectorService { + fn handle_event( + &mut self, + msg: BlockConnectedEvent, + ctx: &mut ServiceContext>, + ) { + //because this block has execute at sync task, so just try connect to select head chain. + //TODO refactor connect and execute + + let block = msg.block; + let feedback = msg.feedback; + + match msg.action { + crate::tasks::BlockConnectAction::ConnectNewBlock => { + if let Err(e) = self.chain_service.apply_failed(block) { + error!("Process connected new block from sync error: {:?}", e); + } + } + crate::tasks::BlockConnectAction::ConnectExecutedBlock => { + if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) { + error!("Process connected executed block from sync error: {:?}", e); + } + } + } + + feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent)); + } +} + +impl EventHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn handle_event(&mut self, msg: MinedBlock, _ctx: &mut ServiceContext) { let MinedBlock(new_block) = msg; let id = new_block.header().id(); @@ -192,13 +270,21 @@ impl EventHandler for BlockConnectorService { } } -impl EventHandler for BlockConnectorService { +impl EventHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn handle_event(&mut self, msg: SyncStatusChangeEvent, _ctx: &mut ServiceContext) { self.sync_status = Some(msg.0); } } -impl EventHandler for BlockConnectorService { +impl EventHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn handle_event(&mut self, msg: PeerNewBlock, ctx: &mut ServiceContext) { if !self.is_synced() { debug!("[connector] Ignore PeerNewBlock event because the node has not been synchronized yet."); @@ -257,22 +343,50 @@ impl EventHandler for BlockConnectorService { } } -impl ServiceHandler for BlockConnectorService { +impl ServiceHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn handle( &mut self, msg: ResetRequest, - _ctx: &mut ServiceContext, + _ctx: &mut ServiceContext>, ) -> Result<()> { self.chain_service.reset(msg.block_hash) } } -impl ServiceHandler for BlockConnectorService { +impl ServiceHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ fn handle( &mut self, msg: ExecuteRequest, - _ctx: &mut ServiceContext, + _ctx: &mut ServiceContext>, ) -> Result { self.chain_service.execute(msg.block) } } + +#[cfg(test)] +impl ServiceHandler + for BlockConnectorService +where + TransactionPoolServiceT: TxPoolSyncService + 'static, +{ + fn handle( + &mut self, + msg: CheckBlockConnectorHashValue, + _ctx: &mut ServiceContext>, + ) -> Result<()> { + if self.chain_service.get_main().status().head().id() == msg.head_hash { + info!("the branch in chain service is the same as target's branch"); + return Ok(()); + } + info!("mock branch in chain service is not the same as target's branch"); + bail!("blockchain in chain service is not the same as target!"); + } +} diff --git a/sync/src/block_connector/mod.rs b/sync/src/block_connector/mod.rs index 05b7cfd2b2..72ea8d3560 100644 --- a/sync/src/block_connector/mod.rs +++ b/sync/src/block_connector/mod.rs @@ -40,3 +40,14 @@ pub struct ExecuteRequest { impl ServiceRequest for ExecuteRequest { type Response = anyhow::Result; } + +#[cfg(test)] +#[derive(Debug, Clone)] +pub struct CheckBlockConnectorHashValue { + pub head_hash: HashValue, +} + +#[cfg(test)] +impl ServiceRequest for CheckBlockConnectorHashValue { + type Response = anyhow::Result<()>; +} diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index c22ff42408..cab825e583 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -10,7 +10,7 @@ use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; use starcoin_logger::prelude::*; use starcoin_service_registry::bus::{Bus, BusService}; -use starcoin_service_registry::ServiceRef; +use starcoin_service_registry::{ServiceContext, ServiceRef}; use starcoin_storage::Store; use starcoin_txpool_api::TxPoolSyncService; use starcoin_types::block::BlockInfo; @@ -22,6 +22,8 @@ use starcoin_types::{ use std::fmt::Formatter; use std::sync::Arc; +use super::BlockConnectorService; + const MAX_ROLL_BACK_BLOCK: usize = 10; pub struct WriteBlockChainService

@@ -93,15 +95,15 @@ where } } -impl

WriteBlockChainService

+impl WriteBlockChainService where - P: TxPoolSyncService + 'static, + TransactionPoolServiceT: TxPoolSyncService + 'static, { pub fn new( config: Arc, startup_info: StartupInfo, storage: Arc, - txpool: P, + txpool: TransactionPoolServiceT, bus: ServiceRef, vm_metrics: Option, ) -> Result { @@ -169,6 +171,42 @@ where &self.main } + #[cfg(test)] + pub fn apply_failed(&mut self, block: Block) -> Result<()> { + use anyhow::bail; + use starcoin_chain::verifier::FullVerifier; + + // apply but no connection + let verified_block = self.main.verify_with_verifier::(block)?; + let _executed_block = self.main.execute(verified_block)?; + + bail!("failed to apply for tesing the connection later!"); + } + + // for sync task to connect to its chain, if chain's total difficulties is larger than the main + // switch by: + // 1, update the startup info + // 2, broadcast the new header + pub fn switch_new_main( + &mut self, + new_head_block: HashValue, + _ctx: &mut ServiceContext>, + ) -> Result<()> + where + TransactionPoolServiceT: TxPoolSyncService, + { + let new_branch = BlockChain::new( + self.config.net().time_service(), + new_head_block, + self.storage.clone(), + self.vm_metrics.clone(), + )?; + + self.select_head(new_branch)?; + + Ok(()) + } + pub fn select_head(&mut self, new_branch: BlockChain) -> Result<()> { let executed_block = new_branch.head_block(); let main_total_difficulty = self.main.get_total_difficulty()?; diff --git a/sync/src/sync.rs b/sync/src/sync.rs index dd4bb57f3c..abfd0ddd6b 100644 --- a/sync/src/sync.rs +++ b/sync/src/sync.rs @@ -26,6 +26,7 @@ use starcoin_sync_api::{ PeerScoreRequest, PeerScoreResponse, SyncCancelRequest, SyncProgressReport, SyncProgressRequest, SyncServiceHandler, SyncStartRequest, SyncStatusRequest, SyncTarget, }; +use starcoin_txpool::TxPoolService; use starcoin_types::block::BlockIdAndNumber; use starcoin_types::startup_info::ChainStatus; use starcoin_types::sync_status::SyncStatus; @@ -144,7 +145,9 @@ impl SyncService { let network = ctx.get_shared::()?; let storage = self.storage.clone(); let self_ref = ctx.self_ref(); - let connector_service = ctx.service_ref::()?.clone(); + let connector_service = ctx + .service_ref::>()? + .clone(); let config = self.config.clone(); let peer_score_metrics = self.peer_score_metrics.clone(); let sync_metrics = self.metrics.clone(); diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index 57f6703a9d..3fe42d66cc 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -18,8 +18,11 @@ use starcoin_sync_api::SyncTarget; use starcoin_types::block::{Block, BlockIdAndNumber, BlockInfo, BlockNumber}; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use stream_task::{CollectorState, TaskError, TaskResultCollector, TaskState}; +use super::{BlockConnectAction, BlockConnectedFinishEvent}; + #[derive(Clone, Debug)] pub struct SyncBlockData { pub(crate) block: Block, @@ -217,6 +220,69 @@ where self.apply_block(block, None) } + fn notify_connected_block( + &mut self, + block: Block, + block_info: BlockInfo, + action: BlockConnectAction, + state: CollectorState, + ) -> Result { + let total_difficulty = block_info.get_total_difficulty(); + + // if the new block's total difficulty is smaller than the current, + // do nothing because we do not need to update the current chain in any other services. + if total_difficulty <= self.current_block_info.total_difficulty { + return Ok(state); // nothing to do + } + + // only try connect block when sync chain total_difficulty > node's current chain. + + // first, create the sender and receiver for ensuring that + // the last block is connected before the next synchronization is triggered. + // if the block is not the last one, we do not want to do this. + let (sender, mut receiver) = match state { + CollectorState::Enough => { + let (s, r) = futures::channel::mpsc::unbounded::(); + (Some(s), Some(r)) + } + CollectorState::Need => (None, None), + }; + + // second, construct the block connect event. + let block_connect_event = BlockConnectedEvent { + block, + feedback: sender, + action, + }; + + // third, broadcast it. + if let Err(e) = self.event_handle.handle(block_connect_event.clone()) { + error!( + "Send BlockConnectedEvent error: {:?}, block_id: {}", + e, + block_info.block_id() + ); + } + + // finally, if it is the last one, wait for the last block to be processed. + if block_connect_event.feedback.is_some() && receiver.is_some() { + let mut count: i32 = 0; + while count < 3 { + count = count.saturating_add(1); + match receiver.as_mut().unwrap().try_next() { + Ok(_) => { + break; + } + Err(_) => { + info!("Waiting for last block to be processed"); + async_std::task::block_on(async_std::task::sleep(Duration::from_secs(10))); + } + } + } + } + Ok(state) + } + fn apply_block(&mut self, block: Block, peer_id: Option) -> Result<()> { if let Some((_failed_block, pre_peer_id, err, version)) = self .chain @@ -293,59 +359,53 @@ where fn collect(&mut self, item: SyncBlockData) -> Result { let (block, block_info, peer_id) = item.into(); - let block_id = block.id(); let timestamp = block.header().timestamp(); - let block_info = match block_info { + let (block_info, action) = match block_info { Some(block_info) => { //If block_info exists, it means that this block was already executed and try connect in the previous sync, but the sync task was interrupted. //So, we just need to update chain and continue self.chain.connect(ExecutedBlock { - block, + block: block.clone(), block_info: block_info.clone(), })?; - block_info + (block_info, BlockConnectAction::ConnectExecutedBlock) } None => { self.apply_block(block.clone(), peer_id)?; self.chain.time_service().adjust(timestamp); - let block_info = self.chain.status().info; - let total_difficulty = block_info.get_total_difficulty(); - // only try connect block when sync chain total_difficulty > node's current chain. - if total_difficulty > self.current_block_info.total_difficulty { - if let Err(e) = self.event_handle.handle(BlockConnectedEvent { block }) { - error!( - "Send BlockConnectedEvent error: {:?}, block_id: {}", - e, block_id - ); - } - } - block_info + ( + self.chain.status().info, + BlockConnectAction::ConnectNewBlock, + ) } }; //verify target - if block_info.block_accumulator_info.num_leaves - == self.target.block_info.block_accumulator_info.num_leaves - { - if block_info != self.target.block_info { - Err(TaskError::BreakError( - RpcVerifyError::new_with_peers( - self.target.peers.clone(), - format!( + let state: Result = + if block_info.block_accumulator_info.num_leaves + == self.target.block_info.block_accumulator_info.num_leaves + { + if block_info != self.target.block_info { + Err(TaskError::BreakError( + RpcVerifyError::new_with_peers( + self.target.peers.clone(), + format!( "Verify target error, expect target: {:?}, collect target block_info:{:?}", self.target.block_info, block_info ), + ) + .into(), ) - .into(), - ) - .into()) + .into()) + } else { + Ok(CollectorState::Enough) + } } else { - Ok(CollectorState::Enough) - } - } else { - Ok(CollectorState::Need) - } + Ok(CollectorState::Need) + }; + + self.notify_connected_block(block, block_info, action, state?) } fn finish(self) -> Result { diff --git a/sync/src/tasks/inner_sync_task.rs b/sync/src/tasks/inner_sync_task.rs index 7552656417..8d0f70e953 100644 --- a/sync/src/tasks/inner_sync_task.rs +++ b/sync/src/tasks/inner_sync_task.rs @@ -117,7 +117,7 @@ where ) .and_then(move |(ancestor, accumulator), event_handle| { let check_local_store = - ancestor_block_info.total_difficulty < current_block_info.total_difficulty; + ancestor_block_info.total_difficulty <= current_block_info.total_difficulty; let block_sync_task = BlockSyncTask::new( accumulator, diff --git a/sync/src/tasks/mock.rs b/sync/src/tasks/mock.rs index 5f5c66034d..6b9ddb3296 100644 --- a/sync/src/tasks/mock.rs +++ b/sync/src/tasks/mock.rs @@ -4,7 +4,7 @@ use crate::tasks::{ BlockConnectedEvent, BlockFetcher, BlockIdFetcher, BlockInfoFetcher, PeerOperator, SyncFetcher, }; -use anyhow::{format_err, Context, Result}; +use anyhow::{format_err, Context, Ok, Result}; use async_std::task::JoinHandle; use futures::channel::mpsc::UnboundedReceiver; use futures::future::BoxFuture; @@ -14,6 +14,7 @@ use network_api::messages::NotificationMessage; use network_api::{PeerId, PeerInfo, PeerSelector, PeerStrategy}; use network_p2p_core::{NetRpcError, RpcErrorCode}; use rand::Rng; +use starcoin_account_api::AccountInfo; use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_chain::BlockChain; use starcoin_chain_api::ChainReader; @@ -21,8 +22,10 @@ use starcoin_chain_mock::MockChain; use starcoin_config::ChainNetwork; use starcoin_crypto::HashValue; use starcoin_network_rpc_api::G_RPC_INFO; +use starcoin_storage::Storage; use starcoin_sync_api::SyncTarget; use starcoin_types::block::{Block, BlockIdAndNumber, BlockInfo, BlockNumber}; +use starcoin_types::startup_info::ChainInfo; use std::sync::Arc; use std::time::Duration; @@ -162,6 +165,33 @@ impl SyncNodeMocker { )) } + pub fn new_with_storage( + net: ChainNetwork, + storage: Arc, + chain_info: ChainInfo, + miner: AccountInfo, + delay_milliseconds: u64, + random_error_percent: u32, + ) -> Result { + let chain = MockChain::new_with_storage(net, storage, chain_info.head().id(), miner)?; + let peer_id = PeerId::random(); + let peer_info = PeerInfo::new( + peer_id.clone(), + chain.chain_info(), + NotificationMessage::protocols(), + G_RPC_INFO.clone().into_protocols(), + None, + ); + let peer_selector = PeerSelector::new(vec![peer_info], PeerStrategy::default(), None); + Ok(Self::new_inner( + peer_id, + chain, + ErrorStrategy::Timeout(delay_milliseconds), + random_error_percent, + peer_selector, + )) + } + pub fn new_with_strategy( net: ChainNetwork, error_strategy: ErrorStrategy, diff --git a/sync/src/tasks/mod.rs b/sync/src/tasks/mod.rs index 1ed2424924..7577beaa00 100644 --- a/sync/src/tasks/mod.rs +++ b/sync/src/tasks/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 +use crate::block_connector::BlockConnectorService; use crate::tasks::block_sync_task::SyncBlockData; use crate::tasks::inner_sync_task::InnerSyncTask; use crate::verified_rpc_client::{RpcVerifyError, VerifiedRpcClient}; @@ -19,6 +20,9 @@ use starcoin_service_registry::{ActorService, EventHandler, ServiceRef}; use starcoin_storage::Store; use starcoin_sync_api::SyncTarget; use starcoin_time_service::TimeService; +use starcoin_txpool::TxPoolService; +#[cfg(test)] +use starcoin_txpool_mock_service::MockTxPoolService; use starcoin_types::block::{Block, BlockIdAndNumber, BlockInfo, BlockNumber}; use starcoin_types::startup_info::ChainStatus; use starcoin_types::U256; @@ -380,11 +384,22 @@ impl BlockLocalStore for Arc { } } +#[derive(Clone, Debug)] +pub enum BlockConnectAction { + ConnectNewBlock, + ConnectExecutedBlock, +} + #[derive(Clone, Debug)] pub struct BlockConnectedEvent { pub block: Block, + pub feedback: Option>, + pub action: BlockConnectAction, } +#[derive(Clone, Debug)] +pub struct BlockConnectedFinishEvent; + #[derive(Clone, Debug)] pub struct BlockDiskCheckEvent {} @@ -392,10 +407,15 @@ pub trait BlockConnectedEventHandle: Send + Clone + std::marker::Unpin { fn handle(&mut self, event: BlockConnectedEvent) -> Result<()>; } -impl BlockConnectedEventHandle for ServiceRef -where - S: ActorService + EventHandler, -{ +impl BlockConnectedEventHandle for ServiceRef> { + fn handle(&mut self, event: BlockConnectedEvent) -> Result<()> { + self.notify(event)?; + Ok(()) + } +} + +#[cfg(test)] +impl BlockConnectedEventHandle for ServiceRef> { fn handle(&mut self, event: BlockConnectedEvent) -> Result<()> { self.notify(event)?; Ok(()) @@ -459,6 +479,24 @@ impl BlockConnectedEventHandle for UnboundedSender { } } +#[derive(Debug, Clone)] +pub struct BlockConnectEventHandleMock { + sender: UnboundedSender, +} + +impl BlockConnectEventHandleMock { + pub fn new(sender: UnboundedSender) -> Result { + Ok(Self { sender }) + } +} + +impl BlockConnectedEventHandle for BlockConnectEventHandleMock { + fn handle(&mut self, event: BlockConnectedEvent) -> Result<()> { + self.sender.start_send(event)?; + Ok(()) + } +} + pub struct ExtSyncTaskErrorHandle where F: SyncFetcher + 'static, diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index 06206f227e..bfa9b02e51 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 #![allow(clippy::integer_arithmetic)] +use crate::block_connector::{BlockConnectorService, CheckBlockConnectorHashValue}; use crate::tasks::block_sync_task::SyncBlockData; use crate::tasks::mock::{ErrorStrategy, MockBlockIdFetcher, SyncNodeMocker}; use crate::tasks::{ @@ -9,37 +10,44 @@ use crate::tasks::{ BlockCollector, BlockFetcher, BlockLocalStore, BlockSyncTask, FindAncestorTask, SyncFetcher, }; use crate::verified_rpc_client::RpcVerifyError; -use anyhow::Context; use anyhow::{format_err, Result}; +use anyhow::{Context, Ok}; use futures::channel::mpsc::unbounded; use futures::future::BoxFuture; use futures::FutureExt; use futures_timer::Delay; use network_api::{PeerId, PeerInfo, PeerSelector, PeerStrategy}; use pin_utils::core_reexport::time::Duration; +use starcoin_account_api::AccountInfo; use starcoin_accumulator::accumulator_info::AccumulatorInfo; use starcoin_accumulator::tree_store::mock::MockAccumulatorStore; use starcoin_accumulator::{Accumulator, MerkleAccumulator}; use starcoin_chain::BlockChain; use starcoin_chain_api::ChainReader; use starcoin_chain_mock::MockChain; -use starcoin_config::{BuiltinNetworkID, ChainNetwork}; +use starcoin_config::{BuiltinNetworkID, ChainNetwork, NodeConfig}; use starcoin_crypto::HashValue; use starcoin_genesis::Genesis; +use starcoin_genesis::Genesis as StarcoinGenesis; use starcoin_logger::prelude::*; -use starcoin_storage::BlockStore; +use starcoin_service_registry::{RegistryAsyncService, RegistryService, ServiceRef}; +use starcoin_storage::{BlockStore, Storage}; use starcoin_sync_api::SyncTarget; +use starcoin_txpool_mock_service::MockTxPoolService; use starcoin_types::{ block::{Block, BlockBody, BlockHeaderBuilder, BlockIdAndNumber, BlockInfo}, U256, }; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use stest::actix_export::System; use stream_task::{ DefaultCustomErrorHandle, Generator, TaskError, TaskEventCounterHandle, TaskGenerator, }; use test_helper::DummyNetworkService; +use super::BlockConnectedEvent; + #[stest::test(timeout = 120)] pub async fn test_full_sync_new_node() -> Result<()> { let net1 = ChainNetwork::new_builtin(BuiltinNetworkID::Test); @@ -984,3 +992,234 @@ async fn test_sync_target() { assert_eq!(target.target_id.number(), low_chain_info.head().number()); assert_eq!(target.target_id.id(), low_chain_info.head().id()); } + +fn sync_block_in_async_connection( + mut target_node: Arc, + local_node: Arc, + storage: Arc, + block_count: u64, +) -> Result> { + Arc::get_mut(&mut target_node) + .unwrap() + .produce_block(block_count)?; + let target = target_node.sync_target(); + let target_id = target.target_id.id(); + + let (sender, mut receiver) = futures::channel::mpsc::unbounded::(); + let thread_local_node = local_node.clone(); + + let process_block = move || { + let mut chain = MockChain::new_with_storage( + thread_local_node.chain_mocker.net().clone(), + storage.clone(), + thread_local_node.chain_mocker.head().status().head.id(), + thread_local_node.chain_mocker.miner().clone(), + ) + .unwrap(); + loop { + if let std::result::Result::Ok(result) = receiver.try_next() { + match result { + Some(event) => { + chain + .select_head(event.block) + .expect("select head must be successful"); + if event.feedback.is_some() { + event + .feedback + .unwrap() + .unbounded_send(super::BlockConnectedFinishEvent) + .unwrap(); + assert_eq!(target_id, chain.head().status().head.id()); + break; + } + } + None => break, + } + } + } + }; + let handle = std::thread::spawn(process_block); + + let current_block_header = local_node.chain().current_header(); + let storage = local_node.chain().get_storage(); + + let local_net = local_node.chain_mocker.net(); + let (local_ancestor_sender, _local_ancestor_receiver) = unbounded(); + + let (sync_task, _task_handle, task_event_counter) = full_sync_task( + current_block_header.id(), + target.clone(), + false, + local_net.time_service(), + storage.clone(), + sender, + target_node.clone(), + local_ancestor_sender, + DummyNetworkService::default(), + 15, + None, + None, + )?; + let branch = async_std::task::block_on(sync_task)?; + assert_eq!(branch.current_header().id(), target.target_id.id()); + + handle.join().unwrap(); + + let reports = task_event_counter.get_reports(); + reports + .iter() + .for_each(|report| debug!("reports: {}", report)); + + Ok(target_node) +} + +#[stest::test] +async fn test_sync_block_in_async_connection() -> Result<()> { + let net = ChainNetwork::new_builtin(BuiltinNetworkID::Test); + let mut target_node = Arc::new(SyncNodeMocker::new(net.clone(), 1, 0)?); + + let (storage, chain_info, _) = + Genesis::init_storage_for_test(&net).expect("init storage by genesis fail."); + let local_node = Arc::new(SyncNodeMocker::new_with_storage( + net, + storage.clone(), + chain_info, + AccountInfo::random(), + 1, + 0, + )?); + + target_node = + sync_block_in_async_connection(target_node, local_node.clone(), storage.clone(), 10)?; + _ = sync_block_in_async_connection(target_node, local_node, storage, 20)?; + + Ok(()) +} + +fn sync_block_in_block_connection_service_mock( + mut target_node: Arc, + local_node: Arc, + registry: &ServiceRef, + block_count: u64, +) -> Result> { + Arc::get_mut(&mut target_node) + .unwrap() + .produce_block(block_count)?; + loop { + let target = target_node.sync_target(); + + let storage = local_node.chain().get_storage(); + let startup_info = storage + .get_startup_info()? + .ok_or_else(|| format_err!("Startup info should exist."))?; + let current_block_id = startup_info.main; + + let local_net = local_node.chain_mocker.net(); + let (local_ancestor_sender, _local_ancestor_receiver) = unbounded(); + + let (sync_task, _task_handle, task_event_counter) = full_sync_task( + current_block_id, + target.clone(), + false, + local_net.time_service(), + storage.clone(), + async_std::task::block_on( + registry.service_ref::>(), + )? + .clone(), + target_node.clone(), + local_ancestor_sender, + DummyNetworkService::default(), + 15, + None, + None, + )?; + let branch = async_std::task::block_on(sync_task)?; + info!("checking branch in sync service is the same as target's branch"); + assert_eq!(branch.current_header().id(), target.target_id.id()); + + let block_connector_service = async_std::task::block_on( + registry.service_ref::>(), + )? + .clone(); + let result = async_std::task::block_on(block_connector_service.send( + CheckBlockConnectorHashValue { + head_hash: target.target_id.id(), + }, + ))?; + if result.is_ok() { + break; + } + let reports = task_event_counter.get_reports(); + reports + .iter() + .for_each(|report| debug!("reports: {}", report)); + } + + Ok(target_node) +} + +#[stest::test] +async fn test_sync_block_apply_failed_but_connect_success() -> Result<()> { + let config = Arc::new(NodeConfig::random_for_test()); + let (storage, chain_info, _) = StarcoinGenesis::init_storage_for_test(config.net()) + .expect("init storage by genesis fail."); + + let target_node = Arc::new(SyncNodeMocker::new(config.net().clone(), 1, 0)?); + let local_node = Arc::new(SyncNodeMocker::new_with_storage( + config.net().clone(), + storage.clone(), + chain_info.clone(), + AccountInfo::random(), + 1, + 0, + )?); + + let (registry_sender, registry_receiver) = async_std::channel::unbounded(); + + let _handle = timeout_join_handler::spawn(move || { + let system = System::with_tokio_rt(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .on_thread_stop(|| debug!("main thread stopped")) + .thread_name("main") + .build() + .expect("failed to create tokio runtime for main") + }); + async_std::task::block_on(async { + let registry = RegistryService::launch(); + + registry.put_shared(config.clone()).await.unwrap(); + registry.put_shared(storage.clone()).await.unwrap(); + registry.put_shared(MockTxPoolService::new()).await.unwrap(); + + Delay::new(Duration::from_secs(2)).await; + + registry + .register::>() + .await + .unwrap(); + + registry_sender.send(registry).await.unwrap(); + }); + + system.run().unwrap(); + }); + + let registry = registry_receiver.recv().await.unwrap(); + + let target_node = sync_block_in_block_connection_service_mock( + target_node, + local_node.clone(), + ®istry, + 10, + )?; + _ = sync_block_in_block_connection_service_mock( + target_node, + local_node.clone(), + ®istry, + 20, + )?; + + Ok(()) +}