Skip to content

Commit

Permalink
refator: refator some code of Ckb4Ibc endpoint, solved some fatal bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
liyukun committed Nov 1, 2023
1 parent 132bf3b commit 1c19889
Show file tree
Hide file tree
Showing 18 changed files with 604 additions and 289 deletions.
29 changes: 22 additions & 7 deletions crates/relayer/src/chain/axon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ impl ChainEndpoint for AxonChain {
.call(),
)
.map_err(convert_err)?;
if found {
if !found {
sequences.push(seq);
}
}
Expand Down Expand Up @@ -885,7 +885,7 @@ impl ChainEndpoint for AxonChain {
});

let packet_filter = |packet: &contract::PacketData| {
if !sequences.is_empty() && sequences.contains(&Sequence::from(packet.sequence)) {
if !sequences.is_empty() && !sequences.contains(&Sequence::from(packet.sequence)) {
return false;
}
if packet.destination_channel != destination_channel_id.to_string() {
Expand All @@ -907,7 +907,9 @@ impl ChainEndpoint for AxonChain {
WithBlockDataType::CreateClient => logs_iter
.filter_map(|(height, tx_hash, event)| {
if matches!(event, OwnableIBCHandlerEvents::CreateClientFilter(..)) {
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
Expand All @@ -916,7 +918,9 @@ impl ChainEndpoint for AxonChain {
WithBlockDataType::UpdateClient => logs_iter
.filter_map(|(height, tx_hash, event)| {
if matches!(event, OwnableIBCHandlerEvents::UpdateClientFilter(..)) {
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
Expand All @@ -931,7 +935,9 @@ impl ChainEndpoint for AxonChain {
if !packet_filter(packet) {
return None;
}
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
Expand All @@ -946,14 +952,17 @@ impl ChainEndpoint for AxonChain {
if !packet_filter(packet) {
return None;
}
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
})
.collect(),
};

tracing::debug!("Axon filtered {} packet events", events.len());
Ok(events)
}

Expand Down Expand Up @@ -1166,7 +1175,7 @@ impl AxonChain {
fn init_event_monitor(&mut self) -> Result<TxMonitorCmd, Error> {
crate::time!("axon_init_event_monitor");
// let header_receiver = self.light_client.subscribe();
let (event_monitor, monitor_tx) = AxonEventMonitor::new(
let (mut event_monitor, monitor_tx) = AxonEventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.config.contract_address,
Expand Down Expand Up @@ -1524,6 +1533,12 @@ impl AxonChain {
Height::from_noncosmos_height(block_height.as_u64())
};
cache_ics_tx_hash_with_event(self.id(), event.clone(), tx_hash);
tracing::info!(
"{} transaciton {} committed to {}",
event.event_type().as_str(),
hex::encode(tx_hash),
self.id()
);
Ok(IbcEventWithHeight {
event,
height,
Expand Down
26 changes: 24 additions & 2 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct AxonEventMonitor {
start_block_number: u64,
rx_cmd: channel::Receiver<MonitorCmd>,
event_bus: EventBus<Arc<Result<EventBatch>>>,
reprocess_events: Vec<(OwnableIBCHandlerEvents, LogMeta)>,
}

impl AxonEventMonitor {
Expand Down Expand Up @@ -71,6 +72,7 @@ impl AxonEventMonitor {
start_block_number,
rx_cmd,
event_bus,
reprocess_events: vec![],
};
Ok((monitor, TxMonitorCmd::new(tx_cmd)))
}
Expand All @@ -84,7 +86,17 @@ impl AxonEventMonitor {
)]
pub fn run(mut self) {
if let Next::Continue = self.update_subscribe(false) {
info!("start Axon event monitor for {}", self.chain_id);
info!(
"start Axon event monitor for {}, reprocess {} events",
self.chain_id,
self.reprocess_events.len()
);
(0..self.reprocess_events.len()).for_each(|_| {
let (event, meta) = self.reprocess_events.remove(0);
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {:?}", e);
});
});
loop {
if let Next::Abort = self.run_loop() {
break;
Expand All @@ -96,7 +108,7 @@ impl AxonEventMonitor {
}

pub fn restore_event_tx_hashes(
&self,
&mut self,
latest_block_count: u64,
) -> Result<Vec<IbcEventWithHeight>> {
let contract = Arc::new(Contract::new(
Expand All @@ -110,6 +122,13 @@ impl AxonEventMonitor {
"latest_block_count {latest_block_count} exceeds start_block_number {}",
self.start_block_number
)))?;
let event_filter = |event: &OwnableIBCHandlerEvents| {
matches!(
event,
OwnableIBCHandlerEvents::SendPacketFilter(_)
| OwnableIBCHandlerEvents::WriteAcknowledgementFilter(_)
)
};
let events = self
.rt
.block_on(
Expand All @@ -122,6 +141,9 @@ impl AxonEventMonitor {
.map_err(|e| Error::others(e.to_string()))?
.into_iter()
.map(|(event, meta)| {
if event_filter(&event) {
self.reprocess_events.push((event.clone(), meta.clone()));
}
IbcEventWithHeight::new_with_tx_hash(
event.into(),
Height::from_noncosmos_height(meta.block_number.as_u64()),
Expand Down
Loading

0 comments on commit 1c19889

Please sign in to comment.