diff --git a/bridges/relays/client-substrate/src/client/caching.rs b/bridges/relays/client-substrate/src/client/caching.rs index abea864db7381..e6dc798a542e4 100644 --- a/bridges/relays/client-substrate/src/client/caching.rs +++ b/bridges/relays/client-substrate/src/client/caching.rs @@ -18,12 +18,13 @@ //! method calls. use crate::{ - client::{Client, SharedSubscriptionFactory}, + client::{Client, SubscriptionBroadcaster}, error::{Error, Result}, AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, ChainWithTransactions, HashOf, HeaderIdOf, HeaderOf, NonceOf, SignedBlockOf, SimpleRuntimeVersion, Subscription, TransactionTracker, UnsignedTransaction, ANCIENT_BLOCK_THRESHOLD, }; +use std::future::Future; use async_std::sync::{Arc, Mutex, RwLock}; use async_trait::async_trait; @@ -54,8 +55,8 @@ pub struct CachingClient> { /// Client data, shared by all `CachingClient` clones. struct ClientData { - grandpa_justifications: Arc>>>, - beefy_justifications: Arc>>>, + grandpa_justifications: Arc>>>, + beefy_justifications: Arc>>>, // `quick_cache::sync::Cache` has the `get_or_insert_async` method, which fits our needs, // but it uses synchronization primitives that are not aware of async execution. They // can block the executor threads and cause deadlocks => let's use primitives from @@ -112,6 +113,26 @@ impl> CachingClient { cache.write().await.insert(key.clone(), value.clone()); Ok(value) } + + async fn subscribe_finality_justifications<'a>( + &'a self, + maybe_broadcaster: &Mutex>>, + do_subscribe: impl Future>> + 'a, + ) -> Result> { + let mut maybe_broadcaster = maybe_broadcaster.lock().await; + let broadcaster = match maybe_broadcaster.as_ref() { + Some(justifications) => justifications, + None => { + let broadcaster = match SubscriptionBroadcaster::new(do_subscribe.await?) { + Ok(broadcaster) => broadcaster, + Err(subscription) => return Ok(subscription), + }; + maybe_broadcaster.get_or_insert(broadcaster) + }, + }; + + broadcaster.subscribe().await + } } impl> std::fmt::Debug for CachingClient { @@ -192,14 +213,11 @@ impl> Client for CachingClient { where C: ChainWithGrandpa, { - let mut grandpa_justifications = self.data.grandpa_justifications.lock().await; - if let Some(ref grandpa_justifications) = *grandpa_justifications { - grandpa_justifications.subscribe().await - } else { - let subscription = self.backend.subscribe_grandpa_finality_justifications().await?; - *grandpa_justifications = Some(subscription.factory()); - Ok(subscription) - } + self.subscribe_finality_justifications( + &self.data.grandpa_justifications, + self.backend.subscribe_grandpa_finality_justifications(), + ) + .await } async fn generate_grandpa_key_ownership_proof( @@ -214,14 +232,11 @@ impl> Client for CachingClient { } async fn subscribe_beefy_finality_justifications(&self) -> Result> { - let mut beefy_justifications = self.data.beefy_justifications.lock().await; - if let Some(ref beefy_justifications) = *beefy_justifications { - beefy_justifications.subscribe().await - } else { - let subscription = self.backend.subscribe_beefy_finality_justifications().await?; - *beefy_justifications = Some(subscription.factory()); - Ok(subscription) - } + self.subscribe_finality_justifications( + &self.data.beefy_justifications, + self.backend.subscribe_beefy_finality_justifications(), + ) + .await } async fn token_decimals(&self) -> Result> { diff --git a/bridges/relays/client-substrate/src/client/mod.rs b/bridges/relays/client-substrate/src/client/mod.rs index 4eeb564899af9..b60a75868724d 100644 --- a/bridges/relays/client-substrate/src/client/mod.rs +++ b/bridges/relays/client-substrate/src/client/mod.rs @@ -33,7 +33,7 @@ mod rpc_api; mod subscription; pub use client::Client; -pub use subscription::{SharedSubscriptionFactory, Subscription, UnderlyingSubscription}; +pub use subscription::{StreamDescription, Subscription, SubscriptionBroadcaster}; /// Type of RPC client with caching support. pub type RpcWithCachingClient = CachingClient>; diff --git a/bridges/relays/client-substrate/src/client/rpc.rs b/bridges/relays/client-substrate/src/client/rpc.rs index 353f08a0fd3fe..7df9aec24770b 100644 --- a/bridges/relays/client-substrate/src/client/rpc.rs +++ b/bridges/relays/client-substrate/src/client/rpc.rs @@ -24,7 +24,7 @@ use crate::{ SubstrateFrameSystemClient, SubstrateGrandpaClient, SubstrateStateClient, SubstrateSystemClient, }, - subscription::{Subscription, Unwrap}, + subscription::{StreamDescription, Subscription}, Client, }, error::{Error, Result}, @@ -40,8 +40,11 @@ use async_trait::async_trait; use bp_runtime::HeaderIdProvider; use codec::Encode; use frame_support::weights::Weight; -use futures::{TryFutureExt, TryStreamExt}; -use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; +use futures::TryFutureExt; +use jsonrpsee::{ + core::{client::Subscription as RpcSubscription, ClientError}, + ws_client::{WsClient, WsClientBuilder}, +}; use num_traits::Zero; use pallet_transaction_payment::RuntimeDispatchInfo; use relay_utils::{relay_loop::RECONNECT_DELAY, STALL_TIMEOUT}; @@ -241,6 +244,25 @@ impl RpcClient { }) .await } + + async fn subscribe_finality_justifications( + &self, + gadget_name: &str, + do_subscribe: impl FnOnce(Arc) -> Fut + Send + 'static, + ) -> Result> + where + Fut: Future, ClientError>> + Send, + { + let subscription = self + .jsonrpsee_execute(move |client| async move { Ok(do_subscribe(client).await?) }) + .map_err(|e| Error::failed_to_subscribe_justification::(e)) + .await?; + + Ok(Subscription::new_forwarded( + StreamDescription::new(format!("{} justifications", gadget_name), C::NAME.into()), + subscription, + )) + } } impl Clone for RpcClient { @@ -329,19 +351,9 @@ impl Client for RpcClient { where C: ChainWithGrandpa, { - Subscription::new( - C::NAME.into(), - "GRANDPA justifications".into(), - self.jsonrpsee_execute(move |client| async move { - Ok(Box::new( - SubstrateGrandpaClient::::subscribe_justifications(&*client) - .await? - .map_err(Into::into), - )) - }) - .map_err(|e| Error::failed_to_subscribe_justification::(e)) - .await?, - ) + self.subscribe_finality_justifications("GRANDPA", move |client| async move { + SubstrateGrandpaClient::::subscribe_justifications(&*client).await + }) .await } @@ -360,19 +372,9 @@ impl Client for RpcClient { } async fn subscribe_beefy_finality_justifications(&self) -> Result> { - Subscription::new( - C::NAME.into(), - "BEEFY justifications".into(), - self.jsonrpsee_execute(move |client| async move { - Ok(Box::new( - SubstrateBeefyClient::::subscribe_justifications(&*client) - .await? - .map_err(Into::into), - )) - }) - .map_err(|e| Error::failed_to_subscribe_justification::(e)) - .await?, - ) + self.subscribe_finality_justifications("BEEFY", move |client| async move { + SubstrateBeefyClient::::subscribe_justifications(&*client).await + }) .await } @@ -511,7 +513,10 @@ impl Client for RpcClient { self_clone, stall_timeout, tx_hash, - Box::new(Unwrap::new(C::NAME.into(), "transaction events".into(), subscription)), + Subscription::new_forwarded( + StreamDescription::new("transaction events".into(), C::NAME.into()), + subscription, + ), )) }) .await diff --git a/bridges/relays/client-substrate/src/client/subscription.rs b/bridges/relays/client-substrate/src/client/subscription.rs index 14aa7a146b5d1..43a46573f987b 100644 --- a/bridges/relays/client-substrate/src/client/subscription.rs +++ b/bridges/relays/client-substrate/src/client/subscription.rs @@ -14,70 +14,85 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::error::Result; +use crate::error::Result as ClientResult; use async_std::{ channel::{bounded, Receiver, Sender}, stream::StreamExt, }; use futures::{FutureExt, Stream}; +use jsonrpsee::core::ClientError; use sp_runtime::DeserializeOwned; use std::{ fmt::Debug, pin::Pin, + result::Result as StdResult, task::{Context, Poll}, }; /// Once channel reaches this capacity, the subscription breaks. const CHANNEL_CAPACITY: usize = 128; -/// Underlying subscription type. -pub type UnderlyingSubscription = Box + Unpin + Send>; +/// Structure describing a stream. +#[derive(Clone)] +pub struct StreamDescription { + stream_name: String, + chain_name: String, +} + +impl StreamDescription { + /// Create a new instance of `StreamDescription`. + pub fn new(stream_name: String, chain_name: String) -> Self { + Self { stream_name, chain_name } + } + + /// Get a stream description. + fn get(&self) -> String { + format!("{} stream of {}", self.stream_name, self.chain_name) + } +} /// Chainable stream that transforms items of type `Result` to items of type `T`. /// /// If it encounters an item of type `Err`, it returns `Poll::Ready(None)` /// and terminates the underlying stream. -pub struct Unwrap>, T, E> { - chain_name: String, - item_type: String, - subscription: Option, +struct Unwrap>, T, E> { + desc: StreamDescription, + stream: Option, } -impl>, T, E> Unwrap { +impl>, T, E> Unwrap { /// Create a new instance of `Unwrap`. - pub fn new(chain_name: String, item_type: String, subscription: S) -> Self { - Self { chain_name, item_type, subscription: Some(subscription) } + pub fn new(desc: StreamDescription, stream: S) -> Self { + Self { desc, stream: Some(stream) } } } -impl> + Unpin, T: DeserializeOwned, E: Debug> Stream +impl> + Unpin, T: DeserializeOwned, E: Debug> Stream for Unwrap { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match self.subscription.as_mut() { + Poll::Ready(match self.stream.as_mut() { Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) { Some(Ok(item)) => Some(item), Some(Err(e)) => { - self.subscription.take(); + self.stream.take(); log::debug!( target: "bridge", - "{} stream of {} has returned error: {:?}. It may need to be restarted", - self.item_type, - self.chain_name, + "{} has returned error: {:?}. It may need to be restarted", + self.desc.get(), e, ); None }, None => { - self.subscription.take(); + self.stream.take(); log::debug!( target: "bridge", - "{} stream of {} has returned `None`. It may need to be restarted", - self.item_type, - self.chain_name, + "{} has returned `None`. It may need to be restarted", + self.desc.get() ); None }, @@ -89,71 +104,73 @@ impl> + Unpin, T: DeserializeOwned, E /// Subscription factory that produces subscriptions, sharing the same background thread. #[derive(Clone)] -pub struct SharedSubscriptionFactory { - subscribers_sender: Sender>>, +pub struct SubscriptionBroadcaster { + desc: StreamDescription, + subscribers_sender: Sender>, } -impl SharedSubscriptionFactory { +impl SubscriptionBroadcaster { /// Create new subscription factory. - pub async fn new( - chain_name: String, - item_type: String, - subscription: UnderlyingSubscription>, - ) -> Self { + pub fn new(subscription: Subscription) -> StdResult> { + // It doesn't make sense to further broadcast a broadcasted subscription. + if subscription.is_broadcasted { + return Err(subscription) + } + + let desc = subscription.desc().clone(); let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY); - async_std::task::spawn(background_worker( - chain_name.clone(), - item_type.clone(), - Box::new(Unwrap::new(chain_name, item_type, subscription)), - subscribers_receiver, - )); - Self { subscribers_sender } + async_std::task::spawn(background_worker(subscription, subscribers_receiver)); + Ok(Self { desc, subscribers_sender }) } /// Produce new subscription. - pub async fn subscribe(&self) -> Result> { + pub async fn subscribe(&self) -> ClientResult> { let (items_sender, items_receiver) = bounded(CHANNEL_CAPACITY); self.subscribers_sender.try_send(items_sender)?; - Ok(Subscription { items_receiver, subscribers_sender: self.subscribers_sender.clone() }) + Ok(Subscription::new_broadcasted(self.desc.clone(), items_receiver)) } } /// Subscription to some chain events. pub struct Subscription { - items_receiver: Receiver>, - subscribers_sender: Sender>>, + desc: StreamDescription, + subscription: Box + Unpin + Send>, + is_broadcasted: bool, } impl Subscription { - /// Create new subscription. - pub async fn new( - chain_name: String, - item_type: String, - subscription: UnderlyingSubscription>, - ) -> Result { - SharedSubscriptionFactory::::new(chain_name, item_type, subscription) - .await - .subscribe() - .await + /// Create new forwarded subscription. + pub fn new_forwarded( + desc: StreamDescription, + subscription: impl Stream> + Unpin + Send + 'static, + ) -> Self { + Self { + desc: desc.clone(), + subscription: Box::new(Unwrap::new(desc, subscription)), + is_broadcasted: false, + } } - /// Return subscription factory for this subscription. - pub fn factory(&self) -> SharedSubscriptionFactory { - SharedSubscriptionFactory { subscribers_sender: self.subscribers_sender.clone() } + /// Create new broadcasted subscription. + pub fn new_broadcasted( + desc: StreamDescription, + subscription: impl Stream + Unpin + Send + 'static, + ) -> Self { + Self { desc, subscription: Box::new(subscription), is_broadcasted: true } } - /// Consumes subscription and returns future items stream. - pub fn into_stream(self) -> impl Stream { - futures::stream::unfold(self, |mut this| async { - let item = this.items_receiver.next().await.unwrap_or(None); - item.map(|i| (i, this)) - }) + /// Get the description of the underlying stream + pub fn desc(&self) -> &StreamDescription { + &self.desc } +} - /// Return next item from the subscription. - pub async fn next(&self) -> Result> { - Ok(self.items_receiver.recv().await?) +impl Stream for Subscription { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(futures::ready!(Pin::new(&mut self.subscription).poll_next(cx))) } } @@ -163,17 +180,14 @@ impl Subscription { /// message (`Err` or `None`) to all known listeners. Also, when it stops, all /// subsequent reads and new subscribers will get the connection error (`ChannelError`). async fn background_worker( - chain_name: String, - item_type: String, - mut subscription: UnderlyingSubscription, - mut subscribers_receiver: Receiver>>, + mut subscription: Subscription, + mut subscribers_receiver: Receiver>, ) { - fn log_task_exit(chain_name: &str, item_type: &str, reason: &str) { + fn log_task_exit(desc: &StreamDescription, reason: &str) { log::debug!( target: "bridge", - "Background task of {} subscription of {} has stopped: {}", - item_type, - chain_name, + "Background task of subscription broadcaster for {} has stopped: {}", + desc.get(), reason, ); } @@ -184,7 +198,7 @@ async fn background_worker( None => { // it means that the last subscriber/factory has been dropped, so we need to // exit too - return log_task_exit(&chain_name, &item_type, "client has stopped") + return log_task_exit(subscription.desc(), "client has stopped") }, }; @@ -200,22 +214,24 @@ async fn background_worker( None => { // it means that the last subscriber/factory has been dropped, so we need to // exit too - return log_task_exit(&chain_name, &item_type, "client has stopped") + return log_task_exit(subscription.desc(), "client has stopped") }, } }, - item = subscription.next().fuse() => { - let is_stream_finished = item.is_none(); - // notify subscribers - subscribers.retain(|subscriber| { - let send_result = subscriber.try_send(item.clone()); - send_result.is_ok() - }); - - // it means that the underlying client has dropped, so we can't do anything here - // and need to stop the task - if is_stream_finished { - return log_task_exit(&chain_name, &item_type, "stream has finished"); + maybe_item = subscription.subscription.next().fuse() => { + match maybe_item { + Some(item) => { + // notify subscribers + subscribers.retain(|subscriber| { + let send_result = subscriber.try_send(item.clone()); + send_result.is_ok() + }); + } + None => { + // The underlying client has dropped, so we can't do anything here + // and need to stop the task. + return log_task_exit(subscription.desc(), "stream has finished"); + } } }, } diff --git a/bridges/relays/client-substrate/src/lib.rs b/bridges/relays/client-substrate/src/lib.rs index 65647889c6f8b..12a1c48c09c7a 100644 --- a/bridges/relays/client-substrate/src/lib.rs +++ b/bridges/relays/client-substrate/src/lib.rs @@ -41,8 +41,8 @@ pub use crate::{ }, client::{ is_ancient_block, rpc_with_caching as new, ChainRuntimeVersion, Client, - OpaqueGrandpaAuthoritiesSet, RpcWithCachingClient, SimpleRuntimeVersion, Subscription, - ANCIENT_BLOCK_THRESHOLD, + OpaqueGrandpaAuthoritiesSet, RpcWithCachingClient, SimpleRuntimeVersion, StreamDescription, + Subscription, ANCIENT_BLOCK_THRESHOLD, }, error::{Error, Result}, sync_header::SyncHeader, diff --git a/bridges/relays/client-substrate/src/transaction_tracker.rs b/bridges/relays/client-substrate/src/transaction_tracker.rs index 1dc97faf2a6a8..b4801c89f51e1 100644 --- a/bridges/relays/client-substrate/src/transaction_tracker.rs +++ b/bridges/relays/client-substrate/src/transaction_tracker.rs @@ -16,9 +16,7 @@ //! Helper for tracking transaction invalidation events. -use crate::{ - client::UnderlyingSubscription, Chain, Error, HashOf, HeaderIdOf, TransactionStatusOf, -}; +use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf}; use async_trait::async_trait; use futures::{future::Either, Future, FutureExt, Stream, StreamExt}; @@ -66,7 +64,7 @@ pub struct TransactionTracker { environment: E, transaction_hash: HashOf, stall_timeout: Duration, - subscription: UnderlyingSubscription>, + subscription: Subscription>, } impl> TransactionTracker { @@ -75,7 +73,7 @@ impl> TransactionTracker { environment: E, stall_timeout: Duration, transaction_hash: HashOf, - subscription: UnderlyingSubscription>, + subscription: Subscription>, ) -> Self { Self { environment, stall_timeout, transaction_hash, subscription } } @@ -303,7 +301,7 @@ async fn watch_transaction_status< #[cfg(test)] mod tests { use super::*; - use crate::test_chain::TestChain; + use crate::{test_chain::TestChain, StreamDescription}; use futures::{FutureExt, SinkExt}; use sc_transaction_pool_api::TransactionStatus; @@ -330,7 +328,10 @@ mod tests { TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), - Box::new(receiver), + Subscription::new_forwarded( + StreamDescription::new("test".into(), "test".into()), + receiver, + ), ); // we can't do `.now_or_never()` on `do_wait()` call, because `Subscription` has its own @@ -338,7 +339,7 @@ mod tests { // relatively small timeout here let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100)); let wait_for_stall_timeout_rest = futures::future::ready(()); - sender.send(status).await.unwrap(); + sender.send(Ok(status)).await.unwrap(); let (ts, is) = tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await; @@ -455,7 +456,10 @@ mod tests { TestEnvironment(Ok(HeaderId(0, Default::default()))), Duration::from_secs(0), Default::default(), - Box::new(receiver), + Subscription::new_forwarded( + StreamDescription::new("test".into(), "test".into()), + receiver, + ), ); let wait_for_stall_timeout = futures::future::ready(()).shared(); diff --git a/bridges/relays/lib-substrate-relay/src/finality_base/engine.rs b/bridges/relays/lib-substrate-relay/src/finality_base/engine.rs index 32f2fd0080fbb..f23e1add36650 100644 --- a/bridges/relays/lib-substrate-relay/src/finality_base/engine.rs +++ b/bridges/relays/lib-substrate-relay/src/finality_base/engine.rs @@ -29,6 +29,7 @@ use bp_header_chain::{ }; use bp_runtime::{BasicOperatingMode, HeaderIdProvider, OperatingMode}; use codec::{Decode, Encode}; +use futures::stream::StreamExt; use num_traits::{One, Zero}; use relay_substrate_client::{ BlockNumberOf, Chain, ChainWithGrandpa, Client, Error as SubstrateError, HashOf, HeaderOf, @@ -278,17 +279,14 @@ impl Engine for Grandpa { // But now there are problems with this approach - `CurrentSetId` may return invalid value. // So here we're waiting for the next justification, read the authorities set and then try // to figure out the set id with bruteforce. - let justifications = Self::source_finality_proofs(&source_client) + let mut justifications = Self::source_finality_proofs(&source_client) .await .map_err(|err| Error::Subscribe(C::NAME, err))?; // Read next justification - the header that it finalizes will be used as initial header. let justification = justifications .next() .await - .map_err(|e| Error::ReadJustification(C::NAME, e)) - .and_then(|justification| { - justification.ok_or(Error::ReadJustificationStreamEnded(C::NAME)) - })?; + .ok_or(Error::ReadJustificationStreamEnded(C::NAME))?; // Read initial header. let justification: GrandpaJustification = diff --git a/bridges/relays/lib-substrate-relay/src/finality_base/mod.rs b/bridges/relays/lib-substrate-relay/src/finality_base/mod.rs index 2e0a7089a931d..71d15ca3868e0 100644 --- a/bridges/relays/lib-substrate-relay/src/finality_base/mod.rs +++ b/bridges/relays/lib-substrate-relay/src/finality_base/mod.rs @@ -54,7 +54,7 @@ pub async fn finality_proofs( ) -> Result, Error> { Ok(unfold( P::FinalityEngine::source_finality_proofs(client).await?, - move |subscription| async move { + move |mut subscription| async move { loop { let log_error = |err| { log::error!( @@ -65,8 +65,7 @@ pub async fn finality_proofs( ); }; - let next_justification = - subscription.next().await.map_err(|err| log_error(err.to_string())).ok()??; + let next_justification = subscription.next().await?; let decoded_justification = >::FinalityProof::decode(