Skip to content

Commit

Permalink
feat: complete header sync methods
Browse files Browse the repository at this point in the history
  • Loading branch information
liyukun committed Nov 6, 2023
1 parent c71f6e4 commit 54cb83e
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 55 deletions.
16 changes: 8 additions & 8 deletions crates/relayer/src/chain/axon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use ibc_relayer_types::{
};
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;

use self::{contract::OwnableIBCHandler, emitter::CellProcessManager, monitor::AxonEventMonitor};
use self::{contract::OwnableIBCHandler, emitter::CkbSyncManager, monitor::AxonEventMonitor};

type ContractProvider = SignerMiddleware<Provider<Ws>, Wallet<SigningKey>>;
type IBCContract = OwnableIBCHandler<ContractProvider>;
Expand Down Expand Up @@ -1164,14 +1164,19 @@ impl AxonChain {
crate::time!("axon_init_event_monitor");

let ibc_cache = self.ibc_cache.clone();
let cell_process_manager = CellProcessManager::new(
let mut cell_process_manager = CkbSyncManager::new(
self.rt.clone(),
&self.config.emitter_ckb_url.to_string(),
self.chain_id,
self.contract_provider()?,
self.config.emitter_scan_start_block_number,
self.config.emitter_cell_start_block_number,
);

// start sync ckb header
cell_process_manager
.spawn_header_processor(self.config.emitter_header_start_block_number)
.map_err(Error::event_monitor)?;

let (mut event_monitor, monitor_tx) = AxonEventMonitor::new(
self.config.id.clone(),
self.config.websocket_addr.clone(),
Expand All @@ -1191,11 +1196,6 @@ impl AxonChain {
.into_iter()
.for_each(|v| cache_ics_tx_hash_with_event(&mut ibc_cache, v.event, v.tx_hash));

// resotore cell_emitter filters
event_monitor
.restore_cell_emitter_filters()
.map_err(Error::event_monitor)?;

thread::spawn(move || event_monitor.run());
Ok(monitor_tx)
}
Expand Down
62 changes: 46 additions & 16 deletions crates/relayer/src/chain/axon/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ckb_jsonrpc_types::{CellInfo, HeaderView, OutPoint};
use ckb_types::H256;
use emitter_core::{
cell_process::CellProcess,
header_sync::HeaderSyncProcess,
rpc_client::RpcClient,
types::{IndexerTip, RpcSearchKey},
Submit, SubmitProcess, TipState,
Expand Down Expand Up @@ -173,9 +174,6 @@ impl SubmitProcess for CkbSubmitProcess {
true
}

// TODO: it's a normal use to upload headers via cell-emitter, but due to the
// feature that Axon won't check header's continuity, we can upload the header
// just before relaying IBC messages, if so, this method would be abandoned
async fn submit_headers(&mut self, headers: Vec<HeaderView>) -> bool {
if let Err(err) = self.upload_headers(headers).await {
tracing::error!("failed to sync CKB headers: {err}");
Expand All @@ -184,16 +182,17 @@ impl SubmitProcess for CkbSubmitProcess {
}
}

pub struct CellProcessManager {
pub struct CkbSyncManager {
rt: Arc<Runtime>,
rpc: RpcClient,
chain_id: u64,
contract: Arc<ContractProvider>,
start_tip_number: u64,
cell_processors: HashMap<RpcSearchKey, JoinHandle<()>>,
header_processor: Option<JoinHandle<()>>,
}

impl CellProcessManager {
impl CkbSyncManager {
pub fn new(
rt: Arc<Runtime>,
ckb_uri: &str,
Expand All @@ -208,9 +207,21 @@ impl CellProcessManager {
contract,
start_tip_number,
cell_processors: HashMap::new(),
header_processor: None,
}
}

fn tip_state(&self, block_number: u64) -> Result<CkbTipState, Error> {
let old_tip_header = self
.rt
.block_on(self.rpc.get_header_by_number(block_number.into()))
.map_err(|e| {
Error::others(format!("failed to fetch CKB header {}: {e}", block_number))
})?;
let tip_state = CkbTipState::new(old_tip_header.hash, block_number);
Ok(tip_state)
}

pub fn spawn_cell_processor(&mut self, search_key: RpcSearchKey) -> Result<bool, Error> {
// feature closed
if self.start_tip_number == 0 {
Expand All @@ -219,19 +230,9 @@ impl CellProcessManager {
if self.cell_processors.contains_key(&search_key) {
return Ok(false);
}
let old_tip_header = self
.rt
.block_on(self.rpc.get_header_by_number(self.start_tip_number.into()))
.map_err(|e| {
Error::others(format!(
"failed to fetch CKB header {}: {e}",
self.start_tip_number
))
})?;
let tip_state = CkbTipState::new(old_tip_header.hash, self.start_tip_number);
let mut cell_processor = CellProcess::new(
search_key.clone(),
tip_state,
self.tip_state(self.start_tip_number)?,
self.rpc.clone(),
CkbSubmitProcess::new(self.chain_id, self.contract.clone()),
);
Expand All @@ -250,4 +251,33 @@ impl CellProcessManager {
false
}
}

pub fn spawn_header_processor(&mut self, start_block_number: u64) -> Result<bool, Error> {
// feature closed
if start_block_number == 0 {
return Ok(true);
}
if self.header_processor.is_some() {
return Ok(false);
}
let mut header_processor = HeaderSyncProcess::new(
self.tip_state(start_block_number)?,
self.rpc.clone(),
CkbSubmitProcess::new(self.chain_id, self.contract.clone()),
);
let handle = self.rt.spawn(async move {
header_processor.run().await;
});
self.header_processor = Some(handle);
Ok(true)
}

// TODO: use this method before each submit of IBC messages
pub fn _sync_header(&self, headers: Vec<HeaderView>) -> Result<(), Error> {
self.rt
.block_on(
CkbSubmitProcess::new(self.chain_id, self.contract.clone()).upload_headers(headers),
)
.map_err(|err| Error::others(err.to_string()))
}
}
34 changes: 6 additions & 28 deletions crates/relayer/src/chain/axon/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use super::emitter::CellProcessManager;
use super::emitter::CkbSyncManager;
use super::{contract::*, IBCInfoCache};
use crate::chain::axon::cache_ics_tx_hash_with_event;
use crate::event::bus::EventBus;
Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct AxonEventMonitor {
event_bus: EventBus<Arc<Result<EventBatch>>>,
ibc_cache: Arc<RwLock<IBCInfoCache>>,
reprocess_events: Vec<(OwnableIBCHandlerEvents, LogMeta)>,
cell_process_manager: CellProcessManager,
ckb_sync_manager: CkbSyncManager,
}

impl AxonEventMonitor {
Expand All @@ -55,7 +55,7 @@ impl AxonEventMonitor {
contract_address: Address,
rt: Arc<TokioRuntime>,
ibc_cache: Arc<RwLock<IBCInfoCache>>,
cell_process_manager: CellProcessManager,
ckb_sync_manager: CkbSyncManager,
) -> Result<(Self, TxMonitorCmd)> {
let (tx_cmd, rx_cmd) = channel::unbounded();

Expand All @@ -79,7 +79,7 @@ impl AxonEventMonitor {
event_bus,
ibc_cache,
reprocess_events: vec![],
cell_process_manager,
ckb_sync_manager,
};
Ok((monitor, TxMonitorCmd::new(tx_cmd)))
}
Expand Down Expand Up @@ -165,26 +165,6 @@ impl AxonEventMonitor {
Ok(events)
}

// TODO: monitor can recover Axon events from at least 25000 blocks, it's enough to restore emitter filters,
// and in case of unrecoverable, since filter isn't the sensitive data, users can register filter again
pub fn restore_cell_emitter_filters(&mut self) -> Result<()> {
// let contract = Contract::new(self.contract_address, Arc::clone(&self.client));
// // FIXME: consider what if the format of filter stored on-chain is invalid
// let filters = self
// .rt
// .block_on(contract.get_cell_emitter_filters().call())
// .map_err(|e| Error::others(e.to_string()))?
// .into_iter()
// .map(|filter| self.cell_process_manager.spawn_cell_processor(&filter))
// .collect::<Result<Vec<_>>>()?;
// info!(
// "resotred {} filters on contract {}",
// filters.len(),
// self.contract_address
// );
Ok(())
}

fn update_subscribe(&mut self, use_try: bool) -> Next {
let cmd = if use_try {
match self.rx_cmd.try_recv() {
Expand Down Expand Up @@ -302,15 +282,13 @@ impl AxonEventMonitor {
ContractEvents::RegisterCellEmitterFilterFilter(RegisterCellEmitterFilterFilter {
filter,
}) => {
self.cell_process_manager
.spawn_cell_processor(filter.into())?;
self.ckb_sync_manager.spawn_cell_processor(filter.into())?;
Ok(true)
}
ContractEvents::RemoveCellEmitterFilterFilter(RemoveCellEmitterFilterFilter {
filter,
}) => {
self.cell_process_manager
.remove_cell_processor(filter.into());
self.ckb_sync_manager.remove_cell_processor(filter.into());
Ok(true)
}
_ => Ok(false),
Expand Down
3 changes: 2 additions & 1 deletion crates/relayer/src/config/axon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub struct AxonChainConfig {
pub key_name: String,
pub store_prefix: String,
pub emitter_ckb_url: Url,
pub emitter_scan_start_block_number: u64,
pub emitter_cell_start_block_number: u64,
pub emitter_header_start_block_number: u64,

#[serde(default)]
pub packet_filter: PacketFilter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ restore_block_count = 10000
key_name = "relayer_axon_wallet"
store_prefix = "forcerelay"
emitter_ckb_url = "https://testnet.ckbapp.dev"
emitter_scan_start_block_number = 1000000
emitter_cell_start_block_number = 1000000
emitter_header_start_block_number = 1000000

[[chains]]
id = "ckb4ibc-0"
Expand Down
3 changes: 2 additions & 1 deletion tools/test-framework/src/types/single/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ impl FullNode {
transfer_contract_address,
restore_block_count,
emitter_ckb_url: Url::from_str("http://127.0.0.1").unwrap(),
emitter_scan_start_block_number: 0, // means close cell_emitter
emitter_cell_start_block_number: 0, // 0 means to close cell sync feature
emitter_header_start_block_number: 0, // 0 means to close header sync feature
};
Ok(config::ChainConfig::Axon(axon_config))
}
Expand Down

0 comments on commit 54cb83e

Please sign in to comment.