Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-indexer): task manager & parallelize erc transfers #2913

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions crates/torii/indexer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,6 @@ pub struct FetchPendingResult {
pub block_number: u64,
}

#[derive(Debug)]
pub struct ParallelizedEvent {
pub block_number: u64,
pub block_timestamp: u64,
pub event_id: String,
pub event: Event,
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
Expand All @@ -216,7 +208,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
config: EngineConfig,
shutdown_tx: Sender<()>,
block_tx: Option<BoundedSender<u64>>,
tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
task_manager: TaskManager<P>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Initialize task_manager field in Engine

Ohayo, sensei! On line 211, the Engine struct has a new field task_manager but it's not initialized in the constructor. Make sure to initialize task_manager in the Engine::new method to prevent a runtime error.

Apply this diff to initialize task_manager:

          block_tx,
          contracts,
-         tasks: HashMap::new(),
+         task_manager: TaskManager::new(processors.clone()),
      }

Committable suggestion skipped: line range outside the PR's diff.

contracts: Arc<HashMap<Felt, ContractType>>,
}

Expand Down
1 change: 1 addition & 0 deletions crates/torii/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod constants;
#[path = "test.rs"]
mod test;

mod task_manager;
pub mod engine;
pub mod processors;

Expand Down
22 changes: 22 additions & 0 deletions crates/torii/indexer/src/processors/erc20_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_legacy_transfer";
Expand Down Expand Up @@ -34,6 +38,24 @@ where
false
}

// Priority 1: runs after priority-0 tasks (e.g. model/event registrations)
// within a batch of parallelized events.
fn task_priority(&self) -> usize {
1
}

// Derives a stable task id so transfers touching the same address pair are
// serialized, while unrelated transfers can be processed in parallel.
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
// NOTE(review): assumes the legacy ERC20 Transfer layout data = [from, to, value, ...]
// — confirm validate() guarantees at least two data felts, or this indexing panics.
// NOTE(review): max() keeps only one address of the pair, so transfers A->B and
// A->C (with B, C > A) get DIFFERENT task ids even though both update A's
// balance — verify downstream balance updates tolerate running in parallel.
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
Expand Down
22 changes: 22 additions & 0 deletions crates/torii/indexer/src/processors/erc20_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc20_transfer";
Expand Down Expand Up @@ -34,6 +38,24 @@ where
false
}

// Priority 1: runs after priority-0 tasks (e.g. model/event registrations)
// within a batch of parallelized events.
fn task_priority(&self) -> usize {
1
}

// Derives a stable task id so transfers touching the same address pair are
// serialized, while unrelated transfers can be processed in parallel.
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
// NOTE(review): assumes the non-legacy ERC20 Transfer layout keys = [selector, from, to]
// — confirm validate() guarantees at least three keys, or this indexing panics.
// NOTE(review): max() keeps only one address of the pair, so transfers A->B and
// A->C (with B, C > A) get DIFFERENT task ids even though both update A's
// balance — verify downstream balance updates tolerate running in parallel.
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
Expand Down
30 changes: 30 additions & 0 deletions crates/torii/indexer/src/processors/erc721_legacy_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_legacy_transfer";
Expand Down Expand Up @@ -34,6 +38,32 @@ where
false
}

// Priority 1: runs after priority-0 tasks (e.g. model/event registrations)
// within a batch of parallelized events.
fn task_priority(&self) -> usize {
1
}

// Derives a task id from the canonical address pair plus the token id.
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.data[0], event.data[1]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
// NOTE(review): claim 2 does not hold as written — because the address pair
// above is also folded into the id, same-token transfers A->B then B->C
// produce DIFFERENT task ids and may run in parallel, racing on the token's
// ownership. Confirm whether the pair should be dropped (serialize per token)
// or whether downstream handling makes this safe.
// NOTE(review): assumes legacy layout data = [from, to, token_id.low, token_id.high]
// — confirm validate() guarantees four data felts.
event.data[2].hash(&mut hasher);
event.data[3].hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
29 changes: 29 additions & 0 deletions crates/torii/indexer/src/processors/erc721_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use anyhow::Error;
use async_trait::async_trait;
use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome};
Expand All @@ -7,6 +8,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::debug;

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc721_transfer";
Expand Down Expand Up @@ -34,6 +37,32 @@ where
false
}

