Skip to content

Commit

Permalink
Do not spawn additional task for on-demand relays (paritytech#933)
Browse files Browse the repository at this point in the history
* do not spawn additional task for on-demand relays

* compilation
  • Loading branch information
svyatonik authored and serban300 committed Apr 10, 2024
1 parent 188d86e commit 99c7ad7
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 12 deletions.
1 change: 1 addition & 0 deletions bridges/relays/bin-substrate/src/cli/relay_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl RelayHeaders {
Finality::new(target_client.clone(), target_sign),
source_client,
target_client,
false,
metrics_params,
)
.await
Expand Down
2 changes: 2 additions & 0 deletions bridges/relays/bin-substrate/src/finality_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub async fn run<SourceChain, TargetChain, P>(
pipeline: P,
source_client: Client<SourceChain>,
target_client: Client<TargetChain>,
is_on_demand_task: bool,
metrics_params: MetricsParams,
) -> anyhow::Result<()>
where
Expand All @@ -137,6 +138,7 @@ where
FinalitySource::new(source_client),
SubstrateFinalityTarget::new(target_client, pipeline),
FinalitySyncParams {
is_on_demand_task,
tick: std::cmp::max(SourceChain::AVERAGE_BLOCK_INTERVAL, TargetChain::AVERAGE_BLOCK_INTERVAL),
recent_finality_proofs_limit: RECENT_FINALITY_PROOFS_LIMIT,
stall_timeout: STALL_TIMEOUT,
Expand Down
23 changes: 14 additions & 9 deletions bridges/relays/bin-substrate/src/on_demand_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,20 @@ fn select_on_demand_relay_action<C: Chain>(
.checked_sub(&best_finalized_source_header_at_target)
.unwrap_or_else(Zero::zero);
if current_headers_difference > maximal_headers_difference {
log::trace!(
target: "bridge",
"Too many {} headers missing at target in {} relay. Going to sync up to the {}",
C::NAME,
relay_task_name,
best_finalized_source_header_at_source,
);

required_source_header_at_target = best_finalized_source_header_at_source;

// don't log if relay is already running
if !is_active {
log::trace!(
target: "bridge",
"Too many {} headers missing at target in {} relay ({} vs {}). Going to sync up to the {}",
C::NAME,
relay_task_name,
best_finalized_source_header_at_source,
best_finalized_source_header_at_target,
best_finalized_source_header_at_source,
);
}
}

// now let's select what to do with relay
Expand Down Expand Up @@ -345,7 +350,7 @@ where
TargetSign: 'static,
{
let headers_relay_future =
crate::finality_pipeline::run(pipeline, source_client, target_client, MetricsParams::disabled());
crate::finality_pipeline::run(pipeline, source_client, target_client, true, MetricsParams::disabled());
let closure_task_name = task_name.clone();
async_std::task::Builder::new()
.name(task_name.clone())
Expand Down
3 changes: 3 additions & 0 deletions bridges/relays/finality/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use std::{
/// Finality proof synchronization loop parameters.
#[derive(Debug, Clone)]
pub struct FinalitySyncParams {
/// If `true`, then the separate async task for running finality loop is NOT spawned.
pub is_on_demand_task: bool,
/// Interval at which we check updates on both clients. Normally should be larger than
/// `min(source_block_time, target_block_time)`.
///
Expand Down Expand Up @@ -105,6 +107,7 @@ pub async fn run<P: FinalitySyncPipeline>(
) -> Result<(), String> {
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.spawn_loop_task(!sync_params.is_on_demand_task)
.with_metrics(Some(metrics_prefix::<P>()), metrics_params)
.loop_metric(|registry, prefix| SyncLoopMetrics::new(registry, prefix))?
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/finality/src/finality_loop_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ fn run_sync_loop(state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync
data: clients_data.clone(),
};
let sync_params = FinalitySyncParams {
is_on_demand_task: false,
tick: Duration::from_secs(0),
recent_finality_proofs_limit: 1024,
stall_timeout: Duration::from_secs(1),
Expand Down
28 changes: 25 additions & 3 deletions bridges/relays/utils/src/relay_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub trait Client: 'static + Clone + Send + Sync {
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
Loop {
reconnect_delay: RECONNECT_DELAY,
spawn_loop_task: true,
source_client,
target_client,
loop_metric: None,
Expand All @@ -49,6 +50,7 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
LoopMetrics {
relay_loop: Loop {
reconnect_delay: RECONNECT_DELAY,
spawn_loop_task: true,
source_client: (),
target_client: (),
loop_metric: None,
Expand All @@ -63,6 +65,7 @@ pub fn relay_metrics(prefix: Option<String>, params: MetricsParams) -> LoopMetri
/// Generic relay loop.
pub struct Loop<SC, TC, LM> {
reconnect_delay: Duration,
spawn_loop_task: bool,
source_client: SC,
target_client: TC,
loop_metric: Option<LM>,
Expand All @@ -84,11 +87,23 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
self
}

/// Set spawn-dedicated-loop-task flag.
///
/// If `true` (default), separate async task is spawned to run relay loop. This is the default
/// behavior for all loops. If `false`, then loop is executed as a part of the current
/// task. The `false` is used for on-demand tasks, which are cancelled from time to time
/// and there's already a dedicated on-demand task for running such loops.
pub fn spawn_loop_task(mut self, spawn_loop_task: bool) -> Self {
self.spawn_loop_task = spawn_loop_task;
self
}

/// Start building loop metrics using given prefix.
pub fn with_metrics(self, prefix: Option<String>, params: MetricsParams) -> LoopMetrics<SC, TC, ()> {
LoopMetrics {
relay_loop: Loop {
reconnect_delay: self.reconnect_delay,
spawn_loop_task: self.spawn_loop_task,
source_client: self.source_client,
target_client: self.target_client,
loop_metric: None,
Expand All @@ -113,7 +128,8 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
TC: 'static + Client,
LM: 'static + Send + Clone,
{
async_std::task::spawn(async move {
let spawn_loop_task = self.spawn_loop_task;
let run_loop_task = async move {
crate::initialize::initialize_loop(loop_name);

loop {
Expand Down Expand Up @@ -162,8 +178,13 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
}

Ok(())
})
.await
};

if spawn_loop_task {
async_std::task::spawn(run_loop_task).await
} else {
run_loop_task.await
}
}
}

Expand Down Expand Up @@ -239,6 +260,7 @@ impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {

Ok(Loop {
reconnect_delay: self.relay_loop.reconnect_delay,
spawn_loop_task: self.relay_loop.spawn_loop_task,
source_client: self.relay_loop.source_client,
target_client: self.relay_loop.target_client,
loop_metric: self.loop_metric,
Expand Down

0 comments on commit 99c7ad7

Please sign in to comment.