diff --git a/crates/relayer/src/chain/axon/monitor.rs b/crates/relayer/src/chain/axon/monitor.rs index 5e92c88c5..a983b0c6f 100644 --- a/crates/relayer/src/chain/axon/monitor.rs +++ b/crates/relayer/src/chain/axon/monitor.rs @@ -11,7 +11,6 @@ use ethers::prelude::*; use ethers::providers::Middleware; use ethers::types::Address; use ibc_relayer_types::Height; -use tokio_stream::StreamExt; use OwnableIBCHandler as Contract; use OwnableIBCHandlerEvents as ContractEvents; @@ -26,6 +25,7 @@ type Client = Provider; // #[derive(Clone, Debug)] pub struct AxonEventMonitor { + websocket_addr: WebSocketClientUrl, client: Arc, rt: Arc, chain_id: ChainId, @@ -56,7 +56,7 @@ impl AxonEventMonitor { let client = rt .block_on(Provider::::connect(websocket_addr.to_string())) - .map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr))?; + .map_err(|_| Error::client_creation_failed(chain_id.clone(), websocket_addr.clone()))?; let start_block_number = rt .block_on(client.get_block_number()) @@ -65,6 +65,7 @@ impl AxonEventMonitor { let event_bus = EventBus::new(); let monitor = Self { + websocket_addr, client: Arc::new(client), rt, chain_id, @@ -78,6 +79,20 @@ impl AxonEventMonitor { Ok((monitor, TxMonitorCmd::new(tx_cmd))) } + // XXX: we met a connection error that ethers-rs doesn't reconnect WebSocket if it meets error, + // we just choose to recreate provider mannully to solve connection problem + // + // see: https://github.com/gakonst/ethers-rs/issues/2323 + fn new_ws_provider(&mut self) -> Result { + let client = self + .rt + .block_on(Provider::::connect(self.websocket_addr.to_string())) + .map_err(|_| { + Error::client_creation_failed(self.chain_id.clone(), self.websocket_addr.clone()) + })?; + Ok(client) + } + #[allow(clippy::while_let_loop)] #[instrument( name = "axon_event_monitor", @@ -94,18 +109,37 @@ impl AxonEventMonitor { ); (0..self.reprocess_events.len()).for_each(|_| { let (event, meta) = self.reprocess_events.remove(0); - self.process_event(event, meta).unwrap_or_else(|e| { - error!("error while process event: {:?}", e); - }); + self.process_event(event, meta); }); + let mut contract = Contract::new(self.contract_address, Arc::clone(&self.client)); + info!( + "start to fetch IBC events from block {}", + self.start_block_number + ); loop { - if let Next::Abort = self.run_loop() { - break; + std::thread::sleep(Duration::from_secs(1)); + match self.run_once(&contract) { + (Next::Abort, _) => break, + (Next::Continue, false) => match self.new_ws_provider() { + Ok(client) => { + // recreate contract when WS connection meets error + self.client = Arc::new(client); + contract = + Contract::new(self.contract_address, Arc::clone(&self.client)); + info!( + "restart to fetch IBC events from block {}", + self.start_block_number + ); + } + Err(err) => { + error!("restart provider failed: {err}"); + } + }, + (Next::Continue, true) => {} } } debug!("event monitor is shutting down"); } - // TODO: close client } pub fn restore_event_tx_hashes( @@ -186,51 +220,50 @@ impl AxonEventMonitor { Next::Continue } - fn run_loop(&mut self) -> Next { - const TIMEOUT_MILLIS: u64 = 300; - - let contract = Arc::new(Contract::new( - self.contract_address, - Arc::clone(&self.client), - )); - info!("listen IBC events from block {}", self.start_block_number); - let events = contract.events().from_block(self.start_block_number); - if let Ok(mut meta_stream) = self.rt.block_on(async { - events.stream().await.map(|stream| { - let meta_stream = stream.with_meta().timeout_repeating(tokio::time::interval( - Duration::from_millis(TIMEOUT_MILLIS), - )); - meta_stream - }) - }) { - loop { - if let Next::Abort = self.update_subscribe(true) { - return Next::Abort; - } + fn run_once(&mut self, contract: &OwnableIBCHandler) -> (Next, bool) { + if let Next::Abort = self.update_subscribe(true) { + return (Next::Abort, true); + } - match self.rt.block_on(meta_stream.next()) { - Some(Ok(ret)) => match ret { - Ok((event, meta)) => { - self.process_event(event, meta).unwrap_or_else(|e| { - error!("error while process event: {e:?}"); - }); - } - Err(err) => { - error!("error when monitoring axon events, reason: {err:?}"); - return Next::Continue; - } - }, - None => warn!("Axon monitor error report: received None"), - _ => {} - } + let tip_block_number = match self.rt.block_on(contract.client().get_block_number()) { + Ok(tip) => tip.as_u64(), + Err(err) => { + error!("failed to fetch Axon latest block number: {err}"); + return (Next::Continue, false); } + }; + + if self.start_block_number >= tip_block_number { + return (Next::Continue, true); } - Next::Abort + + let query = contract + .events() + .from_block(self.start_block_number) + .to_block(tip_block_number); + let events = match self.rt.block_on(query.query_with_meta()) { + Ok(events) => events, + Err(err) => { + error!( + "failed to fetch events from block {} to block {tip_block_number}: {err}", + self.start_block_number + ); + return (Next::Continue, false); + } + }; + + events + .into_iter() + .for_each(|(event, meta)| self.process_event(event, meta)); + + self.start_block_number = tip_block_number + 1; + (Next::Continue, true) } - fn process_event(&mut self, event: ContractEvents, meta: LogMeta) -> Result<()> { + fn process_event(&mut self, event: ContractEvents, meta: LogMeta) { println!("\n{}\n[event] = {:?}", self.chain_id, event); println!("[event_meta] = {:?}\n", meta); + self.start_block_number = meta.block_number.as_u64(); let event = IbcEventWithHeight::new_with_tx_hash( event.into(), @@ -249,7 +282,6 @@ impl AxonEventMonitor { events: vec![event], }; self.process_batch(batch); - Ok(()) } fn process_batch(&mut self, batch: EventBatch) { diff --git a/crates/relayer/src/chain/ckb4ibc.rs b/crates/relayer/src/chain/ckb4ibc.rs index 99c84dd55..93f3375a8 100644 --- a/crates/relayer/src/chain/ckb4ibc.rs +++ b/crates/relayer/src/chain/ckb4ibc.rs @@ -22,7 +22,7 @@ use crate::keyring::{KeyRing, Secp256k1KeyPair}; use crate::misbehaviour::MisbehaviourEvidence; use ckb_ics_axon::handler::{IbcChannel, IbcConnections, IbcPacket, PacketStatus}; -use ckb_ics_axon::message::Envelope; +use ckb_ics_axon::message::{Envelope, MsgType}; use ckb_ics_axon::object::Ordering; use ckb_ics_axon::{ChannelArgs, PacketArgs}; use ckb_jsonrpc_types::{Status, TransactionView}; @@ -36,6 +36,7 @@ use ckb_types::molecule::prelude::Entity; use ckb_types::packed::{CellInput, OutPoint, Script, WitnessArgs}; use ckb_types::prelude::{Builder, Pack, Unpack}; use futures::TryFutureExt; +use ibc_proto::google::protobuf::Any; use ibc_proto::ibc::apps::fee::v1::{ QueryIncentivizedPacketRequest, QueryIncentivizedPacketResponse, }; @@ -108,6 +109,10 @@ pub mod utils; pub use utils::keccak256; +type ConnectionCache = + HashMap)>; +type PacketInputData = HashMap<(ChannelId, PortId, Sequence), (CellInput, u64)>; + pub struct Ckb4IbcChain { rt: Arc, rpc_client: Arc, @@ -127,12 +132,8 @@ pub struct Ckb4IbcChain { client_outpoints: RefCell>, channel_input_data: RefCell>, channel_cache: RefCell>, - #[allow(clippy::type_complexity)] - connection_cache: RefCell< - HashMap)>, - >, - #[allow(clippy::type_complexity)] - packet_input_data: RefCell>, + connection_cache: RefCell, + packet_input_data: RefCell, packet_cache: RefCell>, } @@ -179,17 +180,7 @@ impl Ckb4IbcChain { } Ok(Converter { write_ack_cmd: &self.tx_write_ack_cmd, - channel_input_data: self.channel_input_data.borrow(), - channel_cache: self.channel_cache.borrow(), - config: &self.config, - connection_cache: self.connection_cache.borrow(), - client_outpoints: self.client_outpoints.borrow(), - packet_input_data: self.packet_input_data.borrow(), - packet_cache: self.packet_cache.borrow(), - chan_contract_outpoint: &self.channel_outpoint, - packet_contract_outpoint: &self.packet_outpoint, - conn_contract_outpoint: &self.connection_outpoint, - commitment_prefix: self.query_commitment_prefix()?, + ckb_instance: self, }) } @@ -477,6 +468,61 @@ impl Ckb4IbcChain { .collect::>(); Ok(packets.first().cloned()) } + + #[allow(clippy::type_complexity)] + fn assemble_transaction_from_msg( + &self, + msg: &Any, + ) -> Result<(Option, Option<(TransactionView, MsgType)>), Error> { + let converter = self.get_converter()?; + let CkbTxInfo { + unsigned_tx, + envelope, + input_capacity, + event, + } = convert_msg_to_ckb_tx(msg, &converter)?; + if unsigned_tx.is_none() { + return Ok((event, None)); + } + let unsigned_tx = unsigned_tx.unwrap(); + let msg_type = envelope.msg_type; + match self.complete_tx_with_secp256k1_change_and_envelope( + unsigned_tx, + input_capacity, + envelope, + ) { + Ok(tx) => { + let last_input_idx = tx.inputs().len() - 1; + let secret_key = self + .keybase + .get_key(&self.config.key_name) + .map_err(Error::key_base)? + .into_ckb_keypair(self.network()?) + .private_key; + let signer = SecpSighashScriptSigner::new(Box::new( + SecpCkbRawKeySigner::new_with_secret_keys(vec![secret_key]), + )); + let tx = signer + .sign_tx( + &tx, + &ScriptGroup { + script: Script::from(&self.tx_assembler_address()?), + group_type: ScriptGroupType::Lock, + // TODO: here should be more indices in case of more than one Secp256k1 cells + // have been filled in the transaction + input_indices: vec![last_input_idx], + output_indices: vec![], + }, + ) + .map_err(|err| Error::other_error(err.to_string()))?; + Ok((event, Some((tx.into(), msg_type)))) + } + Err(err) => { + // return signing error such as no enough ckb + Err(err) + } + } + } } impl ChainEndpoint for Ckb4IbcChain { @@ -624,125 +670,92 @@ impl ChainEndpoint for Ckb4IbcChain { &mut self, tracked_msgs: TrackedMsgs, ) -> Result, Error> { - let mut txs = Vec::new(); - let mut tx_hashes = Vec::new(); - let mut events = Vec::new(); let mut result_events = Vec::new(); - for msg in tracked_msgs.msgs { - let converter = self.get_converter()?; - let CkbTxInfo { - unsigned_tx, - envelope, - input_capacity, - event, - } = convert_msg_to_ckb_tx(msg, &converter)?; - if unsigned_tx.is_none() { - if let Some(e) = event { - if let IbcEvent::CreateClient(e) = &e { - let client_type = e.0.client_type; - info!("the counterparty client type of Ckb4Ibc is set as {client_type}"); + let mut msgs = tracked_msgs.msgs; + let mut retry_times = 0; + let sync_if_create_client = |event: &IbcEvent| -> Option { + if let IbcEvent::CreateClient(e) = event { + let client_type = e.0.client_type; + info!("counterparty client type of Ckb4Ibc is set to {client_type}"); + return Some(client_type); + } + None + }; + while !msgs.is_empty() { + let msg = msgs.remove(0); + match self.assemble_transaction_from_msg(&msg)? { + (Some(event), None) => { + if let Some(client_type) = sync_if_create_client(&event) { self.sync_counterparty_client_type(client_type); + let ibc_event = IbcEventWithHeight::new(event, Height::default()); + return Ok(vec![ibc_event]); + } else { + return Ok(vec![]); } - let ibc_event = IbcEventWithHeight::new(e, Height::default()); - result_events.push(ibc_event); - } - continue; - } - let unsigned_tx = unsigned_tx.unwrap(); - let msg_type = envelope.msg_type; - match self.complete_tx_with_secp256k1_change_and_envelope( - unsigned_tx, - input_capacity, - envelope, - ) { - Ok(tx) => { - let last_input_idx = tx.inputs().len() - 1; - let secret_key = self - .keybase - .get_key(&self.config.key_name) - .map_err(Error::key_base)? - .into_ckb_keypair(self.network()?) - .private_key; - let signer = SecpSighashScriptSigner::new(Box::new( - SecpCkbRawKeySigner::new_with_secret_keys(vec![secret_key]), - )); - let tx = signer - .sign_tx( - &tx, - &ScriptGroup { - script: Script::from(&self.tx_assembler_address()?), - group_type: ScriptGroupType::Lock, - // TODO: here should be more indices in case of more than one Secp256k1 cells - // have been filled in the transaction - input_indices: vec![last_input_idx], - output_indices: vec![], - }, - ) - .unwrap(); - tx_hashes.push(tx.hash().unpack()); - txs.push((tx, msg_type)); - events.push(event); - } - Err(err) => { - // return signing error such as no enough ckb - return Err(err); } - } - } - let responses = txs.iter().map(|(tx, msg_type)| { - let tx: TransactionView = tx.clone().into(); - self.rpc_client - .send_transaction(&tx.inner, None) - .and_then(|tx_hash| { - let confirms = 3; - info!( - "{:?} transaction {} committed to {}, wait {confirms} blocks confirmation", - *msg_type, - hex::encode(&tx_hash), - self.id() - ); - wait_ckb_transaction_committed( - &self.rpc_client, - tx_hash, - Duration::from_secs(10), - confirms, - Duration::from_secs(600), - ) - }) - }); - let responses = self.rt.block_on(futures::future::join_all(responses)); - for (i, response) in responses.iter().enumerate() { - match response { - Ok(height) => { - if let Some(event) = events.get(i).unwrap().clone() { - if let IbcEvent::CreateClient(e) = &event { - let client_type = e.0.client_type; - info!( - "the counterparty client type of Ckb4Ibc is set as {client_type}" - ); - self.sync_counterparty_client_type(client_type); + (Some(event), Some((tx, msg_type))) => match self + .rt + .block_on(self.rpc_client.send_transaction(&tx.inner, None)) + { + Ok(tx_hash) => { + // TODO: put confirms count into config + let confirms = 3; + info!( + "{msg_type:?} transaction {} committed to {}, wait {confirms} blocks confirmation", + hex::encode(&tx_hash), + self.id() + ); + retry_times = 0; + match self.rt.block_on(wait_ckb_transaction_committed( + &self.rpc_client, + tx_hash.clone(), + Duration::from_secs(10), + confirms, + Duration::from_secs(600), + )) { + Ok(height) => { + if let Some(client_type) = sync_if_create_client(&event) { + self.sync_counterparty_client_type(client_type); + } + let ibc_event_with_height = IbcEventWithHeight { + event, + height: Height::from_noncosmos_height(height), + tx_hash: tx_hash.into(), + }; + result_events.push(ibc_event_with_height); + } + Err(err) => { + warn!("wait transaction failed: {err}"); + continue; + } } - let tx_hash: [u8; 32] = tx_hashes.get(i).unwrap().clone().into(); - let ibc_event_with_height = IbcEventWithHeight { - event, - height: Height::from_noncosmos_height(*height), - tx_hash, - }; - result_events.push(ibc_event_with_height); } - } - Err(e) => { - let tx: TransactionView = txs[i].0.clone().into(); - let json_tx = serde_json::to_string_pretty(&tx).unwrap(); - let error = format!("{e}\n\n======== transaction info ========\n\n{json_tx}\n"); - return Err(Error::send_tx(error)); - } + Err(e) => { + let json_tx = serde_json::to_string_pretty(&tx).unwrap(); + let error = + format!("{e}\n\n======== transaction info ========\n\n{json_tx}\n"); + if error.contains("UnknowOutpoint") || error.contains("PoolRejectedRBF") { + if retry_times < 3 { + msgs.insert(0, msg); + } + retry_times += 1; + warn!("error occurred, clear cache and try again: {e}"); + self.clear_cache(); + continue; + } + return Err(Error::other_error(error)); + } + }, + _ => unreachable!(), } } self.clear_cache(); Ok(result_events) } + // FIXME: this method should be in non-blocking mode, but we can't be confident it + // won't leave a protential issue if we do so, and working in blocking mode + // is ok for now, so leave this comment to fix in upcomming days fn send_messages_and_wait_check_tx( &mut self, tracked_msgs: TrackedMsgs, diff --git a/crates/relayer/src/chain/ckb4ibc/message.rs b/crates/relayer/src/chain/ckb4ibc/message.rs index 2cc306095..b9a1fc001 100644 --- a/crates/relayer/src/chain/ckb4ibc/message.rs +++ b/crates/relayer/src/chain/ckb4ibc/message.rs @@ -3,9 +3,7 @@ mod client; mod connection; mod packet; -use std::{cell::Ref, collections::HashMap}; - -use crate::{config::ckb4ibc::ChainConfig, error::Error, keyring::Secp256k1KeyPair}; +use crate::{config::ckb4ibc::ChainConfig, error::Error}; use ckb_ics_axon::{ handler::{IbcChannel, IbcConnections, IbcPacket}, message::Envelope, @@ -32,8 +30,6 @@ use ibc_relayer_types::{ conn_open_try::MsgConnectionOpenTry, conn_open_try::TYPE_URL as CONN_OPEN_TRY_TYPE_URL, }, core::{ - ics02_client::client_type::ClientType, - ics03_connection::connection::IdentifiedConnectionEnd, ics04_channel::{ msgs::{ acknowledgement::{MsgAcknowledgement, TYPE_URL as ACK_PACKET_TYPE_URL}, @@ -51,7 +47,6 @@ use ibc_relayer_types::{ }, packet::Sequence, }, - ics23_commitment::commitment::CommitmentPrefix, ics24_host::identifier::{ChannelId, ConnectionId, PortId}, }, events::IbcEvent, @@ -61,6 +56,7 @@ use ibc_relayer_types::{ use super::{ monitor::WriteAckMonitorCmd, utils::{generate_connection_id, get_script_hash}, + Ckb4IbcChain, }; use client::{convert_create_client, convert_update_client}; @@ -77,31 +73,33 @@ macro_rules! convert { } pub trait MsgToTxConverter { - fn get_key(&self) -> &Secp256k1KeyPair; - - fn get_ibc_connections(&self, client_id: &str) -> Result<&IbcConnections, Error>; + fn get_ibc_connections(&self, client_id: &str) -> Result; fn get_ibc_connections_by_connection_id( &self, connection_id: &ConnectionId, - ) -> Result<&IbcConnections, Error>; + ) -> Result; fn get_ibc_connections_by_port_id( &self, channel_id: &ChannelId, - ) -> Result<&IbcConnections, Error>; + ) -> Result; - fn get_ibc_connections_input(&self, client_id: &str) -> Result<(&CellInput, u64), Error>; + fn get_ibc_connections_input(&self, client_id: &str) -> Result<(CellInput, u64), Error>; - fn get_ibc_channel(&self, id: &ChannelId) -> Result<&IbcChannel, Error>; + fn get_ibc_channel( + &self, + channel_id: &ChannelId, + port: Option<&PortId>, + ) -> Result; fn get_ibc_channel_input( &self, channel_id: &ChannelId, port_id: &PortId, - ) -> Result<(&CellInput, u64), Error>; + ) -> Result<(CellInput, u64), Error>; - fn get_client_outpoint(&self, client_id: &str) -> Option<&OutPoint>; + fn get_client_outpoint(&self, client_id: &str) -> Option; fn get_conn_contract_outpoint(&self) -> &OutPoint; @@ -119,15 +117,15 @@ pub trait MsgToTxConverter { &self, chan: &ChannelId, port: &PortId, - seq: &Sequence, - ) -> Result<(&CellInput, u64), Error>; + seq: Sequence, + ) -> Result<(CellInput, u64), Error>; fn get_ibc_packet( &self, chan: &ChannelId, port: &PortId, - seq: &Sequence, - ) -> Result<&IbcPacket, Error>; + seq: Sequence, + ) -> Result; fn get_commitment_prefix(&self) -> Vec; @@ -141,52 +139,44 @@ pub trait MsgToTxConverter { pub struct Converter<'a> { pub write_ack_cmd: &'a Option, - pub channel_input_data: Ref<'a, HashMap<(ChannelId, PortId), (CellInput, u64)>>, - pub channel_cache: Ref<'a, HashMap>, - #[allow(clippy::type_complexity)] - pub connection_cache: Ref< - 'a, - HashMap)>, - >, - #[allow(clippy::type_complexity)] - pub packet_input_data: Ref<'a, HashMap<(ChannelId, PortId, Sequence), (CellInput, u64)>>, - pub packet_cache: Ref<'a, HashMap<(ChannelId, PortId, Sequence), IbcPacket>>, - pub config: &'a ChainConfig, - pub client_outpoints: Ref<'a, HashMap>, - pub chan_contract_outpoint: &'a OutPoint, - pub packet_contract_outpoint: &'a OutPoint, - pub conn_contract_outpoint: &'a OutPoint, - pub commitment_prefix: CommitmentPrefix, + pub ckb_instance: &'a Ckb4IbcChain, } impl<'a> MsgToTxConverter for Converter<'a> { - fn get_key(&self) -> &Secp256k1KeyPair { - todo!() - } - - fn get_ibc_connections(&self, client_id: &str) -> Result<&IbcConnections, Error> { - let client_type = self.config.lc_client_type(client_id)?; - if let Some((connection, _, _, _)) = self.connection_cache.get(&client_type) { - Ok(connection) - } else { - Err(Error::query(format!( - "client_type {client_type} isn't in cache" - ))) + fn get_ibc_connections(&self, client_id: &str) -> Result { + let client_type = self.get_config().lc_client_type(client_id)?; + if let Some((connection, _, _, _)) = self + .ckb_instance + .connection_cache + .borrow() + .get(&client_type) + { + return Ok(connection.clone()); } + self.ckb_instance.query_connection_and_cache()?; + let connection_cache = self.ckb_instance.connection_cache.borrow(); + let (connection, _, _, _) = + connection_cache + .get(&client_type) + .ok_or(Error::query(format!( + "client_type {client_type} isn't in cache" + )))?; + Ok(connection.clone()) } fn get_ibc_connections_by_connection_id( &self, connection_id: &ConnectionId, - ) -> Result<&IbcConnections, Error> { - let ibc_connections = self.connection_cache.iter().find(|(_, (v, _, _, _))| { + ) -> Result { + let conneciton_cache = self.ckb_instance.connection_cache.borrow(); + let ibc_connections = conneciton_cache.iter().find(|(_, (v, _, _, _))| { v.connections .iter() .enumerate() .any(|(idx, c)| connection_id == &generate_connection_id(idx as u16, &c.client_id)) }); if let Some((_, (value, _, _, _))) = ibc_connections { - Ok(value) + Ok(value.clone()) } else { Err(Error::query(format!( "connection {connection_id} not found in cache" @@ -197,9 +187,9 @@ impl<'a> MsgToTxConverter for Converter<'a> { fn get_ibc_connections_by_port_id( &self, channel_id: &ChannelId, - ) -> Result<&IbcConnections, Error> { - let channel = self - .channel_cache + ) -> Result { + let channel_cache = self.ckb_instance.channel_cache.borrow(); + let channel = channel_cache .get(channel_id) .ok_or_else(|| Error::query(format!("channel {channel_id} not found in cache")))?; // FIXME: should modify ibc contract @@ -207,74 +197,129 @@ impl<'a> MsgToTxConverter for Converter<'a> { self.get_ibc_connections_by_connection_id(&connection_id) } - fn get_ibc_connections_input(&self, client_id: &str) -> Result<(&CellInput, u64), Error> { - let client_type = self.config.lc_client_type(client_id)?; - if let Some((_, cell_input, capacity, _)) = self.connection_cache.get(&client_type) { - Ok((cell_input, *capacity)) - } else { - Err(Error::query(format!( - "client_type {client_type} isn't in cache" - ))) + fn get_ibc_connections_input(&self, client_id: &str) -> Result<(CellInput, u64), Error> { + let client_type = self.get_config().lc_client_type(client_id)?; + if let Some((_, cell_input, capacity, _)) = self + .ckb_instance + .connection_cache + .borrow() + .get(&client_type) + { + return Ok((cell_input.clone(), *capacity)); } + self.ckb_instance.query_connection_and_cache()?; + let connection_cache = self.ckb_instance.connection_cache.borrow(); + let (_, cell_input, capacity, _) = + connection_cache + .get(&client_type) + .ok_or(Error::query(format!( + "client_type {client_type} isn't in cache" + )))?; + Ok((cell_input.clone(), *capacity)) } - fn get_ibc_channel(&self, channel_id: &ChannelId) -> Result<&IbcChannel, Error> { - self.channel_cache - .get(channel_id) - .ok_or(Error::query(format!("no channel_id {channel_id}"))) + fn get_ibc_channel( + &self, + channel_id: &ChannelId, + port_id: Option<&PortId>, + ) -> Result { + if let Some(channel) = self.ckb_instance.channel_cache.borrow().get(channel_id) { + return Ok(channel.clone()); + } + if let Some(port_id) = port_id { + self.ckb_instance + .fetch_channel_cell_and_extract(channel_id, port_id, true)?; + self.ckb_instance + .channel_cache + .borrow() + .get(channel_id) + .ok_or(Error::query(format!("no channel_id {channel_id}"))) + .cloned() + } else { + Err(Error::query(format!("no channel_id {channel_id}"))) + } } fn get_ibc_channel_input( &self, channel_id: &ChannelId, port_id: &PortId, - ) -> Result<(&CellInput, u64), Error> { - self.channel_input_data + ) -> Result<(CellInput, u64), Error> { + if let Some((input, capacity)) = self + .ckb_instance + .channel_input_data + .borrow() + .get(&(channel_id.clone(), port_id.clone())) + { + return Ok((input.clone(), *capacity)); + } + self.ckb_instance + .fetch_channel_cell_and_extract(channel_id, port_id, true)?; + self.ckb_instance + .channel_input_data + .borrow() .get(&(channel_id.clone(), port_id.clone())) - .map(|(input, capacity)| (input, *capacity)) + .map(|(input, capacity)| (input.clone(), *capacity)) .ok_or(Error::query(format!("no channel({channel_id}/{port_id})"))) } - fn get_client_outpoint(&self, client_id: &str) -> Option<&OutPoint> { - let Some(client_type) = self.config.lc_client_type(client_id).ok() else { + fn get_client_outpoint(&self, client_id: &str) -> Option { + let Some(client_type) = self.get_config().lc_client_type(client_id).ok() else { return None; }; - self.client_outpoints.get(&client_type) + self.ckb_instance + .client_outpoints + .borrow() + .get(&client_type) + .cloned() } fn get_conn_contract_outpoint(&self) -> &OutPoint { - self.conn_contract_outpoint + &self.ckb_instance.connection_outpoint } fn get_chan_contract_outpoint(&self) -> &OutPoint { - self.chan_contract_outpoint + &self.ckb_instance.channel_outpoint } fn get_packet_contract_outpoint(&self) -> &OutPoint { - self.packet_contract_outpoint + &self.ckb_instance.packet_outpoint } fn get_channel_code_hash(&self) -> Byte32 { - get_script_hash(&self.config.channel_type_args) + get_script_hash(&self.get_config().channel_type_args) } fn get_packet_code_hash(&self) -> Byte32 { - get_script_hash(&self.config.packet_type_args) + get_script_hash(&self.get_config().packet_type_args) } fn get_connection_code_hash(&self) -> Byte32 { - get_script_hash(&self.config.connection_type_args) + get_script_hash(&self.get_config().connection_type_args) } fn get_ibc_packet_input( &self, channel_id: &ChannelId, port_id: &PortId, - sequence: &Sequence, - ) -> Result<(&CellInput, u64), Error> { - self.packet_input_data - .get(&(channel_id.clone(), port_id.clone(), *sequence)) - .map(|(input, capacity)| (input, *capacity)) + sequence: Sequence, + ) -> Result<(CellInput, u64), Error> { + if let Some((input, capacity)) = self + .ckb_instance + .packet_input_data + .borrow() + .get(&(channel_id.clone(), port_id.clone(), sequence)) + .map(|(input, capacity)| (input.clone(), *capacity)) + { + return Ok((input, capacity)); + } + self.ckb_instance + .fetch_packet_cells_and_extract(channel_id, port_id, Some(sequence))?; + self.ckb_instance + .packet_input_data + .borrow() + .get(&(channel_id.clone(), port_id.clone(), sequence)) + .map(|(input, capacity)| (input.clone(), *capacity)) .ok_or(Error::query(format!( "no packet({channel_id}/{port_id}/{sequence})" ))) @@ -284,21 +329,33 @@ impl<'a> MsgToTxConverter for Converter<'a> { &self, channel_id: &ChannelId, port_id: &PortId, - sequence: &Sequence, - ) -> Result<&IbcPacket, Error> { - self.packet_cache - .get(&(channel_id.clone(), port_id.clone(), *sequence)) + sequence: Sequence, + ) -> Result { + if let Some(packet) = self.ckb_instance.packet_cache.borrow().get(&( + channel_id.clone(), + port_id.clone(), + sequence, + )) { + return Ok(packet.clone()); + } + self.ckb_instance + .fetch_packet_cells_and_extract(channel_id, port_id, Some(sequence))?; + self.ckb_instance + .packet_cache + .borrow() + .get(&(channel_id.clone(), port_id.clone(), sequence)) .ok_or(Error::query(format!( "no packet({channel_id}/{port_id}/{sequence})" ))) + .cloned() } fn get_commitment_prefix(&self) -> Vec { - self.commitment_prefix.as_bytes().to_vec() + self.get_config().store_prefix.clone().into_bytes() } fn get_config(&self) -> &ChainConfig { - self.config + &self.ckb_instance.config } fn require_useless_write_ack_packet( @@ -325,7 +382,7 @@ pub struct CkbTxInfo { // Return a transaction which needs to be added relayer's input in it and to be signed. pub fn convert_msg_to_ckb_tx( - msg: Any, + msg: &Any, converter: &C, ) -> Result { match msg.type_url.as_str() { diff --git a/crates/relayer/src/chain/ckb4ibc/message/channel.rs b/crates/relayer/src/chain/ckb4ibc/message/channel.rs index e417c9509..b4ffa28d5 100644 --- a/crates/relayer/src/chain/ckb4ibc/message/channel.rs +++ b/crates/relayer/src/chain/ckb4ibc/message/channel.rs @@ -115,7 +115,7 @@ pub fn convert_chan_open_init_to_tx( port_id: convert_port_id_to_array(&msg.port_id)?, }; - let old_connection = get_encoded_object(old_connection_cell); + let old_connection = get_encoded_object(&old_connection_cell); let new_connection = get_encoded_object(&new_connection_cell); let connection_lock = get_connection_lock_script(converter.get_config(), Some(client_id.clone()))?; @@ -164,7 +164,7 @@ pub fn convert_chan_open_try_to_tx( let ibc_channel = get_encoded_object(&ibc_channel_end); let (client_cell_type_args, client_id) = get_client_id_from_channel(&msg.channel, converter)?; - let old_connection = get_encoded_object(old_connection_cell); + let old_connection = get_encoded_object(&old_connection_cell); let new_connection = get_encoded_object(&new_connection_cell); let envelope = Envelope { @@ -217,7 +217,7 @@ pub fn convert_chan_open_ack_to_tx( converter: &C, ) -> Result { let channel_idx = get_channel_number(&msg.channel_id)?; - let old_channel = converter.get_ibc_channel(&msg.channel_id)?; + let old_channel = converter.get_ibc_channel(&msg.channel_id, None)?; let counterparty_port_id = PortId::from_str(&old_channel.counterparty.port_id).unwrap(); let mut new_channel = old_channel.clone(); new_channel.state = CkbState::Open; @@ -242,7 +242,7 @@ pub fn convert_chan_open_ack_to_tx( }; let channel_lock = get_channel_lock_script(converter, channel_args.to_args()); - let old_channel = get_encoded_object(old_channel); + let old_channel = get_encoded_object(&old_channel); let new_channel = get_encoded_object(&new_channel); let (channel_input, input_capacity) = converter.get_ibc_channel_input(&msg.channel_id, &msg.port_id)?; @@ -276,7 +276,7 @@ pub fn convert_chan_open_confirm_to_tx( msg: MsgChannelOpenConfirm, converter: &C, ) -> Result { - let old_channel = converter.get_ibc_channel(&msg.channel_id)?; + let old_channel = converter.get_ibc_channel(&msg.channel_id, None)?; let mut new_channel = old_channel.clone(); new_channel.state = CkbState::Open; @@ -304,7 +304,7 @@ pub fn convert_chan_open_confirm_to_tx( }; let channel_lock = get_channel_lock_script(converter, channel_args.to_args()); - let old_channel = get_encoded_object(old_channel); + let old_channel = get_encoded_object(&old_channel); let new_channel = get_encoded_object(&new_channel); let (channel_input, input_capacity) = converter.get_ibc_channel_input(&msg.channel_id, &msg.port_id)?; diff --git a/crates/relayer/src/chain/ckb4ibc/message/connection.rs b/crates/relayer/src/chain/ckb4ibc/message/connection.rs index e9021bcc0..dda743959 100644 --- a/crates/relayer/src/chain/ckb4ibc/message/connection.rs +++ b/crates/relayer/src/chain/ckb4ibc/message/connection.rs @@ -58,7 +58,7 @@ pub fn convert_conn_open_init_to_tx( content: rlp::encode(&CkbMsgConnectionOpenInit {}).to_vec(), }; - let old_connection = get_encoded_object(old_ibc_connection_cell); + let old_connection = get_encoded_object(&old_ibc_connection_cell); let new_connection = get_encoded_object(&new_ibc_connection_cell); let connection_lock = get_connection_lock_script(converter.get_config(), Some(client_id.clone()))?; @@ -123,7 +123,7 @@ pub fn convert_conn_open_try_to_tx( .to_vec(), }; - let old_connection = get_encoded_object(old_ibc_connection_cell); + let old_connection = get_encoded_object(&old_ibc_connection_cell); let new_connection = get_encoded_object(&new_ibc_connection_cell); let connection_lock = get_connection_lock_script(converter.get_config(), Some(client_id.clone()))?; @@ -177,7 +177,7 @@ pub fn convert_conn_open_ack_to_tx( let counterparty_client_id = connection_end.counterparty.client_id.clone(); let client_id = connection_end.client_id.clone(); - let old_connection = get_encoded_object(old_ibc_connection_cell); + let old_connection = get_encoded_object(&old_ibc_connection_cell); let new_connection = get_encoded_object(&new_ibc_connection_cell); let connection_lock = get_connection_lock_script(converter.get_config(), Some(client_id.clone()))?; @@ -235,7 +235,7 @@ pub fn convert_conn_open_confirm_to_tx( .map(|v| v.parse().unwrap()); let client_id = connection_end.client_id.clone(); - let old_connection = get_encoded_object(old_ibc_connection_cell); + let old_connection = get_encoded_object(&old_ibc_connection_cell); let new_connection = get_encoded_object(&new_ibc_connection_cell); let connection_lock = get_connection_lock_script(converter.get_config(), Some(client_id.clone()))?; diff --git a/crates/relayer/src/chain/ckb4ibc/message/packet.rs b/crates/relayer/src/chain/ckb4ibc/message/packet.rs index 2d7053a34..f51c84150 100644 --- a/crates/relayer/src/chain/ckb4ibc/message/packet.rs +++ b/crates/relayer/src/chain/ckb4ibc/message/packet.rs @@ -7,9 +7,12 @@ use ckb_ics_axon::message::MsgType; use ckb_ics_axon::object::{Ordering, Packet as CkbPacket}; use ckb_ics_axon::{ChannelArgs, PacketArgs}; use ckb_types::packed::BytesOpt; +use ibc_relayer_types::core::ics04_channel::events::AcknowledgePacket; +use ibc_relayer_types::core::ics04_channel::events::ReceivePacket; use ibc_relayer_types::core::ics04_channel::msgs::acknowledgement::MsgAcknowledgement; use ibc_relayer_types::core::ics04_channel::msgs::recv_packet::MsgRecvPacket; use ibc_relayer_types::core::ics04_channel::packet::Packet; +use ibc_relayer_types::events::IbcEvent; use super::{CkbTxInfo, MsgToTxConverter, TxBuilder}; use crate::chain::ckb4ibc::utils::{ @@ -43,7 +46,8 @@ pub fn convert_recv_packet_to_tx( converter: &C, ) -> Result { let channel_id = msg.packet.destination_channel.clone(); - let old_channel_end = converter.get_ibc_channel(&channel_id)?; + let old_channel_end = + converter.get_ibc_channel(&channel_id, Some(&msg.packet.destination_port))?; let mut new_channel_end = old_channel_end.clone(); let packet = convert_ibc_packet(&msg.packet); @@ -93,7 +97,7 @@ pub fn convert_recv_packet_to_tx( port_id, }; - let old_channel = get_encoded_object(old_channel_end); + let old_channel = get_encoded_object(&old_channel_end); let new_channel = get_encoded_object(&new_channel_end); let ibc_packet = get_encoded_object(&IbcPacket { packet, @@ -135,11 +139,13 @@ pub fn convert_recv_packet_to_tx( .witness(write_ack_witness, ibc_packet.witness) .build(); + let event = IbcEvent::ReceivePacket(ReceivePacket { packet: msg.packet }); + Ok(CkbTxInfo { unsigned_tx: Some(packet_tx), envelope, input_capacity, - event: None, + event: Some(event), }) } @@ -148,7 +154,7 @@ pub fn convert_ack_packet_to_tx( converter: &C, ) -> Result { let channel_id = msg.packet.source_channel.clone(); - let old_channel_end = converter.get_ibc_channel(&channel_id)?; + let old_channel_end = converter.get_ibc_channel(&channel_id, Some(&msg.packet.source_port))?; let mut new_channel_end = old_channel_end.clone(); match old_channel_end.order { @@ -157,7 +163,7 @@ pub fn convert_ack_packet_to_tx( Ordering::Unknown => return Err(Error::other("channel ordering must be Order or Unorder")), } - let old_channel = get_encoded_object(old_channel_end); + let old_channel = get_encoded_object(&old_channel_end); let new_channel = get_encoded_object(&new_channel_end); let ack_packet = CkbMsgAckPacket { @@ -188,11 +194,11 @@ pub fn convert_ack_packet_to_tx( let (old_packet_input, packet_capacity) = converter.get_ibc_packet_input( &channel_id, &msg.packet.source_port, - &msg.packet.sequence, + msg.packet.sequence, )?; let old_ibc_packet = - converter.get_ibc_packet(&channel_id, &msg.packet.source_port, &msg.packet.sequence)?; - let old_packet = get_encoded_object(old_ibc_packet); + converter.get_ibc_packet(&channel_id, &msg.packet.source_port, msg.packet.sequence)?; + let old_packet = get_encoded_object(&old_ibc_packet); let (client_cell_type_args, client_id) = extract_client_id_by_connection_id(&new_channel_end.connection_hops[0], converter)?; @@ -218,10 +224,12 @@ pub fn convert_ack_packet_to_tx( .witness(old_packet.witness, new_packet.witness) .build(); + let event = IbcEvent::AcknowledgePacket(AcknowledgePacket { packet: msg.packet }); + Ok(CkbTxInfo { unsigned_tx: Some(packed_tx), envelope, input_capacity: channel_capacity + packet_capacity, - event: None, + event: Some(event), }) } diff --git a/crates/relayer/src/chain/ckb4ibc/utils.rs b/crates/relayer/src/chain/ckb4ibc/utils.rs index b2af0347e..873c6c764 100644 --- a/crates/relayer/src/chain/ckb4ibc/utils.rs +++ b/crates/relayer/src/chain/ckb4ibc/utils.rs @@ -242,7 +242,6 @@ pub fn get_client_outpoint( ) -> Result { converter .get_client_outpoint(client_id) - .cloned() .ok_or(Error::other_error(format!("not found {client_id}"))) }