Skip to content

Commit

Permalink
fix: ckb4ibc monitor watch counterparty client type
Browse files Browse the repository at this point in the history
Don't block when the counterparty client type is already known!
  • Loading branch information
blckngm committed Aug 24, 2023
1 parent 7d549a6 commit bdf32f3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 23 deletions.
33 changes: 20 additions & 13 deletions crates/relayer/src/chain/ckb4ibc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use ckb_types::core::TransactionView as CoreTransactionView;
use ckb_types::molecule::prelude::Entity;
use ckb_types::packed::{CellInput, OutPoint, Script, WitnessArgs};
use ckb_types::prelude::{Builder, Pack, Unpack};
use crossbeam_channel::Sender;
use futures::TryFutureExt;
use ibc_proto::ibc::apps::fee::v1::{
QueryIncentivizedPacketRequest, QueryIncentivizedPacketResponse,
Expand Down Expand Up @@ -115,13 +114,13 @@ pub struct Ckb4IbcChain {
cached_network: RwLock<Option<NetworkType>>,

tx_monitor_cmd: Option<TxMonitorCmd>,
tx_internal_cmd: Option<Sender<ClientType>>,

connection_outpoint: OutPoint,
channel_outpoint: OutPoint,
packet_outpoint: OutPoint,

counterparty_client_type: RefCell<ClientType>,
counterparty_client_type: tokio::sync::watch::Sender<Option<ClientType>>,

client_outpoints: RefCell<HashMap<ClientType, OutPoint>>,
channel_input_data: RefCell<HashMap<(ChannelId, PortId), CellInput>>,
channel_cache: RefCell<HashMap<ChannelId, IbcChannel>>,
Expand Down Expand Up @@ -203,12 +202,12 @@ impl Ckb4IbcChain {
}

fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
let (monitor, monitor_tx, internal_tx) = Ckb4IbcEventMonitor::new(
let (monitor, monitor_tx) = Ckb4IbcEventMonitor::new(
self.rt.clone(),
self.rpc_client.clone(),
self.config.clone(),
self.counterparty_client_type.subscribe(),
);
self.tx_internal_cmd = Some(internal_tx);
std::thread::spawn(move || monitor.run());
Ok(monitor_tx)
}
Expand Down Expand Up @@ -279,7 +278,7 @@ impl Ckb4IbcChain {
let channel_code_hash = self.get_converter()?.get_channel_code_hash();
let client_id = self
.config
.lc_client_type_hash(*self.counterparty_client_type.borrow())?;
.lc_client_type_hash(self.counterparty_client_type())?;
let channel_args = ChannelArgs {
client_id: client_id.into(),
open: is_open,
Expand Down Expand Up @@ -413,12 +412,21 @@ impl Ckb4IbcChain {
Ok(result)
}

fn counterparty_client_type(&self) -> ClientType {
self.counterparty_client_type
.borrow()
.unwrap_or(ClientType::Mock)
}

fn sync_counterparty_client_type(&self, client_type: ClientType) {
*self.counterparty_client_type.borrow_mut() = client_type;
if let Some(cmd) = &self.tx_internal_cmd {
cmd.send(client_type)
.expect("send counterparty_client_type");
}
self.counterparty_client_type.send_if_modified(|prev| {
if *prev != Some(client_type) {
*prev = Some(client_type);
true
} else {
false
}
});
}

fn fetch_packet_cell_and_extract(
Expand Down Expand Up @@ -509,12 +517,11 @@ impl ChainEndpoint for Ckb4IbcChain {
keybase,
cached_network: RwLock::new(None),
tx_monitor_cmd: None,
tx_internal_cmd: None,
client_outpoints: RefCell::new(client_outpoints),
connection_outpoint: conn_contract_cell.unwrap().out_point,
channel_outpoint: chan_contract_cell.unwrap().out_point,
packet_outpoint: packet_contract_cell.unwrap().out_point,
counterparty_client_type: RefCell::new(ClientType::Mock),
counterparty_client_type: tokio::sync::watch::channel(None).0,
channel_input_data: RefCell::new(HashMap::new()),
channel_cache: RefCell::new(HashMap::new()),
connection_cache: RefCell::new(HashMap::new()),
Expand Down
31 changes: 21 additions & 10 deletions crates/relayer/src/chain/ckb4ibc/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ckb_types::core::ScriptHashType;
use ckb_types::packed::Script;
use ckb_types::prelude::{Builder, Entity, Pack};
use ckb_types::H256;
use crossbeam_channel::{Receiver, Sender};
use crossbeam_channel::Receiver;
use ibc_relayer_types::core::ics02_client::client_type::ClientType;
use ibc_relayer_types::core::ics02_client::height::Height;
use ibc_relayer_types::core::ics03_connection::events::{
Expand Down Expand Up @@ -49,10 +49,10 @@ pub struct Ckb4IbcEventMonitor {
rt: Arc<TokioRuntime>,
rpc_client: Arc<RpcClient>,
rx_cmd: Receiver<MonitorCmd>,
rx_internal: Receiver<ClientType>,
event_bus: EventBus<Arc<Result<EventBatch>>>,
config: ChainConfig,
cache_set: RwLock<CacheSet<H256>>,
counterparty_client_type_rx: tokio::sync::watch::Receiver<Option<ClientType>>,
counterparty_client_type: ClientType,
}

Expand All @@ -61,24 +61,39 @@ impl Ckb4IbcEventMonitor {
rt: Arc<TokioRuntime>,
rpc_client: Arc<RpcClient>,
config: ChainConfig,
) -> (Self, TxMonitorCmd, Sender<ClientType>) {
counterparty_client_type_rx: tokio::sync::watch::Receiver<Option<ClientType>>,
) -> (Self, TxMonitorCmd) {
let (tx_cmd, rx_cmd) = crossbeam_channel::unbounded();
let (tx_internal, rx_internal) = crossbeam_channel::unbounded();
let monitor = Ckb4IbcEventMonitor {
rt,
rpc_client,
rx_cmd,
rx_internal,
event_bus: EventBus::default(),
config,
cache_set: RwLock::new(CacheSet::new(512)),
counterparty_client_type_rx,
counterparty_client_type: ClientType::Mock,
};
(monitor, TxMonitorCmd::new(tx_cmd), tx_internal)
(monitor, TxMonitorCmd::new(tx_cmd))
}

pub fn run(mut self) {
let rt = self.rt.clone();
// Block here until the counterparty is revealed.
tracing::info!("receiving counterparty client type");
rt.block_on(async {
self.counterparty_client_type = self
.counterparty_client_type_rx
.wait_for(|t| t.is_some())
.await
.expect("counterparty_client_type sender is closed")
// Unwrapping is OK because the value is Some.
.unwrap();
});
tracing::info!(
"received counterparty client type: {}",
self.counterparty_client_type
);
loop {
std::thread::sleep(Duration::from_secs(5));
let result = rt.block_on(self.run_once());
Expand All @@ -96,10 +111,6 @@ impl Ckb4IbcEventMonitor {
MonitorCmd::Subscribe(tx) => tx.send(self.event_bus.subscribe()).unwrap(),
}
}
// block here until the counterparty is revealed
if let Ok(client_type) = self.rx_internal.recv() {
self.counterparty_client_type = client_type;
}
let result = async {
tokio::select! {
batch = self.fetch_channel_events() => batch,
Expand Down

0 comments on commit bdf32f3

Please sign in to comment.