Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(torii/core): handle an edge case with pending block processing #2597

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::{join_all, try_join_all};
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, MaybePendingBlockWithReceipts,
MaybePendingBlockWithTxHashes, PendingBlockWithReceipts, Transaction, TransactionReceipt,
TransactionWithReceipt,
BlockHashAndNumber, BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage,
MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, PendingBlockWithReceipts,
Transaction, TransactionReceipt, TransactionWithReceipt,
};
use starknet::core::utils::get_selector_from_name;
use starknet::providers::Provider;
Expand Down Expand Up @@ -309,23 +309,23 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

// TODO: since we now process blocks in chunks we can parallelize the fetching of data
pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
let latest_block_number = self.provider.block_hash_and_number().await?.block_number;
let latest_block = self.provider.block_hash_and_number().await?;

let from = cursors.head.unwrap_or(0);
let total_remaining_blocks = latest_block_number - from;
let total_remaining_blocks = latest_block.block_number - from;
let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
let to = from + blocks_to_process;

let instant = Instant::now();
let result = if from < latest_block_number {
let result = if from < latest_block.block_number {
let from = if from == 0 { from } else { from + 1 };
let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
FetchDataResult::Range(data)
} else if self.config.index_pending {
let data =
self.fetch_pending(latest_block_number + 1, cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block_number, "Fetched pending data.");
self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
if let Some(data) = data {
FetchDataResult::Pending(data)
} else {
Expand Down Expand Up @@ -453,12 +453,18 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

async fn fetch_pending(
&self,
block_number: u64,
block: BlockHashAndNumber,
last_pending_block_tx: Option<Felt>,
) -> Result<Option<FetchPendingResult>> {
let block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
let pending_block = if let MaybePendingBlockWithReceipts::PendingBlock(pending) =
self.provider.get_block_with_receipts(BlockId::Tag(BlockTag::Pending)).await?
{
// if the parent hash is not the hash of the latest block that we fetched, then it means
// a new block got mined just after we fetched the latest block information
if block.block_hash != pending.parent_hash {
return Ok(None);
}

pending
} else {
// TODO: change this to unreachable once katana is updated to return PendingBlockWithTxs
Expand All @@ -468,8 +474,8 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
};

Ok(Some(FetchPendingResult {
pending_block: Box::new(block),
block_number,
pending_block: Box::new(pending_block),
block_number: block.block_number + 1,
last_pending_block_tx,
}))
}
Expand Down
Loading