
feat(torii-indexer): task manager & parallelize erc transfers #2913

Status: Open. Wants to merge 1 commit into base: main.
Conversation

@Larkooo (Collaborator) commented Jan 15, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new task management system for event processing
    • Added task prioritization and unique event identification across various event processors
  • Improvements

    • Enhanced event processing capabilities with more granular task handling
    • Implemented a flexible TaskManager to manage parallelized events more efficiently
  • Technical Updates

    • Restructured event processing logic in the indexer
    • Added methods for task priority and task identification to event processors


coderabbitai bot commented Jan 15, 2025

Walkthrough

Ohayo, sensei! This pull request introduces a comprehensive refactoring of the event processing system in the Torii indexer. The changes focus on enhancing task management by introducing a new TaskManager struct, removing the previous ParallelizedEvent approach, and adding task_priority and task_identifier methods across multiple event processors. The modifications aim to improve event processing flexibility, allowing for more sophisticated parallel and prioritized task handling.
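Based on the walkthrough, the two new trait methods might look roughly like this. This is a simplified sketch: `Felt`, `Event`, and `TaskId` are stand-in types, and the key layout hashed here follows the `StoreSetRecord` diff quoted later in the review, not the full torii codebase.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

type TaskId = u64;

// Stand-in for the starknet field element type.
#[derive(Hash)]
struct Felt(u128);

struct Event {
    keys: Vec<Felt>,
}

// Sketch of the two methods the PR adds to EventProcessor.
trait EventProcessor {
    /// Lower values are processed first (0 = system events).
    fn task_priority(&self) -> usize;
    /// Events sharing a non-zero TaskId are handled by the same task.
    fn task_identifier(&self, event: &Event) -> TaskId;
}

struct StoreSetRecordProcessor;

impl EventProcessor for StoreSetRecordProcessor {
    fn task_priority(&self) -> usize {
        1
    }

    fn task_identifier(&self, event: &Event) -> TaskId {
        // Hash the model selector and entity id, as in the reviewed diff.
        let mut hasher = DefaultHasher::new();
        event.keys[1].hash(&mut hasher);
        event.keys[2].hash(&mut hasher);
        hasher.finish()
    }
}

fn main() {
    let p = StoreSetRecordProcessor;
    let e = Event { keys: vec![Felt(0), Felt(7), Felt(9)] };
    assert_eq!(p.task_priority(), 1);
    // The identifier is deterministic: same keys, same task id.
    assert_eq!(p.task_identifier(&e), p.task_identifier(&e));
}
```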

Changes

File Change Summary
crates/torii/indexer/src/engine.rs Removed ParallelizedEvent struct, replaced tasks with task_manager
crates/torii/indexer/src/lib.rs Added new task_manager module
crates/torii/indexer/src/processors/* Added task_priority() and task_identifier() methods to multiple event processors
crates/torii/indexer/src/task_manager.rs New implementation of TaskManager and ParallelizedEvent structs

Possibly related PRs

Suggested Reviewers

  • glihm


@coderabbitai bot left a comment

Actionable comments posted: 4

🔭 Outside diff range comments (1)
crates/torii/indexer/src/engine.rs (1)

Line range hint 473-495: Update process_tasks method to use task_manager

Ohayo, sensei! The process_tasks method still references self.tasks, which should now be managed by task_manager. Update the method to retrieve tasks from task_manager and process them accordingly.

🧹 Nitpick comments (10)
crates/torii/indexer/src/task_manager.rs (2)

58-66: Consider returning a reference in take_tasks for efficiency

Ohayo, sensei! In the take_tasks method (lines 58-62), using std::mem::take moves the tasks out of the TaskManager, leaving an empty BTreeMap. If the intention is to process tasks without clearing them, consider returning a reference or cloning the tasks.
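To illustrate the reviewer's point, `std::mem::take` moves the map out and leaves an empty one behind; it does not borrow. A toy example (not the torii code):

```rust
use std::collections::BTreeMap;

fn main() {
    let mut tasks: BTreeMap<u64, Vec<&str>> = BTreeMap::new();
    tasks.entry(1).or_default().push("transfer A");
    tasks.entry(2).or_default().push("transfer B");

    // What take_tasks does internally: the whole map is moved out,
    // and the field is replaced with BTreeMap::default().
    let taken = std::mem::take(&mut tasks);

    assert_eq!(taken.len(), 2);
    assert!(tasks.is_empty()); // the original is drained, not preserved
}
```

This drain-on-read behavior is fine if the caller owns the processing of every taken task, but as the reviewer notes, a reference or clone is needed if the manager should retain them.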


24-26: Add visibility modifier to tasks field if needed

Ohayo, sensei! The tasks field in TaskManager is currently private. If external modules need to access tasks, consider adding a visibility modifier like pub(crate).

crates/torii/indexer/src/engine.rs (1)

Line range hint 838-860: Remove unused function get_transaction_hash_from_event_id if obsolete

Ohayo, sensei! The function get_transaction_hash_from_event_id on lines 838-841 might be obsolete due to changes in event handling. If it's no longer used, consider removing it to keep the codebase clean.

crates/torii/indexer/src/processors/raw_event.rs (1)

26-34: Clarify task parallelization for RawEventProcessor

Ohayo, sensei! In the RawEventProcessor, the task_identifier method returns 0, indicating that raw events are not parallelized (lines 30-33). If you plan to parallelize raw event processing in the future, consider adding a TODO comment for clarity.

crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (1)

45-57: Ohayo sensei! Consider reducing code duplication.

The task identification logic is identical to Erc20TransferProcessor except for accessing event data differently. Consider extracting the common hashing logic into a shared utility function.

Here's a suggested approach:

// In a shared utils module:
pub fn compute_transfer_task_id(event_key: &Felt, address1: &Felt, address2: &Felt) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event_key.hash(&mut hasher);
    let canonical_pair = std::cmp::max(*address1, *address2);
    canonical_pair.hash(&mut hasher);
    hasher.finish()
}

// In Erc20TransferProcessor:
fn task_identifier(&self, event: &Event) -> TaskId {
    compute_transfer_task_id(&event.keys[0], &event.keys[1], &event.keys[2])
}

// In Erc20LegacyTransferProcessor:
fn task_identifier(&self, event: &Event) -> TaskId {
    compute_transfer_task_id(&event.keys[0], &event.data[0], &event.data[1])
}
crates/torii/indexer/src/processors/store_del_record.rs (1)

38-43: Consider adding bounds checking and documentation.

The task identifier implementation could benefit from:

  1. Array bounds validation before accessing keys[1] and keys[2]
  2. Comments explaining what these keys represent in the context of StoreDelRecord events
 fn task_identifier(&self, event: &Event) -> TaskId {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        panic!("StoreDelRecord event must have at least 3 keys");
+    }
+
+    // Keys[1]: Model selector
+    // Keys[2]: Entity ID
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
crates/torii/indexer/src/processors/store_set_record.rs (1)

37-42: Add safety checks and documentation.

Similar to StoreDelRecord, consider:

  1. Validating array bounds
  2. Documenting the meaning of keys[1] and keys[2]
 fn task_identifier(&self, event: &Event) -> u64 {
+    // Ensure we have enough keys
+    if event.keys.len() < 3 {
+        panic!("StoreSetRecord event must have at least 3 keys");
+    }
+
+    // Keys[1]: Model selector
+    // Keys[2]: Entity ID
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }
crates/torii/indexer/src/processors/store_update_member.rs (1)

33-42: Ohayo sensei! Consider extracting common task ID generation logic.

The task_identifier implementation is identical to StoreUpdateRecordProcessor. Consider:

  1. Extracting this common logic into a trait or utility function
  2. Adding bounds checking for array access
// Example utility function
fn generate_task_id(keys: &[FieldElement]) -> u64 {
    let mut hasher = DefaultHasher::new();
    if keys.len() > 2 {
        keys[1].hash(&mut hasher);
        keys[2].hash(&mut hasher);
    }
    hasher.finish()
}
crates/torii/indexer/src/processors/register_model.rs (1)

39-43: Consider adopting this comprehensive task ID generation approach globally.

This implementation is more robust as it hashes all available keys. Consider using this approach in other processors (like StoreUpdateRecord and StoreUpdateMember) for consistency and completeness.

crates/torii/indexer/src/processors/upgrade_event.rs (1)

Line range hint 82-82: Consider improving documentation clarity, sensei.

The comment "Called model here by language, but it's an event" suggests potential confusion. Consider updating variable names to better reflect their purpose (e.g., event_model or event_definition).

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8af3d6 and 15cb6e9.

📒 Files selected for processing (19)
  • crates/torii/indexer/src/engine.rs (1 hunks)
  • crates/torii/indexer/src/lib.rs (1 hunks)
  • crates/torii/indexer/src/processors/erc20_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc20_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/erc721_transfer.rs (3 hunks)
  • crates/torii/indexer/src/processors/event_message.rs (1 hunks)
  • crates/torii/indexer/src/processors/metadata_update.rs (1 hunks)
  • crates/torii/indexer/src/processors/mod.rs (2 hunks)
  • crates/torii/indexer/src/processors/raw_event.rs (1 hunks)
  • crates/torii/indexer/src/processors/register_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/register_model.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_del_record.rs (3 hunks)
  • crates/torii/indexer/src/processors/store_set_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_member.rs (2 hunks)
  • crates/torii/indexer/src/processors/store_update_record.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_event.rs (2 hunks)
  • crates/torii/indexer/src/processors/upgrade_model.rs (2 hunks)
  • crates/torii/indexer/src/task_manager.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • crates/torii/indexer/src/lib.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: fmt
🔇 Additional comments (16)
crates/torii/indexer/src/task_manager.rs (1)

46-53: Confirm intentional exclusion of events with task_id zero

Ohayo, sensei! The current logic skips adding events to tasks when task_id is zero (lines 46-53). Please verify if this is intentional. If events with task_id zero should be processed differently, consider documenting the rationale.
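A minimal sketch of the semantics the reviewer is asking about, under the assumption that `task_id == 0` means "not parallelized" and such events are meant to take the sequential path instead (the method name and return type here are illustrative, not the PR's actual API):

```rust
use std::collections::BTreeMap;

type TaskId = u64;

struct TaskManager {
    tasks: BTreeMap<TaskId, Vec<String>>,
}

impl TaskManager {
    // Returns false when the event is NOT queued for parallel
    // processing, signaling the caller to handle it inline.
    fn add_event(&mut self, task_id: TaskId, event: String) -> bool {
        if task_id == 0 {
            return false; // task_id 0 = sequential path, by assumption
        }
        self.tasks.entry(task_id).or_default().push(event);
        true
    }
}

fn main() {
    let mut mgr = TaskManager { tasks: BTreeMap::new() };
    assert!(!mgr.add_event(0, "raw event".into()));
    assert!(mgr.add_event(42, "erc20 transfer".into()));
    assert_eq!(mgr.tasks.len(), 1); // only the non-zero id was queued
}
```

If that is indeed the intent, a doc comment on the zero sentinel would resolve the reviewer's question.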

crates/torii/indexer/src/processors/mod.rs (1)

58-59: Ohayo! The trait changes look good, sensei!

The addition of task_priority and task_identifier methods to the EventProcessor trait provides a solid foundation for task management. This enables consistent prioritization and identification of tasks across all event processors.

crates/torii/indexer/src/processors/event_message.rs (1)

35-38: Ohayo sensei! Let's address the TODO for event message parallelization.

Returning 0 for all task identifiers could lead to task collisions if parallelization is implemented in the future. Consider implementing a proper task identifier based on event properties, similar to other processors.

Would you like me to propose an implementation that generates unique task identifiers based on event properties?

crates/torii/indexer/src/processors/erc20_transfer.rs (1)

45-57: Ohayo! Excellent implementation of task identification, sensei!

The use of canonical representation for transfer addresses is a clever approach. By taking the maximum of the from/to addresses, you ensure that transfers between the same pair of addresses are grouped together regardless of direction. The implementation is well-documented and efficient.
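The direction-independence the reviewer praises can be seen in a toy version of the hashing scheme (plain `u64` stands in for `Felt`; the real code is shown in the diff earlier in this review):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn transfer_task_id(event_key: u64, from: u64, to: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    event_key.hash(&mut hasher);
    // Canonical pair: taking the max makes the id order-independent,
    // so A->B and B->A land on the same task and stay serialized.
    let canonical_pair = std::cmp::max(from, to);
    canonical_pair.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a_to_b = transfer_task_id(1, 0xA, 0xB);
    let b_to_a = transfer_task_id(1, 0xB, 0xA);
    assert_eq!(a_to_b, b_to_a); // same pair, either direction
}
```

Note that hashing only the maximum of the two addresses also groups any pairs sharing the same larger address; that is a coarser grouping than hashing both addresses, which may or may not be intended.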

crates/torii/indexer/src/processors/store_del_record.rs (1)

34-36: Ohayo! Task priority looks good, sensei!

The constant priority of 1 is consistent with other processors.

crates/torii/indexer/src/processors/store_set_record.rs (1)

33-35: Task priority implementation looks good, sensei!

Consistent with other processors.

crates/torii/indexer/src/processors/erc721_transfer.rs (2)

40-42: Task priority looks good, sensei!

Consistent priority level of 1 matches other processors.


44-64: Excellent implementation with clear documentation!

The task identification strategy is well thought out:

  1. Uses canonical address pairs to group related transfers
  2. Enables safe parallelization by token ID
  3. Includes clear documentation explaining the approach

This is a great example of how to implement and document task identification logic!
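The "parallelization by token ID" property described above can be sketched as follows; the exact fields hashed are an assumption drawn from the review summary, not the PR's literal code:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn erc721_task_id(event_key: u64, from: u64, to: u64, token_id: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    event_key.hash(&mut hasher);
    // Canonical address pair, as in the ERC20 processors.
    std::cmp::max(from, to).hash(&mut hasher);
    // Folding in the token id lets transfers of different tokens
    // between the same pair run on different tasks.
    token_id.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let t1 = erc721_task_id(1, 0xA, 0xB, 100);
    let t2 = erc721_task_id(1, 0xB, 0xA, 100);
    assert_eq!(t1, t2); // same token, same pair, either direction
}
```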

crates/torii/indexer/src/processors/erc721_legacy_transfer.rs (2)

41-43: Task priority implementation looks good, sensei!

Consistent with other processors.


45-65: Excellent implementation with clear documentation!

The task identification strategy mirrors the non-legacy implementation:

  1. Uses canonical address pairs to group related transfers
  2. Enables safe parallelization by token ID
  3. Includes clear documentation explaining the approach

Great consistency with the non-legacy implementation while adapting to the different event structure!

crates/torii/indexer/src/processors/store_update_record.rs (1)

1-1: Ohayo! Implementation looks good, sensei!

The priority value of 1 is appropriate for store update events, as they should be processed after system events (priority 0).

Also applies to: 33-35

crates/torii/indexer/src/processors/register_model.rs (1)

35-37: Ohayo! Excellent priority choice, sensei!

Setting priority 0 for model registration is correct, as models need to be registered before any store operations can be processed.

crates/torii/indexer/src/processors/register_event.rs (1)

35-43: Ohayo! Implementation is consistent and robust, sensei!

The implementation aligns well with RegisterModelProcessor:

  • Priority 0 is appropriate for system events
  • Comprehensive key hashing approach ensures unique task IDs
crates/torii/indexer/src/processors/upgrade_model.rs (2)

Line range hint 44-136: Process method implementation looks solid, sensei!

The extensive error handling and logging in the process method align well with its high-priority task status.


34-43: Ohayo! Add bounds checking for event.keys array access, sensei.

The task_identifier implementation assumes event.keys[1] always exists. Consider adding a bounds check to handle cases where the event keys array might be empty or have insufficient elements.

Also, consider adding documentation to explain:

  • Why priority 0 was chosen for model upgrades
  • The significance of using the second key for task identification
crates/torii/indexer/src/processors/upgrade_event.rs (1)

34-43: Extract common task identification logic, sensei!

The task identifier implementation is duplicated across multiple processors. Consider extracting this into a common utility function or trait default implementation.

Also, similar to the model upgrade processor, add bounds checking for event.keys[1] access.

Comment on lines +33 to +42
pub fn add_parallelized_event(&mut self, parallelized_event: ParallelizedEvent) -> TaskId {
    let event_key = parallelized_event.event.keys[0];
    let processor = self
        .processors
        .get_event_processor(parallelized_event.contract_type)
        .get(&event_key)
        .unwrap()
        .iter()
        .find(|p| p.validate(&parallelized_event.event))
        .unwrap();

⚠️ Potential issue

Avoid using .unwrap() to prevent potential panics

Ohayo, sensei! The usage of .unwrap() on lines 39 and 42 could lead to a panic if the Option is None. To enhance the robustness of the code, consider handling the None case explicitly or propagating the error using ?.

Apply this diff to handle the potential None values:

 let processor = self
     .processors
     .get_event_processor(parallelized_event.contract_type)
-    .get(&event_key)
-    .unwrap()
+    .get(&event_key)
+    .ok_or_else(|| anyhow!("No processor found for event key"))?
     .iter()
     .find(|p| p.validate(&parallelized_event.event))
-    .unwrap();
+    .ok_or_else(|| anyhow!("No valid processor found for event"))?;

Committable suggestion skipped: line range outside the PR's diff.

@@ -216,7 +208,7 @@ pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
     config: EngineConfig,
     shutdown_tx: Sender<()>,
     block_tx: Option<BoundedSender<u64>>,
-    tasks: HashMap<u64, Vec<(ContractType, ParallelizedEvent)>>,
+    task_manager: TaskManager<P>,

⚠️ Potential issue

Initialize task_manager field in Engine

Ohayo, sensei! On line 211, the Engine struct has a new field task_manager but it's not initialized in the constructor. Make sure to initialize task_manager in the Engine::new method to prevent a runtime error.

Apply this diff to initialize task_manager:

          block_tx,
          contracts,
-         tasks: HashMap::new(),
+         task_manager: TaskManager::new(processors.clone()),
      }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +37 to +42
fn task_identifier(&self, event: &Event) -> u64 {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}

⚠️ Potential issue

Ohayo sensei! Consider adding bounds checking for array access.

The current implementation assumes keys[1] and keys[2] always exist. Consider adding bounds checking to prevent potential panic:

     fn task_identifier(&self, event: &Event) -> u64 {
         let mut hasher = DefaultHasher::new();
-        event.keys[1].hash(&mut hasher);
-        event.keys[2].hash(&mut hasher);
+        if event.keys.len() > 2 {
+            event.keys[1].hash(&mut hasher);
+            event.keys[2].hash(&mut hasher);
+        }
         hasher.finish()
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:

fn task_identifier(&self, event: &Event) -> u64 {
    let mut hasher = DefaultHasher::new();
    if event.keys.len() > 2 {
        event.keys[1].hash(&mut hasher);
        event.keys[2].hash(&mut hasher);
    }
    hasher.finish()
}

Comment on lines +37 to +44
fn task_priority(&self) -> usize {
    1
}

fn task_identifier(&self, _event: &Event) -> u64 {
    // TODO. for now metadata updates are not parallelized
    0
}

🛠️ Refactor suggestion

Ohayo! Consider implementing proper task identification, sensei.

The current implementation returns a hardcoded 0 for all task identifiers, which effectively serializes all metadata updates. This might become a bottleneck as the system scales.

Consider implementing a proper task identification strategy based on:

  • The resource being updated (event.resource)
  • The URI being set
  • Or a combination of both for better parallelization
-fn task_identifier(&self, _event: &Event) -> u64 {
-    // TODO. for now metadata updates are not parallelized
-    0
+fn task_identifier(&self, event: &Event) -> u64 {
+    let mut hasher = DefaultHasher::new();
+    event.resource.hash(&mut hasher);
+    hasher.finish()
 }

Committable suggestion skipped: line range outside the PR's diff.

@Larkooo Larkooo changed the title feat(torii-indexer): task manager & task id for every event feat(torii-indexer): task manager & parallelize erc transfers Jan 15, 2025
@@ -28,6 +28,15 @@ where
         true
     }

+    fn task_priority(&self) -> usize {
+        1
A collaborator commented:
Isn't an enum more idiomatic, or you need logic around this priority, or a priority value seems more manageable?
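One way the enum alternative could look (hypothetical, not in the PR): a `TaskPriority` enum with a derived ordering keeps the priority set closed and self-documenting, while a cast preserves a numeric value where one is needed.

```rust
// Variant names and their assignments are illustrative guesses
// based on the priorities discussed in this review.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum TaskPriority {
    System = 0, // model/event registration, upgrades
    Store = 1,  // store set/update/delete, transfers
    Other = 2,
}

fn main() {
    let mut priorities = vec![TaskPriority::Store, TaskPriority::Other, TaskPriority::System];
    priorities.sort(); // derived Ord follows declaration order
    assert_eq!(priorities[0], TaskPriority::System);
    // Still convertible to a numeric value if the manager needs one:
    assert_eq!(TaskPriority::Store as usize, 1);
}
```

A bare `usize` is more flexible if processors ever need priorities not known at compile time; the enum is stricter and harder to misuse.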
