Skip to content

Commit

Permalink
opt(torii-core): move off queryqueue for executing tx (#2460)
Browse files Browse the repository at this point in the history
* opt(torii-core): move off queryqueue for executing tx

* feat: replace query queue with executor

* fix: executor

* refactor: executor logic

* fix: tests

* fmt

* executor inside of tokio select

* executor graceful exit

* priv execute

* contracts insertion shouldn't go through executor

* clean code

* exec

* fix: tests

* oneshot channel for execution result

* fmt

* clone shutdown tx

* fmt

* fix: test exec

* non-blocking execute engine

* executor logs

* in memory head

* fmt

* fix: tests

* fix: libp2p

* fmt

* try fix libp2p test

* fix tests

* fmt

* use tempfile for tests

* fix

* c

* fix: sql tests

* clone

* fmt

* fmt

* no temp file

* tmp file

* fix: lock issues

* manually use tmp file

* fix graphql tests

* fix: tests

* clippy

* fix torii bin

* engine executions

* use tmp file for db

* fix: cursor

* chore

* wip

* cleaning code

* refactor: handle errors without panic

* use vec
  • Loading branch information
Larkooo authored Oct 3, 2024
1 parent 1e33eda commit c291c7a
Show file tree
Hide file tree
Showing 25 changed files with 812 additions and 602 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/torii/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ tracing-subscriber.workspace = true
tracing.workspace = true
url.workspace = true
webbrowser = "0.8"
tempfile.workspace = true

[dev-dependencies]
camino.workspace = true
Expand Down
17 changes: 14 additions & 3 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ use sqlx::SqlitePool;
use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tempfile::NamedTempFile;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors};
use torii_core::executor::Executor;
use torii_core::processors::event_message::EventMessageProcessor;
use torii_core::processors::generate_event_processors_map;
use torii_core::processors::metadata_update::MetadataUpdateProcessor;
Expand Down Expand Up @@ -64,7 +66,7 @@ struct Args {

/// Database filepath (ex: indexer.db). If specified file doesn't exist, it will be
/// created. Defaults to a temporary file-backed database when left empty
#[arg(short, long, default_value = ":memory:")]
#[arg(short, long, default_value = "")]
database: String,

/// Specify a block to start indexing from, ignored if stored head exists
Expand Down Expand Up @@ -163,8 +165,12 @@ async fn main() -> anyhow::Result<()> {
})
.expect("Error setting Ctrl-C handler");

let tempfile = NamedTempFile::new()?;
let database_path =
if args.database.is_empty() { tempfile.path().to_str().unwrap() } else { &args.database };

let mut options =
SqliteConnectOptions::from_str(&args.database)?.create_if_missing(true).with_regexp();
SqliteConnectOptions::from_str(database_path)?.create_if_missing(true).with_regexp();