// Priority 1: runs after priority-0 tasks (e.g. model/event registrations)
// within a batch of parallelized events.
fn task_priority(&self) -> usize {
1
}

// Derives a task id from the canonical address pair plus the token id.
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// Hash the event key (Transfer)
event.keys[0].hash(&mut hasher);

// Take the max of from/to addresses to get a canonical representation
// This ensures transfers between the same pair of addresses are grouped together
// regardless of direction (A->B or B->A)
let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
canonical_pair.hash(&mut hasher);

// For ERC721, we can safely parallelize by token ID since each token is unique
// and can only be owned by one address at a time. This means:
// 1. Transfers of different tokens can happen in parallel
// 2. Multiple transfers of the same token must be sequential
// 3. The canonical address pair ensures related transfers stay together
// NOTE(review): claim 2 does not hold as written — because the address pair
// above is also folded into the id, same-token transfers A->B then B->C
// produce DIFFERENT task ids and may run in parallel, racing on the token's
// ownership. Confirm whether the pair should be dropped (serialize per token)
// or whether downstream handling makes this safe.
// NOTE(review): assumes layout keys = [selector, from, to, token_id.low, token_id.high]
// — confirm validate() guarantees five keys.
event.keys[3].hash(&mut hasher);
event.keys[4].hash(&mut hasher);

hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
9 changes: 9 additions & 0 deletions crates/torii/indexer/src/processors/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ where
true
}

fn task_priority(&self) -> usize {
1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't an enum more idiomatic, or you need logic around this priority, or a priority value seems more manageable?

}

// All event messages share task id 0, so the task manager processes them
// strictly sequentially (a single serialization bucket).
fn task_identifier(&self, _event: &Event) -> u64 {
// TODO. for now event messages are not parallelized
0
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
9 changes: 9 additions & 0 deletions crates/torii/indexer/src/processors/metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ where
true
}

// Priority 1: same tier as the transfer/record processors in this PR.
fn task_priority(&self) -> usize {
1
}

// All metadata updates share task id 0, so they are processed strictly
// sequentially. A per-resource hash could parallelize them later.
fn task_identifier(&self, _event: &Event) -> u64 {
// TODO. for now metadata updates are not parallelized
0
}
Comment on lines +37 to +44
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo! Consider implementing proper task identification, sensei.

The current implementation returns a hardcoded 0 for all task identifiers, which effectively serializes all metadata updates. This might become a bottleneck as the system scales.

Consider implementing a proper task identification strategy based on:

  • The resource being updated (event.resource)
  • The URI being set
  • Or a combination of both for better parallelization
 fn task_identifier(&self, event: &Event) -> u64 {
-    // TODO. for now metadata updates are not parallelized
-    0
+    let mut hasher = DefaultHasher::new();
+    event.resource.hash(&mut hasher);
+    hasher.finish()
 }

Note: the parameter must be renamed from `_event` to `event` (and `DefaultHasher`/`Hash`/`Hasher` imported) for this suggestion to compile.

Committable suggestion skipped: line range outside the PR's diff.


async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
5 changes: 5 additions & 0 deletions crates/torii/indexer/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use starknet::core::types::{Event, Felt, Transaction};
use starknet::providers::Provider;
use torii_sqlite::Sql;

use crate::task_manager::TaskId;

pub mod erc20_legacy_transfer;
pub mod erc20_transfer;
pub mod erc721_legacy_transfer;
Expand Down Expand Up @@ -53,6 +55,9 @@ where

fn validate(&self, event: &Event) -> bool;

fn task_priority(&self) -> usize;
fn task_identifier(&self, event: &Event) -> TaskId;

