Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refactor the monitor and transaction assembly #377

Merged
merged 8 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 55 additions & 46 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use ethers::prelude::*;
use ethers::providers::Middleware;
use ethers::types::Address;
use ibc_relayer_types::Height;
use tokio_stream::StreamExt;
use OwnableIBCHandler as Contract;
use OwnableIBCHandlerEvents as ContractEvents;

Expand Down Expand Up @@ -94,18 +93,30 @@ impl AxonEventMonitor {
);
(0..self.reprocess_events.len()).for_each(|_| {
let (event, meta) = self.reprocess_events.remove(0);
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {:?}", e);
});
self.process_event(event, meta);
});
let mut contract = Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"start to fetch IBC events from block {}",
self.start_block_number
);
loop {
if let Next::Abort = self.run_loop() {
break;
std::thread::sleep(Duration::from_secs(1));
match self.run_once(&contract) {
(Next::Abort, _) => break,
(Next::Continue, false) => {
// recreate contract when WS connection meets error
contract = Contract::new(self.contract_address, Arc::clone(&self.client));
info!(
"re-start to fetch IBC events from block {}",
self.start_block_number
);
}
(Next::Continue, true) => {}
}
}
debug!("event monitor is shutting down");
}
// TODO: close client
}

pub fn restore_event_tx_hashes(
Expand Down Expand Up @@ -186,51 +197,50 @@ impl AxonEventMonitor {
Next::Continue
}

fn run_loop(&mut self) -> Next {
const TIMEOUT_MILLIS: u64 = 300;

let contract = Arc::new(Contract::new(
self.contract_address,
Arc::clone(&self.client),
));
info!("listen IBC events from block {}", self.start_block_number);
let events = contract.events().from_block(self.start_block_number);
if let Ok(mut meta_stream) = self.rt.block_on(async {
events.stream().await.map(|stream| {
let meta_stream = stream.with_meta().timeout_repeating(tokio::time::interval(
Duration::from_millis(TIMEOUT_MILLIS),
));
meta_stream
})
}) {
loop {
if let Next::Abort = self.update_subscribe(true) {
return Next::Abort;
}
fn run_once(&mut self, contract: &OwnableIBCHandler<Client>) -> (Next, bool) {
if let Next::Abort = self.update_subscribe(true) {
return (Next::Abort, true);
}

match self.rt.block_on(meta_stream.next()) {
Some(Ok(ret)) => match ret {
Ok((event, meta)) => {
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {e:?}");
});
}
Err(err) => {
error!("error when monitoring axon events, reason: {err:?}");
return Next::Continue;
}
},
None => warn!("Axon monitor error report: received None"),
_ => {}
}
let tip_block_number = match self.rt.block_on(contract.client().get_block_number()) {
Ok(tip) => tip.as_u64(),
Err(err) => {
error!("failed to fetch Axon latest block number: {err}");
return (Next::Continue, false);
}
};

if self.start_block_number >= tip_block_number {
return (Next::Continue, true);
}
Next::Abort

let query = contract
.events()
.from_block(self.start_block_number)
.to_block(tip_block_number);
let events = match self.rt.block_on(query.query_with_meta()) {
Ok(events) => events,
Err(err) => {
error!(
"failed to fetch events from block {} to block {tip_block_number}: {err}",
self.start_block_number
);
return (Next::Continue, false);
}
};

events
.into_iter()
.for_each(|(event, meta)| self.process_event(event, meta));

self.start_block_number = tip_block_number + 1;
(Next::Continue, true)
}

fn process_event(&mut self, event: ContractEvents, meta: LogMeta) -> Result<()> {
fn process_event(&mut self, event: ContractEvents, meta: LogMeta) {
println!("\n{}\n[event] = {:?}", self.chain_id, event);
println!("[event_meta] = {:?}\n", meta);

self.start_block_number = meta.block_number.as_u64();
let event = IbcEventWithHeight::new_with_tx_hash(
event.into(),
Expand All @@ -249,7 +259,6 @@ impl AxonEventMonitor {
events: vec![event],
};
self.process_batch(batch);
Ok(())
}

fn process_batch(&mut self, batch: EventBatch) {
Expand Down
Loading
Loading