From 15cb6e9ecd5737050c28e3f777f9f94e2fa4eb21 Mon Sep 17 00:00:00 2001 From: Nasr Date: Wed, 15 Jan 2025 17:06:07 +0700 Subject: [PATCH] feat(torii-indexer): task manager & task id for every event --- crates/torii/indexer/src/engine.rs | 10 +-- crates/torii/indexer/src/lib.rs | 1 + .../src/processors/erc20_legacy_transfer.rs | 22 ++++++ .../indexer/src/processors/erc20_transfer.rs | 22 ++++++ .../src/processors/erc721_legacy_transfer.rs | 30 +++++++++ .../indexer/src/processors/erc721_transfer.rs | 29 ++++++++ .../indexer/src/processors/event_message.rs | 9 +++ .../indexer/src/processors/metadata_update.rs | 9 +++ crates/torii/indexer/src/processors/mod.rs | 5 ++ .../torii/indexer/src/processors/raw_event.rs | 9 +++ .../indexer/src/processors/register_event.rs | 12 ++++ .../indexer/src/processors/register_model.rs | 12 ++++ .../src/processors/store_del_record.rs | 15 +++++ .../src/processors/store_set_record.rs | 13 ++++ .../src/processors/store_update_member.rs | 12 ++++ .../src/processors/store_update_record.rs | 13 ++++ .../indexer/src/processors/upgrade_event.rs | 12 ++++ .../indexer/src/processors/upgrade_model.rs | 12 ++++ crates/torii/indexer/src/task_manager.rs | 67 +++++++++++++++++++ 19 files changed, 305 insertions(+), 9 deletions(-) create mode 100644 crates/torii/indexer/src/task_manager.rs diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 1210c5ec9d..2c5286c68a 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -199,14 +199,6 @@ pub struct FetchPendingResult { pub block_number: u64, } -#[derive(Debug)] -pub struct ParallelizedEvent { - pub block_number: u64, - pub block_timestamp: u64, - pub event_id: String, - pub event: Event, -} - #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -216,7 +208,7 @@ pub struct Engine { config: EngineConfig, shutdown_tx: Sender<()>, block_tx: Option>, - tasks: HashMap>, + task_manager: TaskManager