#[allow(clippy::too_many_arguments)]
async fn process(
&self,
Expand Down
9 changes: 9 additions & 0 deletions crates/torii/indexer/src/processors/raw_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ where
true
}

// Priority 1: same tier as the transfer/record processors in this PR.
fn task_priority(&self) -> usize {
1
}

// All raw events share task id 0, so they are processed strictly
// sequentially (a single serialization bucket).
fn task_identifier(&self, _event: &Event) -> u64 {
// TODO. for now raw events are not parallelized
0
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
12 changes: 12 additions & 0 deletions crates/torii/indexer/src/processors/register_event.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
Expand Down Expand Up @@ -30,6 +32,16 @@ where
true
}

/// Registrations run at priority 0 so they are handled before the
/// priority-1 processors that may depend on the registered event type.
fn task_priority(&self) -> usize {
    0
}

/// Folds every event key, in order, into a `DefaultHasher` to produce a
/// stable task id for this registration event.
fn task_identifier(&self, event: &Event) -> u64 {
    let mut state = DefaultHasher::new();
    for key in &event.keys {
        key.hash(&mut state);
    }
    state.finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
Expand Down
12 changes: 12 additions & 0 deletions crates/torii/indexer/src/processors/register_model.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
Expand Down Expand Up @@ -30,6 +32,16 @@ where
true
}

/// Model registrations run at priority 0, ahead of the priority-1
/// processors that may reference the registered model.
fn task_priority(&self) -> usize {
    0
}

/// Hashes all event keys in order into a stable task id.
fn task_identifier(&self, event: &Event) -> u64 {
    event
        .keys
        .iter()
        .fold(DefaultHasher::new(), |mut state, key| {
            key.hash(&mut state);
            state
        })
        .finish()
}

async fn process(
&self,
world: &WorldContractReader<P>,
Expand Down
15 changes: 15 additions & 0 deletions crates/torii/indexer/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
Expand All @@ -7,6 +9,8 @@ use starknet::providers::Provider;
use torii_sqlite::Sql;
use tracing::{debug, info};

use crate::task_manager::TaskId;

use super::{EventProcessor, EventProcessorConfig};

pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::store_del_record";
Expand All @@ -27,6 +31,17 @@ where
true
}

// Priority 1: record deletions run after priority-0 registration tasks.
fn task_priority(&self) -> usize {
1
}

// Task id from keys[1] and keys[2] — presumably the model selector and the
// entity id, matching store_set_record, so operations on the same entity are
// serialized. NOTE(review): confirm validate() guarantees at least three
// keys, or this indexing panics.
fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
13 changes: 13 additions & 0 deletions crates/torii/indexer/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
Expand Down Expand Up @@ -28,6 +30,17 @@ where
true
}

/// Record writes run at priority 1, after registrations (priority 0).
fn task_priority(&self) -> usize {
    1
}

/// Task id derived from keys[1] and keys[2] (presumably the model selector
/// and entity id — matches store_del_record), so writes to the same entity
/// stay sequential while writes to different entities can run in parallel.
fn task_identifier(&self, event: &Event) -> u64 {
    let mut state = DefaultHasher::new();
    for key in &event.keys[1..3] {
        key.hash(&mut state);
    }
    state.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
12 changes: 12 additions & 0 deletions crates/torii/indexer/src/processors/store_update_member.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use anyhow::{Context, Error, Result};
use async_trait::async_trait;
use dojo_types::schema::{Struct, Ty};
Expand Down Expand Up @@ -29,6 +30,17 @@ where
true
}

/// Member updates run at priority 1, after registrations (priority 0).
fn task_priority(&self) -> usize {
    1
}

/// Task id derived from keys[1] and keys[2] (presumably the model selector
/// and entity id — same scheme as store_set_record), serializing updates
/// that target the same entity.
fn task_identifier(&self, event: &Event) -> u64 {
    let mut state = DefaultHasher::new();
    event.keys[1].hash(&mut state);
    event.keys[2].hash(&mut state);
    state.finish()
}

async fn process(
&self,
_world: &WorldContractReader<P>,
Expand Down
Loading
Loading