Skip to content

Commit

Permalink
reconnect to failed client in on-demand relay background task (parityt…
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik authored and serban300 committed Apr 10, 2024
1 parent 99c7ad7 commit b73256f
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 56 deletions.
64 changes: 41 additions & 23 deletions bridges/relays/bin-substrate/src/on_demand_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use relay_substrate_client::{
finality_source::FinalitySource as SubstrateFinalitySource, BlockNumberOf, Chain, Client, HashOf, HeaderIdOf,
SyncHeader,
};
use relay_utils::{metrics::MetricsParams, BlockNumberBase, HeaderId};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, BlockNumberBase, FailedClient, HeaderId,
MaybeConnectionError,
};
use std::fmt::Debug;

/// On-demand Substrate <-> Substrate headers relay.
Expand Down Expand Up @@ -132,11 +135,11 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
FinalityTargetClient<SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>>,
{
let relay_task_name = on_demand_headers_relay_name::<SourceChain, TargetChain>();
let finality_source = SubstrateFinalitySource::<
let mut finality_source = SubstrateFinalitySource::<
_,
SubstrateFinalityToSubstrate<SourceChain, TargetChain, TargetSign>,
>::new(source_client.clone());
let finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());
let mut finality_target = SubstrateFinalityTarget::new(target_client.clone(), pipeline.clone());

let mut active_headers_relay = None;
let mut required_header_number = Zero::zero();
Expand Down Expand Up @@ -171,15 +174,35 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
// read best finalized source header number from source
let best_finalized_source_header_at_source =
best_finalized_source_header_at_source(&finality_source, &relay_task_name).await;
if matches!(best_finalized_source_header_at_source, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Source,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue;
}

// read best finalized source header number from target
let best_finalized_source_header_at_target =
best_finalized_source_header_at_target::<SourceChain, _, _>(&finality_target, &relay_task_name).await;
if matches!(best_finalized_source_header_at_target, Err(ref e) if e.is_connection_error()) {
relay_utils::relay_loop::reconnect_failed_client(
FailedClient::Target,
relay_utils::relay_loop::RECONNECT_DELAY,
&mut finality_source,
&mut finality_target,
)
.await;
continue;
}

// start or stop headers relay if required
let action = select_on_demand_relay_action::<SourceChain>(
best_finalized_source_header_at_source,
best_finalized_source_header_at_target,
best_finalized_source_header_at_source.ok(),
best_finalized_source_header_at_target.ok(),
required_header_number,
maximal_headers_difference,
&relay_task_name,
Expand Down Expand Up @@ -213,25 +236,21 @@ async fn background_task<SourceChain, TargetChain, TargetSign>(
async fn best_finalized_source_header_at_source<SourceChain: Chain, P>(
finality_source: &SubstrateFinalitySource<SourceChain, P>,
relay_task_name: &str,
) -> Option<SourceChain::BlockNumber>
) -> Result<SourceChain::BlockNumber, <SubstrateFinalitySource<SourceChain, P> as RelayClient>::Error>
where
SubstrateFinalitySource<SourceChain, P>: FinalitySourceClient<P>,
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
{
finality_source
.best_finalized_block_number()
.await
.map(Some)
.unwrap_or_else(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from source in {} relay: {:?}",
relay_task_name,
error,
);
finality_source.best_finalized_block_number().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from source in {} relay: {:?}",
relay_task_name,
error,
);

None
})
error
})
}

/// Read best finalized source block number from target client.
Expand All @@ -240,24 +259,23 @@ where
async fn best_finalized_source_header_at_target<SourceChain: Chain, TargetChain: Chain, P>(
finality_target: &SubstrateFinalityTarget<TargetChain, P>,
relay_task_name: &str,
) -> Option<SourceChain::BlockNumber>
) -> Result<SourceChain::BlockNumber, <SubstrateFinalityTarget<TargetChain, P> as RelayClient>::Error>
where
SubstrateFinalityTarget<TargetChain, P>: FinalityTargetClient<P>,
P: FinalitySyncPipeline<Number = SourceChain::BlockNumber>,
{
finality_target
.best_finalized_source_block_number()
.await
.map(Some)
.unwrap_or_else(|error| {
.map_err(|error| {
log::error!(
target: "bridge",
"Failed to read best finalized source header from target in {} relay: {:?}",
relay_task_name,
error,
);

None
error
})
}

Expand Down
84 changes: 51 additions & 33 deletions bridges/relays/utils/src/relay_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,39 +139,15 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {

match result {
Ok(()) => break,
Err(failed_client) => loop {
async_std::task::sleep(self.reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match self.source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
self.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match self.target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
self.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}

break;
},
Err(failed_client) => {
reconnect_failed_client(
failed_client,
self.reconnect_delay,
&mut self.source_client,
&mut self.target_client,
)
.await
}
}

log::debug!(target: "bridge", "Restarting relay loop");
Expand Down Expand Up @@ -268,6 +244,48 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
}
}

/// Reconnect the client(s) that reported a connection error, retrying until it works.
///
/// `failed_client` selects which side needs to be reconnected: the source client, the
/// target client, or both. Every attempt (including the very first one) is preceded by
/// a pause of `reconnect_delay`.
///
/// If any reconnect attempt fails, a warning is logged and the whole pass is restarted
/// from the top after another pause. The function returns only once every selected
/// client has reconnected successfully within a single pass; there is no retry limit.
pub async fn reconnect_failed_client(
    failed_client: FailedClient,
    reconnect_delay: Duration,
    source_client: &mut impl Client,
    target_client: &mut impl Client,
) {
    // The selection does not change between attempts, so decide it once up front.
    let source_needs_reconnect =
        failed_client == FailedClient::Both || failed_client == FailedClient::Source;
    let target_needs_reconnect =
        failed_client == FailedClient::Both || failed_client == FailedClient::Target;

    loop {
        async_std::task::sleep(reconnect_delay).await;

        if source_needs_reconnect {
            if let Err(error) = source_client.reconnect().await {
                log::warn!(
                    target: "bridge",
                    "Failed to reconnect to source client. Going to retry in {}s: {:?}",
                    reconnect_delay.as_secs(),
                    error,
                );
                continue;
            }
        }

        if target_needs_reconnect {
            if let Err(error) = target_client.reconnect().await {
                log::warn!(
                    target: "bridge",
                    "Failed to reconnect to target client. Going to retry in {}s: {:?}",
                    reconnect_delay.as_secs(),
                    error,
                );
                // Restarts the whole pass, so a selected source client is reconnected again too.
                continue;
            }
        }

        // Every selected client has been reconnected in this pass.
        return;
    }
}

/// Create new registry with global metrics.
fn create_metrics_registry(prefix: Option<String>) -> Registry {
match prefix {
Expand Down

0 comments on commit b73256f

Please sign in to comment.