Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refator: refator some code of Ckb4Ibc endpoint, solved some fatal bugs #364

Merged
merged 6 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ibc-packet-trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ on:
workflow_dispatch:
schedule:
- cron: '0 */2 * * *' # runs every 2 hour
pull_request:
paths:
- .github/workflows/ibc-packet-trigger.yaml
- e2e/schedule/**

jobs:
schedule-run-packet-send:
runs-on: ubuntu-latest
timeout-minutes: 30
env:
# for forcerelay-ckb-sdk
FORCERELAY_CKB_SDK_COMMIT: f02969f920fa48234959fd25c771992a85619eb6
Expand Down
27 changes: 21 additions & 6 deletions crates/relayer/src/chain/axon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ impl ChainEndpoint for AxonChain {
});

let packet_filter = |packet: &contract::PacketData| {
if !sequences.is_empty() && sequences.contains(&Sequence::from(packet.sequence)) {
if !sequences.is_empty() && !sequences.contains(&Sequence::from(packet.sequence)) {
return false;
}
if packet.destination_channel != destination_channel_id.to_string() {
Expand All @@ -909,7 +909,9 @@ impl ChainEndpoint for AxonChain {
WithBlockDataType::CreateClient => logs_iter
.filter_map(|(height, tx_hash, event)| {
if matches!(event, OwnableIBCHandlerEvents::CreateClientFilter(..)) {
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
Expand All @@ -918,7 +920,9 @@ impl ChainEndpoint for AxonChain {
WithBlockDataType::UpdateClient => logs_iter
.filter_map(|(height, tx_hash, event)| {
if matches!(event, OwnableIBCHandlerEvents::UpdateClientFilter(..)) {
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
Expand All @@ -933,7 +937,9 @@ impl ChainEndpoint for AxonChain {
if !packet_filter(packet) {
return None;
}
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
Expand All @@ -948,14 +954,17 @@ impl ChainEndpoint for AxonChain {
if !packet_filter(packet) {
return None;
}
ibc_event_from_ibc_handler_event(height, tx_hash, event).unwrap()
ibc_event_from_ibc_handler_event(height, tx_hash, event)
.ok()
.unwrap_or(None)
} else {
None
}
})
.collect(),
};

tracing::debug!("Axon filtered {} packet events", events.len());
Ok(events)
}

Expand Down Expand Up @@ -1167,7 +1176,7 @@ impl AxonChain {
crate::time!("axon_init_event_monitor");
// let header_receiver = self.light_client.subscribe();
let ibc_cache = self.ibc_cache.clone();
let (event_monitor, monitor_tx) = AxonEventMonitor::new(
let (mut event_monitor, monitor_tx) = AxonEventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
self.config.contract_address,
Expand Down Expand Up @@ -1528,6 +1537,12 @@ impl AxonChain {
};
let mut ibc_cache = self.ibc_cache.write().unwrap();
cache_ics_tx_hash_with_event(&mut ibc_cache, event.clone(), tx_hash);
tracing::info!(
"{} transaciton {} committed to {}",
event.event_type().as_str(),
hex::encode(tx_hash),
self.id()
);
Ok(IbcEventWithHeight {
event,
height,
Expand Down
63 changes: 33 additions & 30 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::event::monitor::{Error, EventBatch, MonitorCmd, Next, Result, TxMonit
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use tendermint_rpc::WebSocketClientUrl;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, warn};

type Client = Provider<Ws>;

Expand All @@ -34,6 +34,7 @@ pub struct AxonEventMonitor {
rx_cmd: channel::Receiver<MonitorCmd>,
event_bus: EventBus<Arc<Result<EventBatch>>>,
ibc_cache: Arc<RwLock<IBCInfoCache>>,
reprocess_events: Vec<(OwnableIBCHandlerEvents, LogMeta)>,
}

impl AxonEventMonitor {
Expand All @@ -57,13 +58,11 @@ impl AxonEventMonitor {
.block_on(Provider::<Ws>::connect(websocket_addr.to_string()))
.map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr))?;

// FIXME: here should consider recovering from long-time-crash
let start_block_number = rt
.block_on(client.get_block_number())
.map_err(|e| Error::others(e.to_string()))?
.as_u64();

info!("listen IBC events from block {start_block_number}");
let event_bus = EventBus::new();
let monitor = Self {
client: Arc::new(client),
Expand All @@ -74,6 +73,7 @@ impl AxonEventMonitor {
rx_cmd,
event_bus,
ibc_cache,
reprocess_events: vec![],
};
Ok((monitor, TxMonitorCmd::new(tx_cmd)))
}
Expand All @@ -87,7 +87,17 @@ impl AxonEventMonitor {
)]
pub fn run(mut self) {
if let Next::Continue = self.update_subscribe(false) {
info!("start Axon event monitor for {}", self.chain_id);
info!(
"start Axon event monitor for {}, reprocess {} events",
self.chain_id,
self.reprocess_events.len()
);
(0..self.reprocess_events.len()).for_each(|_| {
let (event, meta) = self.reprocess_events.remove(0);
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {:?}", e);
});
});
loop {
if let Next::Abort = self.run_loop() {
break;
Expand All @@ -99,7 +109,7 @@ impl AxonEventMonitor {
}

pub fn restore_event_tx_hashes(
&self,
&mut self,
latest_block_count: u64,
) -> Result<Vec<IbcEventWithHeight>> {
let contract = Arc::new(Contract::new(
Expand All @@ -113,6 +123,13 @@ impl AxonEventMonitor {
"latest_block_count {latest_block_count} exceeds start_block_number {}",
self.start_block_number
)))?;
let event_filter = |event: &OwnableIBCHandlerEvents| {
matches!(
event,
OwnableIBCHandlerEvents::SendPacketFilter(_)
| OwnableIBCHandlerEvents::WriteAcknowledgementFilter(_)
)
};
let events = self
.rt
.block_on(
Expand All @@ -125,6 +142,9 @@ impl AxonEventMonitor {
.map_err(|e| Error::others(e.to_string()))?
.into_iter()
.map(|(event, meta)| {
if event_filter(&event) {
self.reprocess_events.push((event.clone(), meta.clone()));
}
IbcEventWithHeight::new_with_tx_hash(
event.into(),
Height::from_noncosmos_height(meta.block_number.as_u64()),
Expand Down Expand Up @@ -173,6 +193,7 @@ impl AxonEventMonitor {
self.contract_address,
Arc::clone(&self.client),
));
info!("listen IBC events from block {}", self.start_block_number);
let events = contract.events().from_block(self.start_block_number);
if let Ok(mut meta_stream) = self.rt.block_on(async {
events.stream().await.map(|stream| {
Expand All @@ -182,44 +203,26 @@ impl AxonEventMonitor {
meta_stream
})
}) {
debug!("setup IBC contract events streaming process");
loop {
if let Next::Abort = self.update_subscribe(true) {
return Next::Abort;
}

if let Some(Ok(ret)) = self.rt.block_on(meta_stream.next()) {
match ret {
match self.rt.block_on(meta_stream.next()) {
Some(Ok(ret)) => match ret {
Ok((event, meta)) => {
self.process_event(event, meta).unwrap_or_else(|e| {
error!("error while process event: {:?}", e);
error!("error while process event: {e:?}");
});
}
Err(err) => {
error!("error when monitoring axon events, reason: {:?}", err);
error!("error when monitoring axon events, reason: {err:?}");
return Next::Continue;
// TODO: reconnect
}
}
},
None => warn!("Axon monitor error report: received None"),
_ => {}
}

// Some(header) = self.header_receiver.recv() => {
// if let Next::Abort = self.update_subscribe() {
// return Next::Abort;
// }
// let height = Height::new(0u64, header.number).expect("axon header height");
// let event = IbcEventWithHeight::new(
// events::NewBlock::new(height).into(),
// height,
// );
// let batch = EventBatch {
// chain_id: self.chain_id.clone(),
// tracking_id: TrackingId::new_uuid(),
// height,
// events: vec![event],
// };
// self.process_batch(batch);
// },
}
}
Next::Abort
Expand Down
Loading
Loading