, contracts: Arc>, } diff --git a/crates/torii/indexer/src/lib.rs b/crates/torii/indexer/src/lib.rs index 7191c5480f..e766b08dc0 100644 --- a/crates/torii/indexer/src/lib.rs +++ b/crates/torii/indexer/src/lib.rs @@ -4,6 +4,7 @@ mod constants; #[path = "test.rs"] mod test; +mod task_manager; pub mod engine; pub mod processors; diff --git a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs index 3b207d466b..9f01406089 100644 --- a/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_legacy_transfer.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer"; @@ -34,6 +38,24 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.data[0], event.data[1]); + canonical_pair.hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/erc20_transfer.rs b/crates/torii/indexer/src/processors/erc20_transfer.rs index a0643abd41..f4d8426dbe 100644 --- a/crates/torii/indexer/src/processors/erc20_transfer.rs +++ b/crates/torii/indexer/src/processors/erc20_transfer.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer"; @@ -34,6 +38,24 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); + canonical_pair.hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs index df6b2a88de..f329fbbaa4 100644 --- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer"; @@ -34,6 +38,32 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.data[0], event.data[1]); + canonical_pair.hash(&mut hasher); + + // For ERC721, we can safely parallelize by token ID since each token is unique + // and can only be owned by one address at a time. This means: + // 1. Transfers of different tokens can happen in parallel + // 2. Multiple transfers of the same token must be sequential + // 3. The canonical address pair ensures related transfers stay together + event.data[2].hash(&mut hasher); + event.data[3].hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs index faf124360b..60a68df784 100644 --- a/crates/torii/indexer/src/processors/erc721_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_transfer.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; @@ -7,6 +8,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::debug; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer"; @@ -34,6 +37,32 @@ where false } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + // Hash the event key (Transfer) + event.keys[0].hash(&mut hasher); + + // Take the max of from/to addresses to get a canonical representation + // This ensures transfers between the same pair of addresses are grouped together + // regardless of direction (A->B or B->A) + let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); + canonical_pair.hash(&mut hasher); + + // For ERC721, we can safely parallelize by token ID since each token is unique + // and can only be owned by one address at a time. This means: + // 1. Transfers of different tokens can happen in parallel + // 2. Multiple transfers of the same token must be sequential + // 3. The canonical address pair ensures related transfers stay together + event.keys[3].hash(&mut hasher); + event.keys[4].hash(&mut hasher); + + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/event_message.rs b/crates/torii/indexer/src/processors/event_message.rs index 4495665bed..97954edd18 100644 --- a/crates/torii/indexer/src/processors/event_message.rs +++ b/crates/torii/indexer/src/processors/event_message.rs @@ -28,6 +28,15 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, _event: &Event) -> u64 { + // TODO. for now event messages are not parallelized + 0 + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs index ec2c4b493d..017ad8b451 100644 --- a/crates/torii/indexer/src/processors/metadata_update.rs +++ b/crates/torii/indexer/src/processors/metadata_update.rs @@ -34,6 +34,15 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, _event: &Event) -> u64 { + // TODO. for now metadata updates are not parallelized + 0 + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/mod.rs b/crates/torii/indexer/src/processors/mod.rs index abe358b6e7..3044577bd0 100644 --- a/crates/torii/indexer/src/processors/mod.rs +++ b/crates/torii/indexer/src/processors/mod.rs @@ -7,6 +7,8 @@ use starknet::core::types::{Event, Felt, Transaction}; use starknet::providers::Provider; use torii_sqlite::Sql; +use crate::task_manager::TaskId; + pub mod erc20_legacy_transfer; pub mod erc20_transfer; pub mod erc721_legacy_transfer; @@ -53,6 +55,9 @@ where fn validate(&self, event: &Event) -> bool; + fn task_priority(&self) -> usize; + fn task_identifier(&self, event: &Event) -> TaskId; + #[allow(clippy::too_many_arguments)] async fn process( &self, diff --git a/crates/torii/indexer/src/processors/raw_event.rs b/crates/torii/indexer/src/processors/raw_event.rs index c30a918fa2..262d76cce8 100644 --- a/crates/torii/indexer/src/processors/raw_event.rs +++ b/crates/torii/indexer/src/processors/raw_event.rs @@ -23,6 +23,15 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, _event: &Event) -> u64 { + // TODO. for now raw events are not parallelized + 0 + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/register_event.rs b/crates/torii/indexer/src/processors/register_event.rs index e9c94f296a..f109f2ddd4 100644 --- a/crates/torii/indexer/src/processors/register_event.rs +++ b/crates/torii/indexer/src/processors/register_event.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +32,16 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/register_model.rs b/crates/torii/indexer/src/processors/register_model.rs index d630feff88..cae4772ff1 100644 --- a/crates/torii/indexer/src/processors/register_model.rs +++ b/crates/torii/indexer/src/processors/register_model.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Ok, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +32,16 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys.iter().for_each(|k| k.hash(&mut hasher)); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_del_record.rs b/crates/torii/indexer/src/processors/store_del_record.rs index e109f069d7..dbb72eb1a0 100644 --- a/crates/torii/indexer/src/processors/store_del_record.rs +++ b/crates/torii/indexer/src/processors/store_del_record.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -7,6 +9,8 @@ use starknet::providers::Provider; use torii_sqlite::Sql; use tracing::{debug, info}; +use crate::task_manager::TaskId; + use super::{EventProcessor, EventProcessorConfig}; pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record"; @@ -27,6 +31,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> TaskId { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_set_record.rs b/crates/torii/indexer/src/processors/store_set_record.rs index c564cb3968..cb3fd04a49 100644 --- a/crates/torii/indexer/src/processors/store_set_record.rs +++ b/crates/torii/indexer/src/processors/store_set_record.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -28,6 +30,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_update_member.rs b/crates/torii/indexer/src/processors/store_update_member.rs index 776d61d9cb..64ed9778c8 100644 --- a/crates/torii/indexer/src/processors/store_update_member.rs +++ b/crates/torii/indexer/src/processors/store_update_member.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::{Context, Error, Result}; use async_trait::async_trait; use dojo_types::schema::{Struct, Ty}; @@ -29,6 +30,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/store_update_record.rs b/crates/torii/indexer/src/processors/store_update_record.rs index b92344849d..624ea53f74 100644 --- a/crates/torii/indexer/src/processors/store_update_record.rs +++ b/crates/torii/indexer/src/processors/store_update_record.rs @@ -1,3 +1,5 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_types::schema::Ty; @@ -28,6 +30,17 @@ where true } + fn task_priority(&self) -> usize { + 1 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + event.keys[1].hash(&mut hasher); + event.keys[2].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, _world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/upgrade_event.rs b/crates/torii/indexer/src/processors/upgrade_event.rs index ba7966d63b..2840530faa 100644 --- a/crates/torii/indexer/src/processors/upgrade_event.rs +++ b/crates/torii/indexer/src/processors/upgrade_event.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +31,17 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + // event selector + event.keys[1].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/processors/upgrade_model.rs b/crates/torii/indexer/src/processors/upgrade_model.rs index 40717df30d..2bedeee8a5 100644 --- a/crates/torii/indexer/src/processors/upgrade_model.rs +++ b/crates/torii/indexer/src/processors/upgrade_model.rs @@ -1,3 +1,4 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; use anyhow::{Error, Result}; use async_trait::async_trait; use dojo_world::contracts::abigen::world::Event as WorldEvent; @@ -30,6 +31,17 @@ where true } + fn task_priority(&self) -> usize { + 0 + } + + fn task_identifier(&self, event: &Event) -> u64 { + let mut hasher = DefaultHasher::new(); + // model selector + event.keys[1].hash(&mut hasher); + hasher.finish() + } + async fn process( &self, world: &WorldContractReader

