Skip to content

Commit

Permalink
chore: follow comments in PR
Browse files Browse the repository at this point in the history
  • Loading branch information
liyukun committed Nov 1, 2023
1 parent 18bb928 commit ed31f6d
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 38 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ibc-packet-trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ on:
workflow_dispatch:
schedule:
- cron: '0 */2 * * *' # runs every 2 hour
pull_request:
paths:
- .github/workflows/ibc-packet-trigger.yaml
- e2e/schedule/**

jobs:
schedule-run-packet-send:
Expand Down
2 changes: 1 addition & 1 deletion crates/relayer/src/chain/axon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ impl ChainEndpoint for AxonChain {
.call(),
)
.map_err(convert_err)?;
if !found {
if found {
sequences.push(seq);
}
}
Expand Down
61 changes: 25 additions & 36 deletions crates/relayer/src/chain/ckb4ibc/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -29,7 +29,6 @@ use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::timestamp::Timestamp;
use tokio::runtime::Runtime as TokioRuntime;
use tokio::sync::watch::Receiver as WatchReceiver;
use tokio::sync::Mutex;
use tracing::{error, info};

use crate::chain::ckb::prelude::CkbReader;
Expand Down Expand Up @@ -70,8 +69,8 @@ pub struct Ckb4IbcEventMonitor {
cache_set: RwLock<CacheSet<H256>>,
counterparty_client_type_rx: WatchReceiver<Option<ClientType>>,
counterparty_client_type: ClientType,
fetch_cursors: Mutex<HashMap<IbcProtocolType, JsonBytes>>,
useless_write_ack_packets: Mutex<HashMap<u64, (IbcPacket, CellInput)>>,
fetch_cursors: HashMap<IbcProtocolType, JsonBytes>,
useless_write_ack_packets: BTreeMap<u64, (IbcPacket, CellInput)>,
}

impl Ckb4IbcEventMonitor {
Expand All @@ -93,8 +92,8 @@ impl Ckb4IbcEventMonitor {
cache_set: RwLock::new(CacheSet::new(512)),
counterparty_client_type_rx,
counterparty_client_type: ClientType::Mock,
fetch_cursors: Mutex::new(HashMap::new()),
useless_write_ack_packets: Mutex::new(HashMap::new()),
fetch_cursors: HashMap::new(),
useless_write_ack_packets: BTreeMap::new(),
};
(monitor, TxMonitorCmd::new(tx_cmd), tx_write_ack)
}
Expand Down Expand Up @@ -125,24 +124,16 @@ impl Ckb4IbcEventMonitor {
}
}

async fn require_useless_write_ack_packet(&self) -> Result<()> {
async fn handle_get_useless_write_ack_packet(&mut self) -> Result<()> {
if let Ok((resposne, block_number_gap)) = self.rx_write_ack.try_recv() {
let useless_key = self
.useless_write_ack_packets
.lock()
.await
.keys()
.next()
.cloned();
let useless_key = self.useless_write_ack_packets.keys().next().cloned();
if let Some(block_number) = useless_key {
let tip_block_number = tip_block_number(self.rpc_client.as_ref())
.await
.map_err(|err| Error::others(err.detail().to_string()))?;
if block_number + block_number_gap < tip_block_number {
let (packet, input) = self
.useless_write_ack_packets
.lock()
.await
.remove(&block_number)
.unwrap();
resposne.send(Some((packet, input))).unwrap();
Expand All @@ -160,23 +151,24 @@ impl Ckb4IbcEventMonitor {
MonitorCmd::Subscribe(tx) => tx.send(self.event_bus.subscribe()).unwrap(),
}
}
let futs = tokio::join!(
self.fetch_channel_events(),
self.fetch_connection_events(),
self.fetch_packet_events(),
);
self.process_batch(futs.0);
self.process_batch(futs.1);
self.process_batch(futs.2);

if let Err(err) = self.require_useless_write_ack_packet().await {
// 'mut self' cannot be used in tokio::join macro, it can only be handled in sequence
let connection_events = self.fetch_connection_events().await;
let channel_events = self.fetch_channel_events().await;
let packet_events = self.fetch_packet_events().await;

self.process_batch(connection_events);
self.process_batch(channel_events);
self.process_batch(packet_events);

if let Err(err) = self.handle_get_useless_write_ack_packet().await {
error!("{err}");
}

Next::Continue
}

async fn fetch_connection_events(&self) -> Result<EventBatch> {
async fn fetch_connection_events(&mut self) -> Result<EventBatch> {
let connection_code_hash = get_script_hash(&self.config.connection_type_args);
let client_type_hash = self
.config
Expand Down Expand Up @@ -285,7 +277,7 @@ impl Ckb4IbcEventMonitor {
})
}

async fn fetch_channel_events(&self) -> Result<EventBatch> {
async fn fetch_channel_events(&mut self) -> Result<EventBatch> {
let client_id = self
.config
.lc_client_type_hash(self.counterparty_client_type)
Expand Down Expand Up @@ -380,7 +372,7 @@ impl Ckb4IbcEventMonitor {
})
}

async fn fetch_packet_events(&self) -> Result<EventBatch> {
async fn fetch_packet_events(&mut self) -> Result<EventBatch> {
let script = Script::new_builder()
.code_hash(get_script_hash(&self.config.packet_type_args))
.hash_type(ScriptHashType::Type.into())
Expand All @@ -404,7 +396,7 @@ impl Ckb4IbcEventMonitor {
.await
.map_err(|err| Error::others(err.detail().to_string()))?;

let mut useless_packets = self.useless_write_ack_packets.lock().await;
let useless_packets = &mut self.useless_write_ack_packets;
let events = ibc_packets
.into_iter()
.filter(|(((packet, _), tx), _)| {
Expand Down Expand Up @@ -474,7 +466,7 @@ impl Ckb4IbcEventMonitor {
}

async fn search_and_extract<T, F>(
&self,
&mut self,
search_key: SearchKey,
extractor: &F,
limit: u32,
Expand All @@ -483,7 +475,7 @@ impl Ckb4IbcEventMonitor {
where
F: Fn(TransactionView) -> Result<(T, H256)>,
{
let cursor = self.fetch_cursors.lock().await.get(&ibc_protocol).cloned();
let cursor = self.fetch_cursors.get(&ibc_protocol).cloned();
let cells = self
.rpc_client
.fetch_live_cells(search_key, limit, cursor)
Expand Down Expand Up @@ -530,12 +522,9 @@ impl Ckb4IbcEventMonitor {
}

if cells.objects.is_empty() {
self.fetch_cursors.lock().await.remove(&ibc_protocol);
self.fetch_cursors.remove(&ibc_protocol);
} else {
self.fetch_cursors
.lock()
.await
.insert(ibc_protocol, cells.last_cursor);
self.fetch_cursors.insert(ibc_protocol, cells.last_cursor);
}
Ok(result)
}
Expand Down
2 changes: 1 addition & 1 deletion tools/ibc-test/contracts/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Built at e8e4a12bab6fb7e665f6b2556896f181ffd0b1c0 (ibc-sudt-transfer)
Built at 78c45fb7df36a07276866263d8d509469a4bcde4

0 comments on commit ed31f6d

Please sign in to comment.