// Performance settings
options = options.auto_vacuum(SqliteAutoVacuum::None);
Expand All @@ -185,7 +191,12 @@ async fn main() -> anyhow::Result<()> {
// Get world address
let world = WorldContractReader::new(args.world_address, provider.clone());

let db = Sql::new(pool.clone(), args.world_address).await?;
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?;
tokio::spawn(async move {
executor.run().await.unwrap();
});

let db = Sql::new(pool.clone(), args.world_address, sender.clone()).await?;

let processors = Processors {
event: generate_event_processors_map(vec![
Expand Down
1 change: 1 addition & 0 deletions crates/torii/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ dojo-test-utils.workspace = true
dojo-utils.workspace = true
katana-runner.workspace = true
scarb.workspace = true
tempfile.workspace = true
95 changes: 49 additions & 46 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;
use anyhow::Result;
use bitflags::bitflags;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::try_join_all;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
Expand All @@ -17,7 +18,6 @@ use starknet::providers::Provider;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -108,6 +108,13 @@ pub struct ParallelizedEvent {
pub event: Event,
}

/// Snapshot of the indexing head produced by the engine's processing step.
///
/// `process_pending` and `process_range` return this value, and `process`
/// forwards it wrapped in `Some` (or `None` when there was nothing to process).
#[derive(Debug)]
pub struct EngineHead {
/// Block number the head points at. Range processing reports the latest
/// block number of the fetched range; pending processing reports the
/// pending block's number minus one.
pub block_number: u64,
/// Cursor for the last pending-block world transaction, if any. Always
/// `None` after range processing.
// NOTE(review): presumably the last pending tx that touched the world
// contract — confirm against where the cursor is assigned.
pub last_pending_block_world_tx: Option<Felt>,
/// Cursor for the last pending-block transaction, if any. Always `None`
/// after range processing.
pub last_pending_block_tx: Option<Felt>,
}

#[allow(missing_debug_implementations)]
pub struct Engine<P: Provider + Send + Sync + std::fmt::Debug + 'static> {
world: Arc<WorldContractReader<P>>,
Expand Down Expand Up @@ -151,7 +158,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// use the start block provided by user if head is 0
let (head, _, _) = self.db.head().await?;
if head == 0 {
self.db.set_head(self.config.start_block);
self.db.set_head(self.config.start_block)?;
} else if self.config.start_block != 0 {
warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead.");
}
Expand All @@ -164,6 +171,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let mut erroring_out = false;
loop {
let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?;

tokio::select! {
_ = shutdown_rx.recv() => {
break Ok(());
Expand All @@ -179,7 +187,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}

match self.process(fetch_result).await {
Ok(()) => {}
Ok(_) => self.db.execute().await?,
Err(e) => {
error!(target: LOG_TARGET, error = %e, "Processing fetched data.");
erroring_out = true;
Expand Down Expand Up @@ -363,21 +371,15 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}))
}

pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> {
pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result<Option<EngineHead>> {
match fetch_result {
FetchDataResult::Range(data) => {
self.process_range(data).await?;
}
FetchDataResult::Pending(data) => {
self.process_pending(data).await?;
}
FetchDataResult::None => {}
FetchDataResult::Range(data) => self.process_range(data).await.map(Some),
FetchDataResult::Pending(data) => self.process_pending(data).await.map(Some),
FetchDataResult::None => Ok(None),
}

Ok(())
}

pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> {
pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<EngineHead> {
// Skip transactions that have been processed already
// Our cursor is the last processed transaction

Expand Down Expand Up @@ -407,16 +409,19 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
// provider. So we can fail silently and try
// again in the next iteration.
warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt.");
self.db.set_head(data.block_number - 1);
self.db.set_head(data.block_number - 1)?;
if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx));
self.db.set_last_pending_block_tx(Some(tx))?;
}

if let Some(tx) = last_pending_block_world_tx {
self.db.set_last_pending_block_world_tx(Some(tx));
self.db.set_last_pending_block_world_tx(Some(tx))?;
}
self.db.execute().await?;
return Ok(());
return Ok(EngineHead {
block_number: data.block_number - 1,
last_pending_block_tx,
last_pending_block_world_tx,
});
}
_ => {
error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction.");
Expand All @@ -441,22 +446,24 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {

// Set the head to the last processed pending transaction
// Head block number should still be latest block number
self.db.set_head(data.block_number - 1);
self.db.set_head(data.block_number - 1)?;

if let Some(tx) = last_pending_block_tx {
self.db.set_last_pending_block_tx(Some(tx));
self.db.set_last_pending_block_tx(Some(tx))?;
}

if let Some(tx) = last_pending_block_world_tx {
self.db.set_last_pending_block_world_tx(Some(tx));
self.db.set_last_pending_block_world_tx(Some(tx))?;
}

self.db.execute().await?;

Ok(())
Ok(EngineHead {
block_number: data.block_number - 1,
last_pending_block_world_tx,
last_pending_block_tx,
})
}

pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> {
pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<EngineHead> {
// Process all transactions
let mut last_block = 0;
for ((block_number, transaction_hash), events) in data.transactions {
Expand Down Expand Up @@ -486,38 +493,36 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
self.process_block(block_number, data.blocks[&block_number]).await?;
last_block = block_number;
}

if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE {
self.db.execute().await?;
}
}

// Process parallelized events
self.process_tasks().await?;

self.db.set_head(data.latest_block_number);
self.db.set_last_pending_block_world_tx(None);
self.db.set_last_pending_block_tx(None);
self.db.set_head(data.latest_block_number)?;
self.db.set_last_pending_block_world_tx(None)?;
self.db.set_last_pending_block_tx(None)?;

self.db.execute().await?;

Ok(())
Ok(EngineHead {
block_number: data.latest_block_number,
last_pending_block_tx: None,
last_pending_block_world_tx: None,
})
}

async fn process_tasks(&mut self) -> Result<()> {
// We use a semaphore to limit the number of concurrent tasks
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Run all tasks concurrently
let mut set = JoinSet::new();
let mut handles = Vec::new();
for (task_id, events) in self.tasks.drain() {
let db = self.db.clone();
let world = self.world.clone();
let processors = self.processors.clone();
let semaphore = semaphore.clone();

set.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await?;
let mut local_db = db.clone();
for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events {
if let Some(processor) = processors.event.get(&event.keys[0]) {
Expand All @@ -531,15 +536,13 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}
}
Ok::<_, anyhow::Error>(local_db)
});

Ok::<_, anyhow::Error>(())
}));
}

// Join all tasks
while let Some(result) = set.join_next().await {
let local_db = result??;
self.db.merge(local_db)?;
}
try_join_all(handles).await?;

Ok(())
}
Expand Down Expand Up @@ -688,7 +691,7 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
transaction_hash: Felt,
) -> Result<()> {
if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
self.db.store_event(event_id, event, transaction_hash, block_timestamp);
self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;
}

let event_key = event.keys[0];
Expand Down
Loading

0 comments on commit c291c7a

Please sign in to comment.