Skip to content

Commit

Permalink
Use block number as cursor
Browse files Browse the repository at this point in the history
Using the RPC cursor param for polling doesn't seem to be supported.
  • Loading branch information
blckngm committed Aug 28, 2023
1 parent 848f74f commit 50fa836
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
19 changes: 15 additions & 4 deletions forcerelay-ckb-sdk-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ use serde_with::{serde_as, TryFromIntoRef};
#[rpc]
#[async_trait]
pub trait Rpc {
async fn search_packet_cells(&self, config: Config, limit: u32) -> Result<Vec<PacketCell>>;
async fn search_packet_cells(
&self,
config: Config,
limit: u32,
mut first_block_to_search: u64,
) -> Result<(Vec<PacketCell>, u64)>;
async fn get_latest_channel_cell(&self, config: Config) -> Result<IbcChannelCell>;

async fn get_axon_metadata_cell_dep(&self, config: Config) -> Result<CellDep>;
Expand Down Expand Up @@ -115,10 +120,16 @@ impl RpcImpl {

#[async_trait]
impl Rpc for RpcImpl {
async fn search_packet_cells(&self, config: Config, limit: u32) -> Result<Vec<PacketCell>> {
PacketCell::search(&self.client, &config, limit)
async fn search_packet_cells(
&self,
config: Config,
limit: u32,
mut first_block_to_search: u64,
) -> Result<(Vec<PacketCell>, u64)> {
let cells = PacketCell::search(&self.client, &config, limit, &mut first_block_to_search)
.await
.map_err(internal_error)
.map_err(internal_error)?;
Ok((cells, first_block_to_search))
}
async fn get_latest_channel_cell(&self, config: Config) -> Result<IbcChannelCell> {
IbcChannelCell::get_latest(&self.client, &config)
Expand Down
24 changes: 16 additions & 8 deletions src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ckb_ics_axon::{
message::{Envelope, MsgType},
PacketArgs,
};
use ckb_jsonrpc_types::{CellOutput, JsonBytes, OutPoint, Script, TransactionView};
use ckb_jsonrpc_types::{CellOutput, OutPoint, Script, TransactionView};
use ckb_sdk::rpc::ckb_indexer;
use ckb_types::{
packed,
Expand All @@ -32,16 +32,21 @@ pub struct PacketCell {

impl PacketCell {
/// Search for and parse live packet cells.
pub async fn search(client: &CkbRpcClient, config: &Config, limit: u32) -> Result<Vec<Self>> {
search_packet_cells(client, config, limit, &mut None).await
pub async fn search(
client: &CkbRpcClient,
config: &Config,
limit: u32,
first_block_to_search: &mut u64,
) -> Result<Vec<Self>> {
search_packet_cells(client, config, limit, first_block_to_search).await
}

pub fn subscribe(
client: CkbRpcClient,
config: Config,
) -> impl Stream<Item = Result<Vec<Self>>> {
async_stream::try_stream! {
let mut cursor = None;
let mut cursor = 0;
loop {
let cells = search_packet_cells(&client, &config, 64, &mut cursor).await?;
if !cells.is_empty() {
Expand Down Expand Up @@ -87,20 +92,23 @@ async fn search_packet_cells(
client: &CkbRpcClient,
config: &Config,
limit: u32,
cursor: &mut Option<JsonBytes>,
first_block_to_search: &mut u64,
) -> Result<Vec<PacketCell>> {
ensure!(limit > 0);
let tip = client
.get_indexer_tip()
.await?
.map_or(0, |t| t.block_number.into());
let last_block_to_search = tip.saturating_sub(config.confirmations.into());
if *first_block_to_search > last_block_to_search {
return Ok(vec![]);
}
let cells = client
.get_cells(
ckb_indexer::SearchKey {
filter: Some(ckb_indexer::SearchKeyFilter {
block_range: Some([
0.into(),
(*first_block_to_search).into(),
// +1 because this is exclusive.
last_block_to_search.saturating_add(1).into(),
]),
Expand All @@ -114,7 +122,7 @@ async fn search_packet_cells(
},
ckb_indexer::Order::Asc,
limit.into(),
cursor.clone(),
None,
)
.await?;

Expand All @@ -134,7 +142,7 @@ async fn search_packet_cells(
};
result.push(p);
}
*cursor = Some(cells.last_cursor);
*first_block_to_search = last_block_to_search.saturating_add(1);
Ok(result)
}

Expand Down

0 comments on commit 50fa836

Please sign in to comment.