, diff --git a/crates/torii/indexer/src/task_manager.rs b/crates/torii/indexer/src/task_manager.rs new file mode 100644 index 0000000000..1dbc1b7690 --- /dev/null +++ b/crates/torii/indexer/src/task_manager.rs @@ -0,0 +1,67 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +use starknet::{core::types::Event, providers::Provider}; +use torii_sqlite::types::ContractType; + +use crate::engine::Processors; + +pub type TaskId = u64; +type TaskPriority = usize; + +#[derive(Debug)] +pub struct ParallelizedEvent { + pub contract_type: ContractType, + pub block_number: u64, + pub block_timestamp: u64, + pub event_id: String, + pub event: Event, +} + +pub struct TaskManager { + tasks: BTreeMap>>, + processors: Arc>, +} + +impl TaskManager

{ + pub fn new(processors: Arc>) -> Self { + Self { tasks: BTreeMap::new(), processors } + } + + pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId { + let event_key = parallelized_event.event.keys[0]; + let processor = self + .processors + .get_event_processor(parallelized_event.contract_type) + .get(&event_key) + .unwrap() + .iter() + .find(|p| p.validate(&parallelized_event.event)) + .unwrap(); + let priority = processor.task_priority(); + let task_id = processor.task_identifier(&parallelized_event.event); + + if task_id != 0 { + self.tasks + .entry(priority) + .or_default() + .entry(task_id) + .or_default() + .push(parallelized_event); + } + + task_id + } + + pub fn take_tasks( + &mut self, + ) -> BTreeMap>> { + std::mem::take(&mut self.tasks) + } + + pub fn is_empty(&self) -> bool { + self.tasks.is_empty() + } +}