From b73256f19cbfab08080a7fd608267319ca69d28d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 27 Apr 2021 16:33:28 +0300 Subject: [PATCH] reonnect to failed client in on-demand relay background task (#936) --- .../bin-substrate/src/on_demand_headers.rs | 64 +++++++++----- bridges/relays/utils/src/relay_loop.rs | 84 +++++++++++-------- 2 files changed, 92 insertions(+), 56 deletions(-) diff --git a/bridges/relays/bin-substrate/src/on_demand_headers.rs b/bridges/relays/bin-substrate/src/on_demand_headers.rs index 3b65ac7068c3..77d2b3705410 100644 --- a/bridges/relays/bin-substrate/src/on_demand_headers.rs +++ b/bridges/relays/bin-substrate/src/on_demand_headers.rs @@ -32,7 +32,10 @@ use relay_substrate_client::{ finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf, SyncHeader, }; -use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId}; +use relay_utils::{ + metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId, + MaybeConnectionError, +}; use std::fmt::Debug; /// On-demand Substrate <-> Substrate headers relay. @@ -132,11 +135,11 @@ async fn background_task( FinalityTargetClient>, { let relay_task_name = on_demand_headers_relay_name::(); - let finality_source = SubstrateFinalitySource::< + let mut finality_source = SubstrateFinalitySource::< _, SubstrateFinalityToSubstrate, >::new(source_client.clone()); - let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone()); + let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone()); let mut active_headers_relay = None; let mut required_header_number = Zero::zero(); @@ -171,15 +174,35 @@ async fn background_task( // read best finalized source header number from source let best_finalized_source_header_at_source = best_finalized_source_header_at_source(&finality_source, &relay_task_name).await; + if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) { + relay_utils::relay_loop::reconnect_failed_client( + FailedClient::Source, + relay_utils::relay_loop::RECONNECT_DELAY, + &mut finality_source, + &mut finality_target, + ) + .await; + continue; + } // read best finalized source header number from target let best_finalized_source_header_at_target = best_finalized_source_header_at_target::(&finality_target, &relay_task_name).await; + if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) { + relay_utils::relay_loop::reconnect_failed_client( + FailedClient::Target, + relay_utils::relay_loop::RECONNECT_DELAY, + &mut finality_source, + &mut finality_target, + ) + .await; + continue; + } // start or stop headers relay if required let action = select_on_demand_relay_action::( - best_finalized_source_header_at_source, - best_finalized_source_header_at_target, + best_finalized_source_header_at_source.ok(), + best_finalized_source_header_at_target.ok(), required_header_number, maximal_headers_difference, &relay_task_name, @@ -213,25 +236,21 @@ async fn background_task( async fn best_finalized_source_header_at_source( finality_source: &SubstrateFinalitySource, relay_task_name: &str, -) -> Option +) -> Result as RelayClient>::Error> where SubstrateFinalitySource: FinalitySourceClient

, P: FinalitySyncPipeline, { - finality_source - .best_finalized_block_number() - .await - .map(Some) - .unwrap_or_else(|error| { - log::error!( - target: "bridge", - "Failed to read best finalized source header from source in {} relay: {:?}", - relay_task_name, - error, - ); + finality_source.best_finalized_block_number().await.map_err(|error| { + log::error!( + target: "bridge", + "Failed to read best finalized source header from source in {} relay: {:?}", + relay_task_name, + error, + ); - None - }) + error + }) } /// Read best finalized source block number from target client. @@ -240,7 +259,7 @@ where async fn best_finalized_source_header_at_target( finality_target: &SubstrateFinalityTarget, relay_task_name: &str, -) -> Option +) -> Result as RelayClient>::Error> where SubstrateFinalityTarget: FinalityTargetClient

, P: FinalitySyncPipeline, @@ -248,8 +267,7 @@ where finality_target .best_finalized_source_block_number() .await - .map(Some) - .unwrap_or_else(|error| { + .map_err(|error| { log::error!( target: "bridge", "Failed to read best finalized source header from target in {} relay: {:?}", @@ -257,7 +275,7 @@ where error, ); - None + error }) } diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index 46cd242ce1fd..8fcaabe4430c 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -139,39 +139,15 @@ impl Loop { match result { Ok(()) => break, - Err(failed_client) => loop { - async_std::task::sleep(self.reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - match self.source_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to source client. Going to retry in {}s: {:?}", - self.reconnect_delay.as_secs(), - error, - ); - continue; - } - } - } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - match self.target_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to target client. Going to retry in {}s: {:?}", - self.reconnect_delay.as_secs(), - error, - ); - continue; - } - } - } - - break; - }, + Err(failed_client) => { + reconnect_failed_client( + failed_client, + self.reconnect_delay, + &mut self.source_client, + &mut self.target_client, + ) + .await + } } log::debug!(target: "bridge", "Restarting relay loop"); @@ -268,6 +244,48 @@ impl LoopMetrics { } } +/// Deal with the client who has returned connection error. +pub async fn reconnect_failed_client( + failed_client: FailedClient, + reconnect_delay: Duration, + source_client: &mut impl Client, + target_client: &mut impl Client, +) { + loop { + async_std::task::sleep(reconnect_delay).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + match source_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to source client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; + } + } + } + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + match target_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to target client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; + } + } + } + + break; + } +} + /// Create new registry with global metrics. fn create_metrics_registry(prefix: Option) -> Registry { match prefix {