Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

select head when sync failed and retry #3973

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion block-relayer/src/block_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl BlockRelayer {
ctx: &mut ServiceContext<BlockRelayer>,
) -> Result<()> {
let network = ctx.get_shared::<NetworkServiceRef>()?;
let block_connector_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let block_connector_service = ctx
.service_ref::<BlockConnectorService<TxPoolService>>()?
.clone();
let txpool = self.txpool.clone();
let metrics = self.metrics.clone();
let fut = async move {
Expand Down
2 changes: 1 addition & 1 deletion config/src/available_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn get_ephemeral_port() -> ::std::io::Result<u16> {
use std::net::{TcpListener, TcpStream};

// Request a random available port from the OS
let listener = TcpListener::bind(("localhost", 0))?;
let listener = TcpListener::bind(("127.0.0.1", 0))?;
let addr = listener.local_addr()?;

// Create and accept a connection (which we'll promptly drop) in order to force the port
Expand Down
2 changes: 1 addition & 1 deletion network/tests/network_node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn test_reconnected_peers() -> anyhow::Result<()> {

// stop node2, node1's peers is empty
node2.stop()?;
thread::sleep(Duration::from_secs(3));
thread::sleep(Duration::from_secs(12));
loop {
let network_state = block_on(async { node1_network.network_state().await })?;
debug!("network_state: {:?}", network_state);
Expand Down
14 changes: 10 additions & 4 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use starcoin_sync::block_connector::{BlockConnectorService, ExecuteRequest, Rese
use starcoin_sync::sync::SyncService;
use starcoin_sync::txn_sync::TxnSyncService;
use starcoin_sync::verified_rpc_client::VerifiedRpcClient;
use starcoin_txpool::TxPoolActorService;
use starcoin_txpool::{TxPoolActorService, TxPoolService};
use starcoin_types::system_events::{SystemShutdown, SystemStarted};
use starcoin_vm_runtime::metrics::VMMetrics;
use std::sync::Arc;
Expand Down Expand Up @@ -133,7 +133,9 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
.start_service_sync(GenerateBlockEventPacemaker::service_name()),
),
NodeRequest::ResetNode(block_hash) => {
let connect_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let connect_service = ctx
.service_ref::<BlockConnectorService<TxPoolService>>()?
.clone();
let fut = async move {
info!("Prepare to reset node startup info to {}", block_hash);
connect_service.send(ResetRequest { block_hash }).await?
Expand All @@ -147,7 +149,9 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
.get_shared_sync::<Arc<Storage>>()
.expect("Storage must exist.");

let connect_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let connect_service = ctx
.service_ref::<BlockConnectorService<TxPoolService>>()?
.clone();
let network = ctx.get_shared::<NetworkServiceRef>()?;
let fut = async move {
info!("Prepare to re execute block {}", block_hash);
Expand Down Expand Up @@ -347,7 +351,9 @@ impl NodeService {

registry.register::<ChainNotifyHandlerService>().await?;

registry.register::<BlockConnectorService>().await?;
registry
.register::<BlockConnectorService<TxPoolService>>()
.await?;
registry.register::<SyncService>().await?;

let block_relayer = registry.register::<BlockRelayer>().await?;
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ stest = { workspace = true }
stream-task = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
timeout-join-handler = { workspace = true }

[dev-dependencies]
hex = { workspace = true }
Expand Down
160 changes: 137 additions & 23 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

#[cfg(test)]
use super::CheckBlockConnectorHashValue;
use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent};
use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent};
#[cfg(test)]
use anyhow::bail;
use anyhow::{format_err, Result};
use network_api::PeerProvider;
use starcoin_chain_api::{ConnectBlockError, WriteableChainService};
use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService};
use starcoin_config::{NodeConfig, G_CRATE_VERSION};
use starcoin_crypto::HashValue;
use starcoin_executor::VMMetrics;
use starcoin_logger::prelude::*;
use starcoin_network::NetworkServiceRef;
Expand All @@ -17,6 +22,9 @@ use starcoin_service_registry::{
use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::PeerNewBlock;
use starcoin_txpool::TxPoolService;
use starcoin_txpool_api::TxPoolSyncService;
#[cfg(test)]
use starcoin_txpool_mock_service::MockTxPoolService;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
Expand All @@ -26,15 +34,21 @@ use sysinfo::{DiskExt, System, SystemExt};
const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3;
const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5;

pub struct BlockConnectorService {
chain_service: WriteBlockChainService<TxPoolService>,
pub struct BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
chain_service: WriteBlockChainService<TransactionPoolServiceT>,
sync_status: Option<SyncStatus>,
config: Arc<NodeConfig>,
}

impl BlockConnectorService {
impl<TransactionPoolServiceT> BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
pub fn new(
chain_service: WriteBlockChainService<TxPoolService>,
chain_service: WriteBlockChainService<TransactionPoolServiceT>,
config: Arc<NodeConfig>,
) -> Self {
Self {
Expand All @@ -51,6 +65,10 @@ impl BlockConnectorService {
}
}

pub fn chain_head_id(&self) -> HashValue {
self.chain_service.get_main().status().head.id()
}

pub fn check_disk_space(&mut self) -> Option<Result<u64>> {
if System::IS_SUPPORTED {
let mut sys = System::new_all();
Expand Down Expand Up @@ -97,11 +115,17 @@ impl BlockConnectorService {
}
}

impl ServiceFactory<Self> for BlockConnectorService {
fn create(ctx: &mut ServiceContext<BlockConnectorService>) -> Result<BlockConnectorService> {
impl<TransactionPoolServiceT> ServiceFactory<Self>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn create(
ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<BlockConnectorService<TransactionPoolServiceT>> {
let config = ctx.get_shared::<Arc<NodeConfig>>()?;
let bus = ctx.bus_ref().clone();
let txpool = ctx.get_shared::<TxPoolService>()?;
let txpool = ctx.get_shared::<TransactionPoolServiceT>()?;
let storage = ctx.get_shared::<Arc<Storage>>()?;
let startup_info = storage
.get_startup_info()?
Expand All @@ -120,7 +144,10 @@ impl ServiceFactory<Self> for BlockConnectorService {
}
}

impl ActorService for BlockConnectorService {
impl<TransactionPoolServiceT> ActorService for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn started(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
//TODO figure out a more suitable value.
ctx.set_mailbox_capacity(1024);
Expand All @@ -141,11 +168,15 @@ impl ActorService for BlockConnectorService {
}
}

impl EventHandler<Self, BlockDiskCheckEvent> for BlockConnectorService {
impl<TransactionPoolServiceT> EventHandler<Self, BlockDiskCheckEvent>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(
&mut self,
_: BlockDiskCheckEvent,
ctx: &mut ServiceContext<BlockConnectorService>,
ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) {
if let Some(res) = self.check_disk_space() {
match res {
Expand All @@ -161,23 +192,70 @@ impl EventHandler<Self, BlockDiskCheckEvent> for BlockConnectorService {
}
}

impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService<TxPoolService> {
fn handle_event(
&mut self,
msg: BlockConnectedEvent,
_ctx: &mut ServiceContext<BlockConnectorService>,
ctx: &mut ServiceContext<BlockConnectorService<TxPoolService>>,
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
if let Err(e) = self.chain_service.try_connect(block) {
error!("Process connected block error: {:?}", e);
let feedback = msg.feedback;

match msg.action {
crate::tasks::BlockConnectAction::ConnectNewBlock => {
if let Err(e) = self.chain_service.try_connect(block) {
error!("Process connected new block from sync error: {:?}", e);
}
}
crate::tasks::BlockConnectAction::ConnectExecutedBlock => {
if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) {
error!("Process connected executed block from sync error: {:?}", e);
}
}
}

feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent));
}
}

impl EventHandler<Self, MinedBlock> for BlockConnectorService {
#[cfg(test)]
impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService<MockTxPoolService> {
fn handle_event(
&mut self,
msg: BlockConnectedEvent,
ctx: &mut ServiceContext<BlockConnectorService<MockTxPoolService>>,
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
let feedback = msg.feedback;

match msg.action {
crate::tasks::BlockConnectAction::ConnectNewBlock => {
if let Err(e) = self.chain_service.apply_failed(block) {
error!("Process connected new block from sync error: {:?}", e);
}
}
crate::tasks::BlockConnectAction::ConnectExecutedBlock => {
if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) {
error!("Process connected executed block from sync error: {:?}", e);
}
}
}

feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent));
}
}

impl<TransactionPoolServiceT> EventHandler<Self, MinedBlock>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(&mut self, msg: MinedBlock, _ctx: &mut ServiceContext<Self>) {
let MinedBlock(new_block) = msg;
let id = new_block.header().id();
Expand All @@ -192,13 +270,21 @@ impl EventHandler<Self, MinedBlock> for BlockConnectorService {
}
}

impl EventHandler<Self, SyncStatusChangeEvent> for BlockConnectorService {
impl<TransactionPoolServiceT> EventHandler<Self, SyncStatusChangeEvent>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(&mut self, msg: SyncStatusChangeEvent, _ctx: &mut ServiceContext<Self>) {
self.sync_status = Some(msg.0);
}
}

impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
impl<TransactionPoolServiceT> EventHandler<Self, PeerNewBlock>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(&mut self, msg: PeerNewBlock, ctx: &mut ServiceContext<Self>) {
if !self.is_synced() {
debug!("[connector] Ignore PeerNewBlock event because the node has not been synchronized yet.");
Expand Down Expand Up @@ -257,22 +343,50 @@ impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
}
}

impl ServiceHandler<Self, ResetRequest> for BlockConnectorService {
impl<TransactionPoolServiceT> ServiceHandler<Self, ResetRequest>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle(
&mut self,
msg: ResetRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
_ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<()> {
self.chain_service.reset(msg.block_hash)
}
}

impl ServiceHandler<Self, ExecuteRequest> for BlockConnectorService {
impl<TransactionPoolServiceT> ServiceHandler<Self, ExecuteRequest>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle(
&mut self,
msg: ExecuteRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
_ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<ExecutedBlock> {
self.chain_service.execute(msg.block)
}
}

#[cfg(test)]
impl<TransactionPoolServiceT> ServiceHandler<Self, CheckBlockConnectorHashValue>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle(
&mut self,
msg: CheckBlockConnectorHashValue,
_ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<()> {
if self.chain_service.get_main().status().head().id() == msg.head_hash {
info!("the branch in chain service is the same as target's branch");
return Ok(());
}
info!("mock branch in chain service is not the same as target's branch");
bail!("blockchain in chain service is not the same as target!");
}
}
11 changes: 11 additions & 0 deletions sync/src/block_connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,14 @@ pub struct ExecuteRequest {
impl ServiceRequest for ExecuteRequest {
type Response = anyhow::Result<ExecutedBlock>;
}

#[cfg(test)]
#[derive(Debug, Clone)]
pub struct CheckBlockConnectorHashValue {
pub head_hash: HashValue,
}

#[cfg(test)]
impl ServiceRequest for CheckBlockConnectorHashValue {
type Response = anyhow::Result<()>;
}
Loading
Loading