From 8aab1611e167b7e396f2a9003af7f29877a1f93e Mon Sep 17 00:00:00 2001 From: liyukun Date: Mon, 6 Nov 2023 23:17:12 +0800 Subject: [PATCH] feat: complete header sync methods --- crates/relayer/src/chain/axon.rs | 16 ++--- crates/relayer/src/chain/axon/emitter.rs | 62 ++++++++++++++----- crates/relayer/src/chain/axon/monitor.rs | 34 ++-------- crates/relayer/src/config/axon.rs | 3 +- .../config/fixtures/relayer_conf_example.toml | 3 +- tools/test-framework/src/types/single/node.rs | 3 +- 6 files changed, 66 insertions(+), 55 deletions(-) diff --git a/crates/relayer/src/chain/axon.rs b/crates/relayer/src/chain/axon.rs index 29841354f..bf3d4fcd0 100644 --- a/crates/relayer/src/chain/axon.rs +++ b/crates/relayer/src/chain/axon.rs @@ -79,7 +79,7 @@ use ibc_relayer_types::{ }; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; -use self::{contract::OwnableIBCHandler, emitter::CellProcessManager, monitor::AxonEventMonitor}; +use self::{contract::OwnableIBCHandler, emitter::CkbSyncManager, monitor::AxonEventMonitor}; type ContractProvider = SignerMiddleware, Wallet>; type IBCContract = OwnableIBCHandler; @@ -1185,14 +1185,19 @@ impl AxonChain { crate::time!("axon_init_event_monitor"); let ibc_cache = self.ibc_cache.clone(); - let cell_process_manager = CellProcessManager::new( + let mut cell_process_manager = CkbSyncManager::new( self.rt.clone(), &self.config.emitter_ckb_url.to_string(), self.chain_id, self.contract_provider()?, - self.config.emitter_scan_start_block_number, + self.config.emitter_cell_start_block_number, ); + // start sync ckb header + cell_process_manager + .spawn_header_processor(self.config.emitter_header_start_block_number) + .map_err(Error::event_monitor)?; + let (mut event_monitor, monitor_tx) = AxonEventMonitor::new( self.config.id.clone(), self.config.websocket_addr.clone(), @@ -1212,11 +1217,6 @@ impl AxonChain { .into_iter() .for_each(|v| cache_ics_tx_hash_with_event(&mut ibc_cache, v.event, v.tx_hash)); - // resotore cell_emitter filters - event_monitor - .restore_cell_emitter_filters() - .map_err(Error::event_monitor)?; - thread::spawn(move || event_monitor.run()); Ok(monitor_tx) } diff --git a/crates/relayer/src/chain/axon/emitter.rs b/crates/relayer/src/chain/axon/emitter.rs index f189603b6..62e07ceaf 100644 --- a/crates/relayer/src/chain/axon/emitter.rs +++ b/crates/relayer/src/chain/axon/emitter.rs @@ -5,6 +5,7 @@ use ckb_jsonrpc_types::{CellInfo, HeaderView, OutPoint}; use ckb_types::H256; use emitter_core::{ cell_process::CellProcess, + header_sync::HeaderSyncProcess, rpc_client::RpcClient, types::{IndexerTip, RpcSearchKey}, Submit, SubmitProcess, TipState, @@ -173,9 +174,6 @@ impl SubmitProcess for CkbSubmitProcess { true } - // TODO: it's a normal use to upload headers via cell-emitter, but due to the - // feature that Axon won't check header's continuity, we can upload the header - // just before relaying IBC messages, if so, this method would be abandoned async fn submit_headers(&mut self, headers: Vec) -> bool { if let Err(err) = self.upload_headers(headers).await { tracing::error!("failed to sync CKB headers: {err}"); @@ -184,16 +182,17 @@ impl SubmitProcess for CkbSubmitProcess { } } -pub struct CellProcessManager { +pub struct CkbSyncManager { rt: Arc, rpc: RpcClient, chain_id: u64, contract: Arc, start_tip_number: u64, cell_processors: HashMap>, + header_processor: Option>, } -impl CellProcessManager { +impl CkbSyncManager { pub fn new( rt: Arc, ckb_uri: &str, @@ -208,9 +207,21 @@ impl CellProcessManager { contract, start_tip_number, cell_processors: HashMap::new(), + header_processor: None, } } + fn tip_state(&self, block_number: u64) -> Result { + let old_tip_header = self + .rt + .block_on(self.rpc.get_header_by_number(block_number.into())) + .map_err(|e| { + Error::others(format!("failed to fetch CKB header {}: {e}", block_number)) + })?; + let tip_state = CkbTipState::new(old_tip_header.hash, block_number); + Ok(tip_state) + } + pub fn spawn_cell_processor(&mut self, search_key: RpcSearchKey) -> Result { // feature closed if self.start_tip_number == 0 { @@ -219,19 +230,9 @@ impl CellProcessManager { if self.cell_processors.contains_key(&search_key) { return Ok(false); } - let old_tip_header = self - .rt - .block_on(self.rpc.get_header_by_number(self.start_tip_number.into())) - .map_err(|e| { - Error::others(format!( - "failed to fetch CKB header {}: {e}", - self.start_tip_number - )) - })?; - let tip_state = CkbTipState::new(old_tip_header.hash, self.start_tip_number); let mut cell_processor = CellProcess::new( search_key.clone(), - tip_state, + self.tip_state(self.start_tip_number)?, self.rpc.clone(), CkbSubmitProcess::new(self.chain_id, self.contract.clone()), ); @@ -250,4 +251,33 @@ impl CellProcessManager { false } } + + pub fn spawn_header_processor(&mut self, start_block_number: u64) -> Result { + // feature closed + if start_block_number == 0 { + return Ok(true); + } + if self.header_processor.is_some() { + return Ok(false); + } + let mut header_processor = HeaderSyncProcess::new( + self.tip_state(start_block_number)?, + self.rpc.clone(), + CkbSubmitProcess::new(self.chain_id, self.contract.clone()), + ); + let handle = self.rt.spawn(async move { + header_processor.run().await; + }); + self.header_processor = Some(handle); + Ok(true) + } + + // TODO: use this method before each submit of IBC messages + pub fn _sync_header(&self, headers: Vec) -> Result<(), Error> { + self.rt + .block_on( + CkbSubmitProcess::new(self.chain_id, self.contract.clone()).upload_headers(headers), + ) + .map_err(|err| Error::others(err.to_string())) + } } diff --git a/crates/relayer/src/chain/axon/monitor.rs b/crates/relayer/src/chain/axon/monitor.rs index 7e5544bc2..6daf09e18 100644 --- a/crates/relayer/src/chain/axon/monitor.rs +++ b/crates/relayer/src/chain/axon/monitor.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use super::emitter::CellProcessManager; +use super::emitter::CkbSyncManager; use super::{contract::*, IBCInfoCache}; use crate::chain::axon::cache_ics_tx_hash_with_event; use crate::event::bus::EventBus; @@ -38,7 +38,7 @@ pub struct AxonEventMonitor { event_bus: EventBus>>, ibc_cache: Arc>, reprocess_events: Vec<(OwnableIBCHandlerEvents, LogMeta)>, - cell_process_manager: CellProcessManager, + ckb_sync_manager: CkbSyncManager, } impl AxonEventMonitor { @@ -55,7 +55,7 @@ impl AxonEventMonitor { contract_address: Address, rt: Arc, ibc_cache: Arc>, - cell_process_manager: CellProcessManager, + ckb_sync_manager: CkbSyncManager, ) -> Result<(Self, TxMonitorCmd)> { let (tx_cmd, rx_cmd) = channel::unbounded(); @@ -79,7 +79,7 @@ impl AxonEventMonitor { event_bus, ibc_cache, reprocess_events: vec![], - cell_process_manager, + ckb_sync_manager, }; Ok((monitor, TxMonitorCmd::new(tx_cmd))) } @@ -165,26 +165,6 @@ impl AxonEventMonitor { Ok(events) } - // TODO: monitor can recover Axon events from at least 25000 blocks, it's enough to restore emitter filters, - // and in case of unrecoverable, since filter isn't the sensitive data, users can register filter again - pub fn restore_cell_emitter_filters(&mut self) -> Result<()> { - // let contract = Contract::new(self.contract_address, Arc::clone(&self.client)); - // // FIXME: consider what if the format of filter stored on-chain is invalid - // let filters = self - // .rt - // .block_on(contract.get_cell_emitter_filters().call()) - // .map_err(|e| Error::others(e.to_string()))? - // .into_iter() - // .map(|filter| self.cell_process_manager.spawn_cell_processor(&filter)) - // .collect::>>()?; - // info!( - // "resotred {} filters on contract {}", - // filters.len(), - // self.contract_address - // ); - Ok(()) - } - fn update_subscribe(&mut self, use_try: bool) -> Next { let cmd = if use_try { match self.rx_cmd.try_recv() { @@ -302,15 +282,13 @@ impl AxonEventMonitor { ContractEvents::RegisterCellEmitterFilterFilter(RegisterCellEmitterFilterFilter { filter, }) => { - self.cell_process_manager - .spawn_cell_processor(filter.into())?; + self.ckb_sync_manager.spawn_cell_processor(filter.into())?; Ok(true) } ContractEvents::RemoveCellEmitterFilterFilter(RemoveCellEmitterFilterFilter { filter, }) => { - self.cell_process_manager - .remove_cell_processor(filter.into()); + self.ckb_sync_manager.remove_cell_processor(filter.into()); Ok(true) } _ => Ok(false), diff --git a/crates/relayer/src/config/axon.rs b/crates/relayer/src/config/axon.rs index 2f1ece9eb..46d30ad07 100644 --- a/crates/relayer/src/config/axon.rs +++ b/crates/relayer/src/config/axon.rs @@ -16,7 +16,8 @@ pub struct AxonChainConfig { pub key_name: String, pub store_prefix: String, pub emitter_ckb_url: Url, - pub emitter_scan_start_block_number: u64, + pub emitter_cell_start_block_number: u64, + pub emitter_header_start_block_number: u64, #[serde(default)] pub packet_filter: PacketFilter, diff --git a/crates/relayer/tests/config/fixtures/relayer_conf_example.toml b/crates/relayer/tests/config/fixtures/relayer_conf_example.toml index 14a1f4ca7..dee5066b0 100644 --- a/crates/relayer/tests/config/fixtures/relayer_conf_example.toml +++ b/crates/relayer/tests/config/fixtures/relayer_conf_example.toml @@ -95,7 +95,8 @@ restore_block_count = 10000 key_name = "relayer_axon_wallet" store_prefix = "forcerelay" emitter_ckb_url = "https://testnet.ckbapp.dev" -emitter_scan_start_block_number = 1000000 +emitter_cell_start_block_number = 1000000 +emitter_header_start_block_number = 1000000 [[chains]] id = "ckb4ibc-0" diff --git a/tools/test-framework/src/types/single/node.rs b/tools/test-framework/src/types/single/node.rs index 7e125161e..8e10e177d 100644 --- a/tools/test-framework/src/types/single/node.rs +++ b/tools/test-framework/src/types/single/node.rs @@ -233,7 +233,8 @@ impl FullNode { transfer_contract_address, restore_block_count, emitter_ckb_url: Url::from_str("http://127.0.0.1").unwrap(), - emitter_scan_start_block_number: 0, // means close cell_emitter + emitter_cell_start_block_number: 0, // 0 means to close cell sync feature + emitter_header_start_block_number: 0, // 0 means to close header sync feature }; Ok(config::ChainConfig::Axon(axon_config)) }