Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor the monitor and transaction assembly #377

Merged
merged 8 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Client = Provider<Ws>;

// #[derive(Clone, Debug)]
pub struct AxonEventMonitor {
websocket_addr: WebSocketClientUrl,
client: Arc<Client>,
rt: Arc<TokioRuntime>,
chain_id: ChainId,
Expand Down Expand Up @@ -55,7 +56,7 @@ impl AxonEventMonitor {

let client = rt
.block_on(Provider::<Ws>::connect(websocket_addr.to_string()))
.map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr))?;
.map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr.clone()))?;

let start_block_number = rt
.block_on(client.get_block_number())
Expand All @@ -64,6 +65,7 @@ impl AxonEventMonitor {

let event_bus = EventBus::new();
let monitor = Self {
websocket_addr,
client: Arc::new(client),
rt,
chain_id,
Expand All @@ -77,6 +79,20 @@ impl AxonEventMonitor {
Ok((monitor, TxMonitorCmd::new(tx_cmd)))
}

// XXX: we met a connection error that ethers-rs doesn't reconnection if it meets error,
ashuralyk marked this conversation as resolved.
Show resolved Hide resolved
// we just choose to recreate provider mannully to solve connection problem
//
// see: https://github.com/gakonst/ethers-rs/issues/2323
fn new_ws_provider(&mut self) -> Result<Client> {
let client = self
.rt
.block_on(Provider::<Ws>::connect(self.websocket_addr.to_string()))
.map_err(|_| {
Error::client_creation_failed(self.chain_id.clone(), self.websocket_addr.clone())
})?;
Ok(client)
}

#[allow(clippy::while_let_loop)]
#[instrument(
name = "axon_event_monitor",
Expand Down Expand Up @@ -104,14 +120,21 @@ impl AxonEventMonitor {
std::thread::sleep(Duration::from_secs(1));
match self.run_once(&contract) {
(Next::Abort, _) => break,
(Next::Continue, false) => {
// recreate contract when WS connection meets error
contract = Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"re-start to fetch IBC events from block {}",
self.start_block_number
);
}
(Next::Continue, false) => match self.new_ws_provider() {
Ok(client) => {
// recreate contract when WS connection meets error
self.client = Arc::new(client);
contract =
Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"restart to fetch IBC events from block {}",
self.start_block_number
);
}
Err(err) => {
error!("restart provider failed: {err}");
}
},
(Next::Continue, true) => {}
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/relayer/src/chain/ckb4ibc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ impl ChainEndpoint for Ckb4IbcChain {
.block_on(self.rpc_client.send_transaction(&tx.inner, None))
{
Ok(tx_hash) => {
// TODO: put confirms count into config
let confirms = 3;
ashuralyk marked this conversation as resolved.
Show resolved Hide resolved
info!(
"{msg_type:?} transaction {} committed to {}, wait {confirms} blocks confirmation",
Expand Down Expand Up @@ -752,6 +753,9 @@ impl ChainEndpoint for Ckb4IbcChain {
Ok(result_events)
}

// FIXME: this method should be in non-blocking mode, but we can't be confident it
// won't leave a protential issue if we do so, and working in blocking mode
// is ok for now, so leave this comment to fix in upcomming days
fn send_messages_and_wait_check_tx(
&mut self,
tracked_msgs: TrackedMsgs,
Expand Down
Loading