diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 1210c5ec9d..7619c0e379 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -207,6 +207,12 @@ pub struct ParallelizedEvent { pub event: Event, } +#[derive(Debug)] +struct ContractRange { + from: u64, + to: u64, +} + #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -321,116 +327,132 @@ impl Engine

{
         }
     }
 
-    // TODO: since we now process blocks in chunks we can parallelize the fetching of data
-    pub async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
+    async fn fetch_data(&mut self, cursors: &Cursors) -> Result<FetchDataResult> {
         let latest_block = self.provider.block_hash_and_number().await?;
 
-        let from = cursors.head.unwrap_or(self.config.world_block);
-        let total_remaining_blocks = latest_block.block_number - from;
-        let blocks_to_process = total_remaining_blocks.min(self.config.blocks_chunk_size);
-        let to = from + blocks_to_process;
-
-        let instant = Instant::now();
-        let result = if from < latest_block.block_number {
-            let from = if from == 0 { from } else { from + 1 };
-            let data = self.fetch_range(from, to, &cursors.cursor_map).await?;
-            debug!(target: LOG_TARGET, duration = ?instant.elapsed(), from = %from, to = %to, "Fetched data for range.");
-            FetchDataResult::Range(data)
+        // Not caught up with the latest block yet: index mined blocks before pending ones
+        if cursors.head.unwrap_or(self.config.world_block) < latest_block.block_number {
+            let instant = Instant::now();
+
+            // Per-contract block ranges; contracts without events in the window are left out
+            let contract_ranges =
+                self.get_contract_ranges(cursors, latest_block.block_number).await?;
+
+            if contract_ranges.is_empty() {
+                return Ok(FetchDataResult::None);
+            }
+
+            let data = self.fetch_range(contract_ranges).await?;
+
+            debug!(
+                target: LOG_TARGET,
+                duration = ?instant.elapsed(),
+                "Fetched data for custom ranges."
+            );
+
+            Ok(FetchDataResult::Range(data))
         } else if self.config.flags.contains(IndexingFlags::PENDING_BLOCKS) {
             let data =
                 self.fetch_pending(latest_block.clone(), cursors.last_pending_block_tx).await?;
-            debug!(target: LOG_TARGET, duration = ?instant.elapsed(), latest_block_number = %latest_block.block_number, "Fetched pending data.");
-            if let Some(data) = data {
+
+            Ok(if let Some(data) = data {
                 FetchDataResult::Pending(data)
             } else {
                 FetchDataResult::None
-            }
+            })
         } else {
-            FetchDataResult::None
-        };
+            Ok(FetchDataResult::None)
+        }
+    }
+
+    async fn get_contract_ranges(
+        &self,
+        cursors: &Cursors,
+        latest_block: u64,
+    ) -> Result<HashMap<Felt, ContractRange>> {
+        let mut ranges = HashMap::new();
+        let base_from = cursors.head.unwrap_or(self.config.world_block);
+
+        for contract_address in self.contracts.keys() {
+            // Probe with a single-event query to find where this contract's events begin
+            let first_event_block =
+                self.get_first_event_block(base_from, latest_block, *contract_address).await?;
+
+            if let Some(first_block) = first_event_block {
+                let from = if first_block == 0 { first_block } else { first_block + 1 };
+                let total_remaining = latest_block - from;
+                let blocks_to_process = total_remaining.min(self.config.blocks_chunk_size);
+                let to = from + blocks_to_process;
+
+                ranges.insert(*contract_address, ContractRange { from, to });
+            }
+        }
 
-        Ok(result)
+        Ok(ranges)
     }
 
