Skip to content

Commit

Permalink
Merge pull request #377 from synapseweb3/feat/resend-ckb-tx
Browse files Browse the repository at this point in the history
refactor: refactor the monitor and transaction assembly
  • Loading branch information
ashuralyk authored Nov 15, 2023
2 parents 9c53ec6 + d622017 commit ef6bace
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 281 deletions.
126 changes: 79 additions & 47 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use ethers::prelude::*;
use ethers::providers::Middleware;
use ethers::types::Address;
use ibc_relayer_types::Height;
use tokio_stream::StreamExt;
use OwnableIBCHandler as Contract;
use OwnableIBCHandlerEvents as ContractEvents;

Expand All @@ -26,6 +25,7 @@ type Client = Provider<Ws>;

// #[derive(Clone, Debug)]
pub struct AxonEventMonitor {
websocket_addr: WebSocketClientUrl,
client: Arc<Client>,
rt: Arc<TokioRuntime>,
chain_id: ChainId,
Expand Down Expand Up @@ -56,7 +56,7 @@ impl AxonEventMonitor {

let client = rt
.block_on(Provider::<Ws>::connect(websocket_addr.to_string()))
.map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr))?;
.map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr.clone()))?;

let start_block_number = rt
.block_on(client.get_block_number())
Expand All @@ -65,6 +65,7 @@ impl AxonEventMonitor {

let event_bus = EventBus::new();
let monitor = Self {
websocket_addr,
client: Arc::new(client),
rt,
chain_id,
Expand All @@ -78,6 +79,20 @@ impl AxonEventMonitor {
Ok((monitor, TxMonitorCmd::new(tx_cmd)))
}

// XXX: we met a connection error that ethers-rs doesn't reconnect WebSocket if it meets error,
// we just choose to recreate provider mannully to solve connection problem
//
// see: https://github.com/gakonst/ethers-rs/issues/2323
fn new_ws_provider(&mut self) -> Result<Client> {
let client = self
.rt
.block_on(Provider::<Ws>::connect(self.websocket_addr.to_string()))
.map_err(|_| {
Error::client_creation_failed(self.chain_id.clone(), self.websocket_addr.clone())
})?;
Ok(client)
}

#[allow(clippy::while_let_loop)]
#[instrument(
name = "axon_event_monitor",
Expand All @@ -94,18 +109,37 @@ impl AxonEventMonitor {
);
(0..self.reprocess_events.len()).for_each(|_| {
let (event, meta) = self.reprocess_events.remove(0);
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {:?}", e);
});
self.process_event(event, meta);
});
let mut contract = Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"start to fetch IBC events from block {}",
self.start_block_number
);
loop {
if let Next::Abort = self.run_loop() {
break;
std::thread::sleep(Duration::from_secs(1));
match self.run_once(&contract) {
(Next::Abort, _) => break,
(Next::Continue, false) => match self.new_ws_provider() {
Ok(client) => {
// recreate contract when WS connection meets error
self.client = Arc::new(client);
contract =
Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"restart to fetch IBC events from block {}",
self.start_block_number
);
}
Err(err) => {
error!("restart provider failed: {err}");
}
},
(Next::Continue, true) => {}
}
}
debug!("event monitor is shutting down");
}
// TODO: close client
}

pub fn restore_event_tx_hashes(
Expand Down Expand Up @@ -186,51 +220,50 @@ impl AxonEventMonitor {
Next::Continue
}

fn run_loop(&mut self) -> Next {
const TIMEOUT_MILLIS: u64 = 300;

let contract = Arc::new(Contract::new(
self.contract_address,
Arc::clone(&self.client),
));
info!("listen IBC events from block {}", self.start_block_number);
let events = contract.events().from_block(self.start_block_number);
if let Ok(mut meta_stream) = self.rt.block_on(async {
events.stream().await.map(|stream| {
let meta_stream = stream.with_meta().timeout_repeating(tokio::time::interval(
Duration::from_millis(TIMEOUT_MILLIS),
));
meta_stream
})
}) {
loop {
if let Next::Abort = self.update_subscribe(true) {
return Next::Abort;
}
fn run_once(&mut self, contract: &OwnableIBCHandler<Client>) -> (Next, bool) {
if let Next::Abort = self.update_subscribe(true) {
return (Next::Abort, true);
}

match self.rt.block_on(meta_stream.next()) {
Some(Ok(ret)) => match ret {
Ok((event, meta)) => {
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {e:?}");
});
}
Err(err) => {
error!("error when monitoring axon events, reason: {err:?}");
return Next::Continue;
}
},
None => warn!("Axon monitor error report: received None"),
_ => {}
}
let tip_block_number = match self.rt.block_on(contract.client().get_block_number()) {
Ok(tip) => tip.as_u64(),
Err(err) => {
error!("failed to fetch Axon latest block number: {err}");
return (Next::Continue, false);
}
};

if self.start_block_number >= tip_block_number {
return (Next::Continue, true);
}
Next::Abort

let query = contract
.events()
.from_block(self.start_block_number)
.to_block(tip_block_number);
let events = match self.rt.block_on(query.query_with_meta()) {
Ok(events) => events,
Err(err) => {
error!(
"failed to fetch events from block {} to block {tip_block_number}: {err}",
self.start_block_number
);
return (Next::Continue, false);
}
};

events
.into_iter()
.for_each(|(event, meta)| self.process_event(event, meta));

self.start_block_number = tip_block_number + 1;
(Next::Continue, true)
}

fn process_event(&mut self, event: ContractEvents, meta: LogMeta) -> Result<()> {
fn process_event(&mut self, event: ContractEvents, meta: LogMeta) {
println!("\n{}\n[event] = {:?}", self.chain_id, event);
println!("[event_meta] = {:?}\n", meta);

self.start_block_number = meta.block_number.as_u64();
let event = IbcEventWithHeight::new_with_tx_hash(
event.into(),
Expand All @@ -249,7 +282,6 @@ impl AxonEventMonitor {
events: vec![event],
};
self.process_batch(batch);
Ok(())
}

fn process_batch(&mut self, batch: EventBatch) {
Expand Down
Loading

0 comments on commit ef6bace

Please sign in to comment.