Skip to content

Commit

Permalink
refactor: refactor Axon monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
liyukun committed Nov 8, 2023
1 parent e34cdd1 commit f70daf6
Showing 1 changed file with 44 additions and 45 deletions.
89 changes: 44 additions & 45 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use ethers::prelude::*;
use ethers::providers::Middleware;
use ethers::types::Address;
use ibc_relayer_types::Height;
use tokio_stream::StreamExt;
use OwnableIBCHandler as Contract;
use OwnableIBCHandlerEvents as ContractEvents;

Expand Down Expand Up @@ -94,18 +93,21 @@ impl AxonEventMonitor {
);
(0..self.reprocess_events.len()).for_each(|_| {
let (event, meta) = self.reprocess_events.remove(0);
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {:?}", e);
});
self.process_event(event, meta);
});
let contract = Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"start to fetch IBC events from block {}",
self.start_block_number
);
loop {
if let Next::Abort = self.run_loop() {
std::thread::sleep(Duration::from_secs(1));
if let Next::Abort = self.run_once(&contract) {
break;
}
}
debug!("event monitor is shutting down");
}
// TODO: close client
}

pub fn restore_event_tx_hashes(
Expand Down Expand Up @@ -186,51 +188,49 @@ impl AxonEventMonitor {
Next::Continue
}

fn run_loop(&mut self) -> Next {
const TIMEOUT_MILLIS: u64 = 300;

let contract = Arc::new(Contract::new(
self.contract_address,
Arc::clone(&self.client),
));
info!("listen IBC events from block {}", self.start_block_number);
let events = contract.events().from_block(self.start_block_number);
if let Ok(mut meta_stream) = self.rt.block_on(async {
events.stream().await.map(|stream| {
let meta_stream = stream.with_meta().timeout_repeating(tokio::time::interval(
Duration::from_millis(TIMEOUT_MILLIS),
));
meta_stream
})
}) {
loop {
if let Next::Abort = self.update_subscribe(true) {
return Next::Abort;
}
fn run_once(&mut self, contract: &OwnableIBCHandler<Client>) -> Next {
if let Next::Abort = self.update_subscribe(true) {
return Next::Abort;
}

match self.rt.block_on(meta_stream.next()) {
Some(Ok(ret)) => match ret {
Ok((event, meta)) => {
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {e:?}");
});
}
Err(err) => {
error!("error when monitoring axon events, reason: {err:?}");
return Next::Continue;
}
},
None => warn!("Axon monitor error report: received None"),
_ => {}
}
let tip_block_number = match self.rt.block_on(contract.client().get_block_number()) {
Ok(tip) => tip.as_u64(),
Err(err) => {
error!("failed to fetch Axon latest block number: {err}");
return Next::Continue;
}
};

if self.start_block_number >= tip_block_number {
return Next::Continue;
}
Next::Abort

let query = contract
.events()
.from_block(self.start_block_number)
.to_block(tip_block_number);
let events = match self.rt.block_on(query.query_with_meta()) {
Ok(events) => events,
Err(err) => {
error!(
"failed to fetch events from block {} to block {tip_block_number}: {err}",
self.start_block_number
);
return Next::Continue;
}
};

events
.into_iter()
.for_each(|(event, meta)| self.process_event(event, meta));

Next::Continue
}

fn process_event(&mut self, event: ContractEvents, meta: LogMeta) -> Result<()> {
fn process_event(&mut self, event: ContractEvents, meta: LogMeta) {
println!("\n{}\n[event] = {:?}", self.chain_id, event);
println!("[event_meta] = {:?}\n", meta);

self.start_block_number = meta.block_number.as_u64();
let event = IbcEventWithHeight::new_with_tx_hash(
event.into(),
Expand All @@ -249,7 +249,6 @@ impl AxonEventMonitor {
events: vec![event],
};
self.process_batch(batch);
Ok(())
}

fn process_batch(&mut self, batch: EventBatch) {
Expand Down

0 comments on commit f70daf6

Please sign in to comment.