-    pub async fn fetch_range(
-        &mut self,
+    async fn get_first_event_block(
+        &self,
         from: u64,
         to: u64,
-        cursor_map: &HashMap<Felt, Felt>,
-    ) -> Result<FetchRangeResult> {
-        // Process all blocks from current to latest.
-        let mut fetch_all_events_tasks = VecDeque::new();
-
-        for contract in self.contracts.iter() {
-            let events_filter = EventFilter {
-                from_block: Some(BlockId::Number(from)),
-                to_block: Some(BlockId::Number(to)),
-                address: Some(*contract.0),
-                keys: None,
-            };
-            let token_events_pages =
-                get_all_events(&self.provider, events_filter, self.config.events_chunk_size);
-
-            // Prefer processing world events first
-            match contract.1 {
-                ContractType::WORLD => fetch_all_events_tasks.push_front(token_events_pages),
-                _ => fetch_all_events_tasks.push_back(token_events_pages),
-            }
-        }
+        contract_address: Felt,
+    ) -> Result<Option<u64>> {
+        let events_filter = EventFilter {
+            from_block: Some(BlockId::Number(from)),
+            to_block: Some(BlockId::Number(to)),
+            address: Some(contract_address),
+            keys: None,
+        };
 
-        let task_result = join_all(fetch_all_events_tasks).await;
+        // One event is enough; `saturating_sub` avoids the u64 underflow when it sits in block 0
+        let events_page = self.provider.get_events(events_filter, None, 1).await?;
 
-        let mut events = vec![];
+        Ok(events_page.events.first().and_then(|e| e.block_number.map(|b| b.saturating_sub(1))))
+    }
 
-        for result in task_result {
-            let result = result?;
-            let contract_address =
-                result.0.expect("EventFilters that we use always have an address");
-            let events_pages = result.1;
-            let last_contract_tx = cursor_map.get(&contract_address).cloned();
-            let mut last_contract_tx_tmp = last_contract_tx;
-
-            debug!(target: LOG_TARGET, "Total events pages fetched for contract ({:#x}): {}", &contract_address, &events_pages.len());
-
-            for events_page in events_pages {
-                debug!("Processing events page with events: {}", &events_page.events.len());
-                for event in events_page.events {
-                    // Then we skip all transactions until we reach the last pending processed
-                    // transaction (if any)
-                    if let Some(last_contract_tx) = last_contract_tx_tmp {
-                        if event.transaction_hash != last_contract_tx {
-                            continue;
-                        }
+    async fn fetch_range(
+        &mut self,
+        contract_ranges: HashMap<Felt, ContractRange>,
+    ) -> Result<FetchRangeResult> {
+        let mut fetch_all_events_tasks = VecDeque::new();
 
-                        last_contract_tx_tmp = None;
-                    }
+        for (contract_address, range) in contract_ranges.iter() {
+            if let Some(contract_type) = self.contracts.get(contract_address) {
+                let events_filter = EventFilter {
+                    from_block: Some(BlockId::Number(range.from)),
+                    to_block: Some(BlockId::Number(range.to)),
+                    address: Some(*contract_address),
+                    keys: None,
+                };
 
-                    // Skip the latest pending block transaction events
-                    // * as we might have multiple events for the same transaction
-                    if let Some(last_contract_tx) = last_contract_tx {
-                        if event.transaction_hash == last_contract_tx {
-                            continue;
-                        }
-                    }
+                let token_events_pages =
+                    get_all_events(&self.provider, events_filter, self.config.events_chunk_size);
 
-                    events.push(event);
+                // Prioritize world events
+                match contract_type {
+                    ContractType::WORLD => fetch_all_events_tasks.push_front(token_events_pages),
+                    _ => fetch_all_events_tasks.push_back(token_events_pages),
                 }
             }
         }
 
-        // Transactions & blocks to process
-        let mut blocks = BTreeMap::new();
+        let task_results = join_all(fetch_all_events_tasks).await;
 
-        // Flatten events pages and events according to the pending block cursor
-        // to array of (block_number, transaction_hash)
+        // Process results and build return structures
+        let mut events = vec![];
+        let mut blocks = BTreeMap::new();
         let mut transactions = LinkedHashMap::new();
 
+        for result in task_results {
+            let (_, events_pages) = result?;
+            events.extend(events_pages.into_iter().flat_map(|e| e.events));
+        }
+
+        // Process events into transactions and blocks
         let mut block_set = HashSet::new();
         for event in events {
-            let block_number = match event.block_number {
-                Some(block_number) => block_number,
-                None => unreachable!("In fetch range all events should have block number"),
-            };
+            let block_number =
+                event.block_number.expect("In fetch range all events should have block number");
 
             block_set.insert(block_number);
 
@@ -440,6 +462,7 @@ impl Engine

{ .push(event); } + // Fetch block timestamps concurrently let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); let mut set: JoinSet> = JoinSet::new(); @@ -448,7 +471,6 @@ impl Engine

{ let provider = self.provider.clone(); set.spawn(async move { let _permit = semaphore.acquire().await.unwrap(); - debug!("Fetching block timestamp for block number: {}", block_number); let block_timestamp = get_block_timestamp(&provider, block_number).await?; Ok((block_number, block_timestamp)) }); @@ -459,10 +481,9 @@ impl Engine

{ blocks.insert(block_number, block_timestamp); } - debug!("Transactions: {}", &transactions.len()); - debug!("Blocks: {}", &blocks.len()); + let latest_block_number = contract_ranges.values().map(|r| r.to).max().unwrap_or(0); - Ok(FetchRangeResult { transactions, blocks, latest_block_number: to }) + Ok(FetchRangeResult { transactions, blocks, latest_block_number }) } async fn fetch_pending( @@ -494,7 +515,7 @@ impl Engine

{ })) } - pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> { + async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> { match fetch_result { FetchDataResult::Range(data) => self.process_range(data).await?, FetchDataResult::Pending(data) => self.process_pending(data).await?, @@ -504,7 +525,7 @@ impl Engine

{ Ok(()) } - pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> { + async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> { // Skip transactions that have been processed already // Our cursor is the last processed transaction @@ -550,10 +571,11 @@ impl Engine

{ Ok(()) } - pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { + async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { // Process all transactions let mut processed_blocks = HashSet::new(); let mut cursor_map = HashMap::new(); + for ((block_number, transaction_hash), events) in data.transactions { debug!("Processing transaction hash: {:#x}", transaction_hash); // Process transaction