From 2972ae304018b73ec0eedc48865055561c41b9cd Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Fri, 4 Oct 2024 01:22:56 +0530 Subject: [PATCH] Merge branch 'main' into feat/torii/ercs --- Cargo.lock | 173 +++++- Cargo.toml | 2 +- bin/torii/Cargo.toml | 1 + bin/torii/src/main.rs | 17 +- crates/torii/core/Cargo.toml | 3 +- crates/torii/core/src/engine.rs | 53 +- crates/torii/core/src/executor.rs | 297 ++++++++++ crates/torii/core/src/lib.rs | 1 + .../core/src/processors/metadata_update.rs | 6 +- .../core/src/processors/store_transaction.rs | 2 +- crates/torii/core/src/sql/cache.rs | 6 + crates/torii/core/src/sql/erc.rs | 45 +- crates/torii/core/src/sql/mod.rs | 517 +++++++++--------- crates/torii/core/src/sql/test.rs | 89 ++- crates/torii/graphql/Cargo.toml | 1 + .../torii/graphql/src/tests/entities_test.rs | 5 +- .../torii/graphql/src/tests/metadata_test.rs | 23 +- crates/torii/graphql/src/tests/mod.rs | 22 +- .../graphql/src/tests/models_ordering_test.rs | 5 +- crates/torii/graphql/src/tests/models_test.rs | 5 +- .../graphql/src/tests/subscription_test.rs | 46 +- crates/torii/grpc/Cargo.toml | 1 + .../grpc/src/server/tests/entities_test.rs | 30 +- crates/torii/libp2p/src/server/mod.rs | 3 +- crates/torii/libp2p/src/tests.rs | 84 +-- .../manifests/dev/deployment/manifest.json | 6 +- .../manifests/dev/deployment/manifest.toml | 6 +- 27 files changed, 975 insertions(+), 474 deletions(-) create mode 100644 crates/torii/core/src/executor.rs diff --git a/Cargo.lock b/Cargo.lock index 82cda6b586..540102afb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,14 +15,14 @@ dependencies = [ [[package]] name = "account_sdk" version = "0.1.0" -source = "git+https://github.com/cartridge-gg/controller?rev=fea57f1#fea57f16ba2c52e89311787aaaea8875b35e38f4" +source = "git+https://github.com/cartridge-gg/controller#61d2fd0cd856daa01b2da52b762368542c03da6f" dependencies = [ "anyhow", "async-trait", "auto_impl", "base64 0.22.1", "base64urlsafedata", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", "coset", "ecdsa", "futures", @@ -37,6 +37,7 @@ dependencies = [ "primitive-types", "reqwest 0.11.27", "serde", + "serde-wasm-bindgen", "serde_cbor_2", "serde_json", "serde_with 3.9.0", @@ -47,6 +48,7 @@ dependencies = [ "thiserror", "tokio", "toml 0.8.19", + "tsify-next", "u256-literal", "url", "urlencoding", @@ -2278,10 +2280,35 @@ source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.1#db76fb849d1b7f3 dependencies = [ "anyhow", "async-trait", - "cainome-cairo-serde", - "cainome-parser", - "cainome-rs", - "cainome-rs-macro", + "cainome-cairo-serde 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "cainome-parser 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "cainome-rs 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "cainome-rs-macro 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "camino", + "clap", + "clap_complete", + "convert_case 0.6.0", + "serde", + "serde_json", + "starknet 0.12.0", + "starknet-types-core", + "thiserror", + "tracing", + "tracing-subscriber", + "url", +] + +[[package]] +name = "cainome" +version = "0.2.3" +source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.2#4e3924fb82b7299d56d3619aa5d7b9863f581e0a" +dependencies = [ + "anyhow", + "async-trait", + "cainome-cairo-serde 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", + "cainome-parser 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", + "cainome-rs 0.1.0 
(git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", + "cainome-rs-macro 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", "camino", "clap", "clap_complete", @@ -2306,6 +2333,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cainome-cairo-serde" +version = "0.1.0" +source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.2#4e3924fb82b7299d56d3619aa5d7b9863f581e0a" +dependencies = [ + "serde", + "starknet 0.12.0", + "thiserror", +] + [[package]] name = "cainome-parser" version = "0.1.0" @@ -2319,14 +2356,45 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cainome-parser" +version = "0.1.0" +source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.2#4e3924fb82b7299d56d3619aa5d7b9863f581e0a" +dependencies = [ + "convert_case 0.6.0", + "quote", + "serde_json", + "starknet 0.12.0", + "syn 2.0.77", + "thiserror", +] + [[package]] name = "cainome-rs" version = "0.1.0" source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.1#db76fb849d1b7f3e9a2e943868bcd8616cf72e90" dependencies = [ "anyhow", - "cainome-cairo-serde", - "cainome-parser", + "cainome-cairo-serde 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "cainome-parser 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "camino", + "prettyplease 0.2.22", + "proc-macro2", + "quote", + "serde_json", + "starknet 0.12.0", + "syn 2.0.77", + "thiserror", +] + +[[package]] +name = "cainome-rs" +version = "0.1.0" +source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.2#4e3924fb82b7299d56d3619aa5d7b9863f581e0a" +dependencies = [ + "anyhow", + "cainome-cairo-serde 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", + "cainome-parser 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", "camino", "prettyplease 0.2.22", "proc-macro2", @@ -2343,9 +2411,27 @@ version = "0.1.0" source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.1#db76fb849d1b7f3e9a2e943868bcd8616cf72e90" dependencies = [ "anyhow", - "cainome-cairo-serde", - "cainome-parser", - "cainome-rs", + "cainome-cairo-serde 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "cainome-parser 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "cainome-rs 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", + "proc-macro-error", + "proc-macro2", + "quote", + "serde_json", + "starknet 0.12.0", + "syn 2.0.77", + "thiserror", +] + +[[package]] +name = "cainome-rs-macro" +version = "0.1.0" +source = "git+https://github.com/cartridge-gg/cainome?tag=v0.4.2#4e3924fb82b7299d56d3619aa5d7b9863f581e0a" +dependencies = [ + "anyhow", + "cainome-cairo-serde 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", + "cainome-parser 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", + "cainome-rs 0.1.0 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.2)", "proc-macro-error", "proc-macro2", "quote", @@ -4390,7 +4476,7 @@ version = "1.0.0-alpha.13" dependencies = [ "assert_matches", "async-trait", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "camino", "chrono", "convert_case 0.6.0", @@ -4515,7 +4601,7 @@ dependencies = [ name = "dojo-types" version = "1.0.0-alpha.13" dependencies = [ - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "crypto-bigint", "hex", "itertools 0.12.1", @@ -4551,7 +4637,7 @@ dependencies = [ "assert_fs", "assert_matches", "async-trait", - "cainome", + "cainome 0.2.3 
(git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "cairo-lang-filesystem", "cairo-lang-project", "cairo-lang-starknet", @@ -8046,7 +8132,7 @@ dependencies = [ "alloy-primitives", "anyhow", "assert_matches", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "dojo-metrics", "dojo-test-utils", "dojo-utils", @@ -12348,6 +12434,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde-wasm-bindgen" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8302e169f0eddcc139c70f139d19d6467353af16f9fce27e8c30158036a1e16b" +dependencies = [ + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "serde_bytes" version = "0.11.15" @@ -12729,8 +12826,8 @@ dependencies = [ [[package]] name = "slot" -version = "0.16.0" -source = "git+https://github.com/cartridge-gg/slot?rev=630ed37#630ed377d55662847d2219c8662f6d0867f3e2fb" +version = "0.17.0" +source = "git+https://github.com/cartridge-gg/slot?rev=544cbc6#544cbc60162795ca83377c8b1d959bae6ba0e073" dependencies = [ "account_sdk", "anyhow", @@ -12867,7 +12964,7 @@ dependencies = [ "assert_fs", "async-trait", "bigdecimal", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "cairo-lang-compiler", "cairo-lang-defs", "cairo-lang-filesystem", @@ -12931,7 +13028,7 @@ dependencies = [ "assert_fs", "async-trait", "bigdecimal", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "cairo-lang-compiler", "cairo-lang-defs", "cairo-lang-filesystem", @@ -14485,6 +14582,7 @@ dependencies = [ "sqlx", "starknet 0.12.0", "starknet-crypto 0.7.2", + "tempfile", "tokio", "tokio-stream", "tokio-util", @@ -14537,7 +14635,7 @@ dependencies = [ "async-trait", "base64 0.21.7", "bitflags 2.6.0", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "camino", "chrono", "crypto-bigint", @@ -14559,9 +14657,9 @@ dependencies = [ "sqlx", "starknet 0.12.0", "starknet-crypto 0.7.2", + "tempfile", "thiserror", "tokio", - "tokio-stream", "tokio-util", "toml 0.8.19", "tracing", @@ -14577,7 +14675,7 @@ dependencies = [ "async-recursion", "async-trait", "base64 0.21.7", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "camino", "chrono", "convert_case 0.6.0", @@ -14599,6 +14697,7 @@ dependencies = [ "starknet-crypto 0.7.2", "strum 0.25.0", "strum_macros 0.25.3", + "tempfile", "thiserror", "tokio", "tokio-stream", @@ -14614,7 +14713,7 @@ dependencies = [ name = "torii-grpc" version = "1.0.0-alpha.13" dependencies = [ - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "camino", "crypto-bigint", "dojo-test-utils", @@ -14638,6 +14737,7 @@ dependencies = [ "starknet-crypto 0.7.2", "strum 0.25.0", "strum_macros 0.25.3", + "tempfile", "thiserror", "tokio", "tokio-stream", @@ -14658,7 +14758,7 @@ name = "torii-relay" version = "1.0.0-alpha.13" dependencies = [ "anyhow", - "cainome", + "cainome 0.2.3 (git+https://github.com/cartridge-gg/cainome?tag=v0.4.1)", "chrono", "crypto-bigint", "dojo-types", @@ -14952,6 +15052,31 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tsify-next" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f4a645dca4ee0800f5ab60ce166deba2db6a0315de795a2691e138a3d55d756" +dependencies = [ + "gloo-utils 0.2.0", + 
"serde", + "serde_json", + "tsify-next-macros", + "wasm-bindgen", +] + +[[package]] +name = "tsify-next-macros" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d5c06f8a51d759bb58129e30b2631739e7e1e4579fad1f30ac09a6c88e488a6" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.77", +] + [[package]] name = "tungstenite" version = "0.21.0" diff --git a/Cargo.toml b/Cargo.toml index 503b0d3b67..5988c89ded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -231,7 +231,7 @@ alloy-sol-types = { version = "0.8.3", default-features = false } criterion = "0.5.1" # Slot integration. Dojo don't need to manually include `account_sdk` as dependency as `slot` already re-exports it. -slot = { git = "https://github.com/cartridge-gg/slot", rev = "630ed37" } +slot = { git = "https://github.com/cartridge-gg/slot", rev = "544cbc6" } alloy-contract = { version = "0.3", default-features = false } alloy-json-rpc = { version = "0.3", default-features = false } diff --git a/bin/torii/Cargo.toml b/bin/torii/Cargo.toml index 977764b26d..aadbd390cd 100644 --- a/bin/torii/Cargo.toml +++ b/bin/torii/Cargo.toml @@ -46,6 +46,7 @@ tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true webbrowser = "0.8" +tempfile.workspace = true [dev-dependencies] camino.workspace = true diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index ac9c510997..3d6d748025 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -30,10 +30,12 @@ use sqlx::SqlitePool; use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; +use tempfile::NamedTempFile; use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors}; +use torii_core::executor::Executor; use torii_core::processors::store_transaction::StoreTransactionProcessor; use torii_core::simple_broker::SimpleBroker; use torii_core::sql::Sql; @@ -59,7 +61,7 @@ struct Args { /// Database filepath (ex: indexer.db). If specified file doesn't exist, it will be /// created. Defaults to in-memory database - #[arg(short, long, default_value = ":memory:")] + #[arg(short, long, default_value = "")] database: String, /// Address to serve api endpoints at. 
@@ -178,8 +180,12 @@ async fn main() -> anyhow::Result<()> { }) .expect("Error setting Ctrl-C handler"); + let tempfile = NamedTempFile::new()?; + let database_path = + if args.database.is_empty() { tempfile.path().to_str().unwrap() } else { &args.database }; + let mut options = - SqliteConnectOptions::from_str(&args.database)?.create_if_missing(true).with_regexp(); + SqliteConnectOptions::from_str(database_path)?.create_if_missing(true).with_regexp(); // Performance settings options = options.auto_vacuum(SqliteAutoVacuum::None); @@ -203,7 +209,12 @@ async fn main() -> anyhow::Result<()> { let contracts = config.contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(); - let db = Sql::new(pool.clone(), world_address, &contracts).await?; + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?; + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new(pool.clone(), sender.clone(), &contracts).await?; let processors = Processors { transaction: vec![Box::new(StoreTransactionProcessor)], diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index 3efb37f39b..30040d528b 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -32,7 +32,7 @@ starknet-crypto.workspace = true starknet.workspace = true thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } -tokio-stream = "0.1.11" +# tokio-stream = "0.1.11" tokio-util.workspace = true toml.workspace = true tracing.workspace = true @@ -43,3 +43,4 @@ dojo-test-utils.workspace = true dojo-utils.workspace = true katana-runner.workspace = true scarb.workspace = true +tempfile.workspace = true diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index a9d2e69f53..eddb41594a 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -7,7 +7,7 @@ use std::time::Duration; use anyhow::Result; use bitflags::bitflags; use dojo_world::contracts::world::WorldContractReader; -use futures_util::future::join_all; +use futures_util::future::{join_all, try_join_all}; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, EventsPage, MaybePendingBlockWithReceipts, @@ -181,6 +181,13 @@ pub struct ParallelizedEvent { pub event: Event, } +#[derive(Debug)] +pub struct EngineHead { + pub block_number: u64, + pub last_pending_block_world_tx: Option, + pub last_pending_block_tx: Option, +} + #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -228,7 +235,7 @@ impl Engine

{ // use the start block provided by user if head is 0 let (head, _, _) = self.db.head(self.world.address).await?; if head == 0 { - self.db.set_head(self.world.address, self.config.start_block); + self.db.set_head(self.world.address, self.config.start_block)?; } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -256,7 +263,7 @@ impl Engine

{ } match self.process(fetch_result).await { - Ok(()) => {} + Ok(_) => self.db.execute().await?, Err(e) => { error!(target: LOG_TARGET, error = %e, "Processing fetched data."); erroring_out = true; @@ -447,14 +454,10 @@ impl Engine

{ pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> { match fetch_result { - FetchDataResult::Range(data) => { - self.process_range(data).await?; - } - FetchDataResult::Pending(data) => { - self.process_pending(data).await?; - } + FetchDataResult::Range(data) => self.process_range(data).await?, + FetchDataResult::Pending(data) => self.process_pending(data).await?, FetchDataResult::None => {} - } + }; self.db.apply_cache_diff().await?; Ok(()) @@ -497,9 +500,7 @@ impl Engine

{ self.process_tasks().await?; // Head block number should still be latest block number - self.db.update_cursors(data.block_number - 1, last_pending_block_tx, cursor_map); - - self.db.execute().await?; + self.db.update_cursors(data.block_number - 1, last_pending_block_tx, cursor_map)?; Ok(()) } @@ -534,18 +535,12 @@ impl Engine

{ self.process_block(block_number, data.blocks[&block_number]).await?; processed_blocks.insert(block_number); } - - if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { - self.db.execute().await?; - } } // Process parallelized events self.process_tasks().await?; - self.db.reset_cursors(data.latest_block_number); - - self.db.execute().await?; + self.db.reset_cursors(data.latest_block_number)?; Ok(()) } @@ -555,15 +550,15 @@ impl Engine

{ let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); // Run all tasks concurrently - let mut set = JoinSet::new(); + let mut handles = Vec::new(); for (task_id, events) in self.tasks.drain() { let db = self.db.clone(); let world = self.world.clone(); let semaphore = semaphore.clone(); let processors = self.processors.clone(); - set.spawn(async move { - let _permit = semaphore.acquire().await.unwrap(); + handles.push(tokio::spawn(async move { + let _permit = semaphore.acquire().await?; let mut local_db = db.clone(); for (contract_type, ParallelizedEvent { event_id, event, block_number, block_timestamp }) in events { let contract_processors = processors.get_event_processor(contract_type); @@ -581,15 +576,13 @@ impl Engine

{ } } } - Ok::<_, anyhow::Error>(local_db) - }); + + Ok::<_, anyhow::Error>(()) + })); } // Join all tasks - while let Some(result) = set.join_next().await { - let local_db = result??; - self.db.merge(local_db)?; - } + try_join_all(handles).await?; Ok(()) } @@ -742,7 +735,7 @@ impl Engine

{
         if self.config.flags.contains(IndexingFlags::RAW_EVENTS) {
             match contract_type {
                 ContractType::WORLD => {
-                    self.db.store_event(event_id, event, transaction_hash, block_timestamp);
+                    self.db.store_event(event_id, event, transaction_hash, block_timestamp)?;
                 }
                 // ERC events need to be processed inside their respective processors;
                 // we store transfer events for ERC contracts regardless of this flag
diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs
new file mode 100644
index 0000000000..503759e43f
--- /dev/null
+++ b/crates/torii/core/src/executor.rs
@@ -0,0 +1,297 @@
+use std::mem;
+
+use anyhow::{Context, Result};
+use dojo_types::schema::{Struct, Ty};
+use sqlx::query::Query;
+use sqlx::sqlite::SqliteArguments;
+use sqlx::{FromRow, Pool, Sqlite, Transaction};
+use starknet::core::types::Felt;
+use tokio::sync::broadcast::{Receiver, Sender};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio::sync::oneshot;
+use tokio::time::Instant;
+use tracing::{debug, error};
+
+use crate::simple_broker::SimpleBroker;
+use crate::types::{
+    Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
+    Model as ModelRegistered,
+};
+
+pub(crate) const LOG_TARGET: &str = "torii_core::executor";
+
+#[derive(Debug, Clone)]
+pub enum Argument {
+    Null,
+    Int(i64),
+    Bool(bool),
+    String(String),
+    FieldElement(Felt),
+}
+
+#[derive(Debug, Clone)]
+pub enum BrokerMessage {
+    ModelRegistered(ModelRegistered),
+    EntityUpdated(EntityUpdated),
+    EventMessageUpdated(EventMessageUpdated),
+    EventEmitted(EventEmitted),
+}
+
+#[derive(Debug, Clone)]
+pub struct DeleteEntityQuery {
+    pub entity_id: String,
+    pub event_id: String,
+    pub block_timestamp: String,
+    pub ty: Ty,
+}
+
+#[derive(Debug, Clone)]
+pub enum QueryType {
+    SetEntity(Ty),
+    DeleteEntity(DeleteEntityQuery),
+    EventMessage(Ty),
+    RegisterModel,
+    StoreEvent,
+    Execute,
+    Other,
+}
+
+#[derive(Debug)]
+pub struct Executor<'c> {
+    pool: Pool<Sqlite>,
+    transaction: Transaction<'c, Sqlite>,
+    publish_queue: Vec<BrokerMessage>,
+    rx: UnboundedReceiver<QueryMessage>,
+    shutdown_rx: Receiver<()>,
+}
+
+#[derive(Debug)]
+pub struct QueryMessage {
+    pub statement: String,
+    pub arguments: Vec<Argument>,
+    pub query_type: QueryType,
+    tx: Option<oneshot::Sender<Result<()>>>,
+}
+
+impl QueryMessage {
+    pub fn new(statement: String, arguments: Vec<Argument>, query_type: QueryType) -> Self {
+        Self { statement, arguments, query_type, tx: None }
+    }
+
+    pub fn new_recv(
+        statement: String,
+        arguments: Vec<Argument>,
+        query_type: QueryType,
+    ) -> (Self, oneshot::Receiver<Result<()>>) {
+        let (tx, rx) = oneshot::channel();
+        (Self { statement, arguments, query_type, tx: Some(tx) }, rx)
+    }
+
+    pub fn other(statement: String, arguments: Vec<Argument>) -> Self {
+        Self { statement, arguments, query_type: QueryType::Other, tx: None }
+    }
+
+    pub fn other_recv(
+        statement: String,
+        arguments: Vec<Argument>,
+    ) -> (Self, oneshot::Receiver<Result<()>>) {
+        let (tx, rx) = oneshot::channel();
+        (Self { statement, arguments, query_type: QueryType::Other, tx: Some(tx) }, rx)
+    }
+
+    pub fn execute() -> Self {
+        Self {
+            statement: "".to_string(),
+            arguments: vec![],
+            query_type: QueryType::Execute,
+            tx: None,
+        }
+    }
+
+    pub fn execute_recv() -> (Self, oneshot::Receiver<Result<()>>) {
+        let (tx, rx) = oneshot::channel();
+        (
+            Self {
+                statement: "".to_string(),
+                arguments: vec![],
+                query_type: QueryType::Execute,
+                tx: Some(tx),
+            },
+            rx,
+        )
+    }
+}
+
+impl<'c> Executor<'c> {
+    pub async fn new(
+        pool: Pool<Sqlite>,
+        shutdown_tx: Sender<()>,
+    ) -> Result<(Self, UnboundedSender<QueryMessage>)> {
+        let (tx, rx) = unbounded_channel();
+        let transaction = pool.begin().await?;
+        let publish_queue = Vec::new();
+        let shutdown_rx = shutdown_tx.subscribe();
+
+        Ok((Executor { pool, transaction, publish_queue, rx, shutdown_rx }, tx))
+    }
+
+    pub async fn run(&mut self) -> Result<()> {
+        loop {
+            tokio::select! {
+                _ = self.shutdown_rx.recv() => {
+                    debug!(target: LOG_TARGET, "Shutting down executor");
+                    break Ok(());
+                }
+                Some(msg) = self.rx.recv() => {
+                    let QueryMessage { statement, arguments, query_type, tx } = msg;
+                    let mut query = sqlx::query(&statement);
+
+                    for arg in &arguments {
+                        query = match arg {
+                            Argument::Null => query.bind(None::<String>),
+                            Argument::Int(integer) => query.bind(integer),
+                            Argument::Bool(bool) => query.bind(bool),
+                            Argument::String(string) => query.bind(string),
+                            Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)),
+                        }
+                    }
+
+                    match self.handle_query_type(query, query_type.clone(), &statement, &arguments, tx).await {
+                        Ok(()) => {},
+                        Err(e) => {
+                            error!(target: LOG_TARGET, r#type = ?query_type, error = %e, "Failed to execute query.");
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    async fn handle_query_type<'a>(
+        &mut self,
+        query: Query<'a, Sqlite, SqliteArguments<'a>>,
+        query_type: QueryType,
+        statement: &str,
+        arguments: &[Argument],
+        sender: Option<oneshot::Sender<Result<()>>>,
+    ) -> Result<()> {
+        let tx = &mut self.transaction;
+
+        match query_type {
+            QueryType::SetEntity(entity) => {
+                let row = query.fetch_one(&mut **tx).await.with_context(|| {
+                    format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
+                })?;
+                let mut entity_updated = EntityUpdated::from_row(&row)?;
+                entity_updated.updated_model = Some(entity);
+                entity_updated.deleted = false;
+                let broker_message = BrokerMessage::EntityUpdated(entity_updated);
+                self.publish_queue.push(broker_message);
+            }
+            QueryType::DeleteEntity(entity) => {
+                let delete_model = query.execute(&mut **tx).await.with_context(|| {
+                    format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
+                })?;
+                if delete_model.rows_affected() == 0 {
+                    return Ok(());
+                }
+
+                let row = sqlx::query(
+                    "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? \
+                     WHERE id = ? 
RETURNING *", + ) + .bind(entity.block_timestamp) + .bind(entity.event_id) + .bind(entity.entity_id) + .fetch_one(&mut **tx) + .await?; + let mut entity_updated = EntityUpdated::from_row(&row)?; + entity_updated.updated_model = + Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); + + let count = sqlx::query_scalar::<_, i64>( + "SELECT count(*) FROM entity_model WHERE entity_id = ?", + ) + .bind(entity_updated.id.clone()) + .fetch_one(&mut **tx) + .await?; + + // Delete entity if all of its models are deleted + if count == 0 { + sqlx::query("DELETE FROM entities WHERE id = ?") + .bind(entity_updated.id.clone()) + .execute(&mut **tx) + .await?; + entity_updated.deleted = true; + } + + let broker_message = BrokerMessage::EntityUpdated(entity_updated); + self.publish_queue.push(broker_message); + } + QueryType::RegisterModel => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let model_registered = ModelRegistered::from_row(&row)?; + self.publish_queue.push(BrokerMessage::ModelRegistered(model_registered)); + } + QueryType::EventMessage(entity) => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let mut event_message = EventMessageUpdated::from_row(&row)?; + event_message.updated_model = Some(entity); + let broker_message = BrokerMessage::EventMessageUpdated(event_message); + self.publish_queue.push(broker_message); + } + QueryType::StoreEvent => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let event = EventEmitted::from_row(&row)?; + self.publish_queue.push(BrokerMessage::EventEmitted(event)); + } + QueryType::Execute => { + debug!(target: LOG_TARGET, "Executing query."); + let instant = Instant::now(); + let res = self.execute().await; + debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Executed query."); + + if let Some(sender) = sender { + sender + .send(res) + .map_err(|_| anyhow::anyhow!("Failed to send execute result"))?; + } else { + res?; + } + } + QueryType::Other => { + query.execute(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + } + } + + Ok(()) + } + + async fn execute(&mut self) -> Result<()> { + let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); + transaction.commit().await?; + + for message in self.publish_queue.drain(..) 
{ + send_broker_message(message); + } + + Ok(()) + } +} + +fn send_broker_message(message: BrokerMessage) { + match message { + BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), + BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), + BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), + BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), + } +} diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index 77ac72dd35..0615f98b4e 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -2,6 +2,7 @@ pub mod engine; pub mod error; +pub mod executor; pub mod model; pub mod processors; pub mod simple_broker; diff --git a/crates/torii/core/src/processors/metadata_update.rs b/crates/torii/core/src/processors/metadata_update.rs index 594a32898a..4b17858d89 100644 --- a/crates/torii/core/src/processors/metadata_update.rs +++ b/crates/torii/core/src/processors/metadata_update.rs @@ -64,7 +64,7 @@ where uri = %uri_str, "Resource metadata set." ); - db.set_metadata(resource, &uri_str, block_timestamp); + db.set_metadata(resource, &uri_str, block_timestamp)?; let db = db.clone(); let resource = *resource; @@ -83,9 +83,7 @@ where async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) { match metadata(uri_str.clone()).await { Ok((metadata, icon_img, cover_img)) => { - db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img) - .await - .unwrap(); + db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap(); info!( target: LOG_TARGET, resource = %format!("{:#x}", resource), diff --git a/crates/torii/core/src/processors/store_transaction.rs b/crates/torii/core/src/processors/store_transaction.rs index 2e7056e401..101fb88093 100644 --- a/crates/torii/core/src/processors/store_transaction.rs +++ b/crates/torii/core/src/processors/store_transaction.rs @@ -21,7 +21,7 @@ impl TransactionProcessor

for StoreTran transaction: &Transaction, ) -> Result<(), Error> { let transaction_id = format!("{:#064x}:{:#x}", block_number, transaction_hash); - db.store_transaction(transaction, &transaction_id, block_timestamp); + db.store_transaction(transaction, &transaction_id, block_timestamp)?; Ok(()) } } diff --git a/crates/torii/core/src/sql/cache.rs b/crates/torii/core/src/sql/cache.rs index 5daca8c1e6..8cbcba36ed 100644 --- a/crates/torii/core/src/sql/cache.rs +++ b/crates/torii/core/src/sql/cache.rs @@ -131,6 +131,12 @@ pub struct LocalCache { pub token_id_registry: HashSet, } +impl Clone for LocalCache { + fn clone(&self) -> Self { + Self { erc_cache: HashMap::new(), token_id_registry: HashSet::new() } + } +} + impl LocalCache { pub async fn new(pool: Pool) -> Self { // read existing token_id's from balances table and cache them diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index b157ab0b87..78e064f258 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -5,9 +5,9 @@ use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; use tracing::debug; -use super::query_queue::{Argument, QueryType}; use super::utils::{sql_string_to_u256, u256_to_sql_string, I256}; use super::{Sql, FELT_DELIMITER}; +use crate::executor::{Argument, QueryMessage}; use crate::sql::utils::{felt_and_u256_to_sql_string, felt_to_sql_string, felts_to_sql_string}; use crate::types::ContractType; use crate::utils::utc_dt_string_from_timestamp; @@ -30,7 +30,7 @@ impl Sql { if !token_exists { self.register_erc20_token_metadata(contract_address, &token_id, provider).await?; - self.query_queue.execute_all().await?; + self.execute().await?; } self.store_erc_transfer_event( @@ -40,7 +40,7 @@ impl Sql { amount, &token_id, block_timestamp, - ); + )?; if from_address != Felt::ZERO { // from_address/contract_address/ @@ -83,7 +83,7 @@ impl Sql { if !token_exists { self.register_erc721_token_metadata(contract_address, &token_id, provider).await?; - self.query_queue.execute_all().await?; + self.execute().await?; } self.store_erc_transfer_event( @@ -93,7 +93,7 @@ impl Sql { U256::from(1u8), &token_id, block_timestamp, - ); + )?; // from_address/contract_address:id if from_address != Felt::ZERO { @@ -187,9 +187,10 @@ impl Sql { let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); // Insert the token into the tokens table - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ - ?, ?)", + ?, ?)" + .to_string(), vec![ Argument::String(token_id.to_string()), Argument::FieldElement(contract_address), @@ -197,8 +198,7 @@ impl Sql { Argument::String(symbol), Argument::Int(decimals.into()), ], - QueryType::Other, - ); + ))?; self.local_cache.register_token_id(token_id.to_string()); @@ -225,9 +225,10 @@ impl Sql { contract_address = %felt_to_sql_string(&contract_address), "Token already registered for contract_address, so reusing fetched data", ); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ - ?, ?, ?)", + ?, ?, ?)" + .to_string(), vec![ Argument::String(token_id.to_string()), Argument::FieldElement(contract_address), @@ -235,8 +236,7 @@ impl Sql { Argument::String(symbol), Argument::Int(decimals.into()), ], - QueryType::Other, - ); + ))?; 
self.local_cache.register_token_id(token_id.to_string());
             return Ok(());
         }
@@ -286,9 +286,10 @@ impl Sql {
         let decimals = 0;

         // Insert the token into the tokens table
-        self.query_queue.enqueue(
+        self.executor.send(QueryMessage::other(
             "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \
-             ?, ?)",
+             ?, ?)"
+                .to_string(),
             vec![
                 Argument::String(token_id.to_string()),
                 Argument::FieldElement(contract_address),
@@ -296,8 +297,7 @@ impl Sql {
                 Argument::String(symbol),
                 Argument::Int(decimals.into()),
             ],
-            QueryType::Other,
-        );
+        ))?;

         self.local_cache.register_token_id(token_id.to_string());

@@ -312,12 +312,12 @@ impl Sql {
         amount: U256,
         token_id: &str,
         block_timestamp: u64,
-    ) {
+    ) -> Result<()> {
         let insert_query = "INSERT INTO erc_transfers (contract_address, from_address, \
                             to_address, amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, ?)";

-        self.query_queue.enqueue(
-            insert_query,
+        self.executor.send(QueryMessage::other(
+            insert_query.to_string(),
             vec![
                 Argument::FieldElement(contract_address),
                 Argument::FieldElement(from),
@@ -326,8 +326,9 @@ impl Sql {
                 Argument::String(token_id.to_string()),
                 Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
             ],
-            QueryType::Other,
-        );
+        ))?;
+
+        Ok(())
     }

     pub async fn apply_cache_diff(&mut self) -> Result<()> {
diff --git a/crates/torii/core/src/sql/mod.rs b/crates/torii/core/src/sql/mod.rs
index 562313941a..a9471f8583 100644
--- a/crates/torii/core/src/sql/mod.rs
+++ b/crates/torii/core/src/sql/mod.rs
@@ -9,14 +9,14 @@ use dojo_types::schema::{EnumOption, Member, Struct, Ty};
 use dojo_world::contracts::abi::model::Layout;
 use dojo_world::contracts::naming::compute_selector_from_names;
 use dojo_world::metadata::WorldMetadata;
-use query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType};
 use sqlx::pool::PoolConnection;
 use sqlx::{Pool, Sqlite};
 use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
 use starknet_crypto::poseidon_hash_many;
-use tracing::{debug, warn};
+use tokio::sync::mpsc::UnboundedSender;
 use utils::felts_to_sql_string;

+use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType};
 use crate::types::ContractType;
 use crate::utils::utc_dt_string_from_timestamp;

@@ -36,11 +36,10 @@ pub mod utils;

 use cache::{LocalCache, Model, ModelCache};

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Sql {
-    world_address: Felt,
     pub pool: Pool<Sqlite>,
-    pub query_queue: QueryQueue,
+    pub executor: UnboundedSender<QueryMessage>,
     model_cache: Arc<ModelCache>,
     // when the SQL struct is cloned, an empty local_cache is created
     local_cache: LocalCache,
 }
@@ -53,69 +52,39 @@ pub struct Cursors {
     pub head: Option<u64>,
 }

-impl Clone for Sql {
-    fn clone(&self) -> Self {
-        Self {
-            world_address: self.world_address,
-            pool: self.pool.clone(),
-            query_queue: QueryQueue::new(self.pool.clone()),
-            model_cache: self.model_cache.clone(),
-            local_cache: LocalCache::empty(),
-        }
-    }
-}
-
 impl Sql {
     pub async fn new(
         pool: Pool<Sqlite>,
-        world_address: Felt,
+        executor: UnboundedSender<QueryMessage>,
         contracts: &HashMap<Felt, ContractType>,
     ) -> Result<Self> {
-        let mut query_queue = QueryQueue::new(pool.clone());
-
         for contract in contracts {
-            query_queue.enqueue(
+            executor.send(QueryMessage::other(
                 "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, \
-                 ?, ?)",
+                 ?, ?)"
+                    .to_string(),
                 vec![
                     Argument::FieldElement(*contract.0),
                     Argument::FieldElement(*contract.0),
                     Argument::String(contract.1.to_string()),
                 ],
-                QueryType::Other,
-            );
+            ))?;
         }

-        query_queue.execute_all().await?;
-
         let local_cache = LocalCache::new(pool.clone()).await;
-
-        Ok(Self {
+        let db = Self {
             pool: pool.clone(),
-            world_address,
-            query_queue,
-            model_cache: Arc::new(ModelCache::new(pool)),
+            executor,
+            model_cache: Arc::new(ModelCache::new(pool.clone())),
             local_cache,
-        })
-    }
-
-    pub fn merge(&mut self, other: Sql) -> Result<()> {
-        // Merge query queue
-        self.query_queue.queue.extend(other.query_queue.queue);
+        };

-        // This should never happen
-        if self.world_address != other.world_address {
-            warn!(
-                "Merging Sql instances with different world addresses: {} and {}",
-                self.world_address, other.world_address
-            );
-        }
+        db.execute().await?;

-        Ok(())
+        Ok(db)
     }

     pub async fn head(&self, contract: Felt) -> Result<(u64, Option<Felt>, Option<Felt>)> {
-        let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
         let indexer_query =
             sqlx::query_as::<_, (Option<i64>, Option<String>, Option<String>, String)>(
                 "SELECT head, last_pending_block_contract_tx, last_pending_block_tx, \
                  contract_type FROM contracts WHERE id = ?",
             )
             .bind(format!("{:#x}", contract));

         let indexer: (Option<i64>, Option<String>, Option<String>, String) = indexer_query
-            .fetch_one(&mut *conn)
+            .fetch_one(&self.pool)
             .await
             .with_context(|| format!("Failed to fetch head for contract: {:#x}", contract))?;

         Ok((
-            indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0),
+            indexer
+                .0
+                .map(|h| h.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", h)))
+                .transpose()?
+                .unwrap_or(0),
             indexer.1.map(|f| Felt::from_str(&f)).transpose()?,
             indexer.2.map(|f| Felt::from_str(&f)).transpose()?,
         ))
     }

-    pub fn set_head(&mut self, contract: Felt, head: u64) {
+    pub fn set_head(&mut self, contract: Felt, head: u64) -> Result<()> {
         let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
         let id = Argument::FieldElement(contract);
-        self.query_queue.enqueue(
-            "UPDATE contracts SET head = ? WHERE id = ?",
+        self.executor.send(QueryMessage::other(
+            "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
             vec![head, id],
-            QueryType::Other,
-        );
+        ))?;
+
+        Ok(())
     }

     pub fn set_last_pending_block_contract_tx(
         &mut self,
         contract: Felt,
         last_pending_block_contract_tx: Option<Felt>,
-    ) {
+    ) -> Result<()> {
         let last_pending_block_contract_tx = if let Some(f) = last_pending_block_contract_tx {
             Argument::String(format!("{:#x}", f))
         } else {
             Argument::Null
         };

         let id = Argument::FieldElement(contract);

-        self.query_queue.enqueue(
-            "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?",
+        self.executor.send(QueryMessage::other(
+            "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?".to_string(),
             vec![last_pending_block_contract_tx, id],
-            QueryType::Other,
-        );
+        ))?;
+
+        Ok(())
     }

-    pub fn set_last_pending_block_tx(&mut self, last_pending_block_tx: Option<Felt>) {
+    pub fn set_last_pending_block_tx(&mut self, last_pending_block_tx: Option<Felt>) -> Result<()> {
         let last_pending_block_tx = if let Some(f) = last_pending_block_tx {
             Argument::String(format!("{:#x}", f))
         } else {
             Argument::Null
         };

-        self.query_queue.enqueue(
-            "UPDATE contracts SET last_pending_block_tx = ? WHERE 1=1",
+        self.executor.send(QueryMessage::other(
+            "UPDATE contracts SET last_pending_block_tx = ? 
WHERE 1=1".to_string(), vec![last_pending_block_tx], - QueryType::Other, - ) + ))?; + + Ok(()) } pub(crate) async fn cursors(&self) -> Result { @@ -216,7 +192,7 @@ impl Sql { head: u64, last_pending_block_tx: Option, cursor_map: HashMap, - ) { + ) -> Result<()> { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); let last_pending_block_tx = if let Some(f) = last_pending_block_tx { Argument::String(format!("{:#x}", f)) @@ -224,34 +200,36 @@ impl Sql { Argument::Null }; - self.query_queue.enqueue( - "UPDATE contracts SET head = ?, last_pending_block_tx = ? WHERE 1=1", + self.executor.send(QueryMessage::other( + "UPDATE contracts SET head = ?, last_pending_block_tx = ? WHERE 1=1".to_string(), vec![head, last_pending_block_tx], - QueryType::Other, - ); + ))?; for cursor in cursor_map { let tx = Argument::FieldElement(cursor.1); let contract = Argument::FieldElement(cursor.0); - self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?", + self.executor.send(QueryMessage::other( + "UPDATE contracts SET last_pending_block_contract_tx = ? WHERE id = ?".to_string(), vec![tx, contract], - QueryType::Other, - ); + ))?; } + + Ok(()) } // For a given contract address, sets head to the passed value and sets // last_pending_block_contract_tx and last_pending_block_tx to null - pub fn reset_cursors(&mut self, head: u64) { + pub fn reset_cursors(&mut self, head: u64) -> Result<()> { let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "UPDATE contracts SET head = ?, last_pending_block_contract_tx = ?, \ - last_pending_block_tx = ? WHERE 1=1", + last_pending_block_tx = ? WHERE 1=1" + .to_string(), vec![head, Argument::Null, Argument::Null], - QueryType::Other, - ); + ))?; + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -276,7 +254,6 @@ impl Sql { class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \ packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \ executed_at=EXCLUDED.executed_at RETURNING *"; - let arguments = vec![ Argument::String(format!("{:#x}", selector)), Argument::String(namespace.to_string()), @@ -288,8 +265,11 @@ impl Sql { Argument::Int(unpacked_size as i64), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - - self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel); + self.executor.send(QueryMessage::new( + insert_models.to_string(), + arguments, + QueryType::RegisterModel, + ))?; let mut model_idx = 0_i64; self.build_register_queries_recursive( @@ -300,7 +280,7 @@ impl Sql { block_timestamp, &mut 0, &mut 0, - ); + )?; // we set the model in the cache directly // because entities might be using it before the query queue is processed @@ -363,14 +343,18 @@ impl Sql { arguments.push(Argument::String(keys.to_string())); } - self.query_queue.enqueue(insert_entities, arguments, QueryType::SetEntity(entity.clone())); + self.executor.send(QueryMessage::new( + insert_entities.to_string(), + arguments, + QueryType::SetEntity(entity.clone()), + ))?; - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) 
ON CONFLICT(entity_id, \ - model_id) DO NOTHING", + model_id) DO NOTHING" + .to_string(), vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - QueryType::Other, - ); + ))?; let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( @@ -380,7 +364,7 @@ impl Sql { (&entity, keys_str.is_none()), block_timestamp, &vec![], - ); + )?; Ok(()) } @@ -412,8 +396,8 @@ impl Sql { VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ event_id=EXCLUDED.event_id RETURNING *"; - self.query_queue.enqueue( - insert_entities, + self.executor.send(QueryMessage::new( + insert_entities.to_string(), vec![ Argument::String(entity_id.clone()), Argument::String(keys_str), @@ -421,13 +405,13 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], QueryType::EventMessage(entity.clone()), - ); - self.query_queue.enqueue( + ))?; + self.executor.send(QueryMessage::other( "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ - model_id) DO NOTHING", + model_id) DO NOTHING" + .to_string(), vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - QueryType::Other, - ); + ))?; let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( @@ -437,7 +421,7 @@ impl Sql { (&entity, false), block_timestamp, &vec![], - ); + )?; Ok(()) } @@ -453,10 +437,10 @@ impl Sql { let entity_id = format!("{:#x}", entity_id); let path = vec![entity.name()]; // delete entity models data - self.build_delete_entity_queries_recursive(path, &entity_id, &entity); + self.build_delete_entity_queries_recursive(path, &entity_id, &entity)?; - self.query_queue.enqueue( - "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?", + self.executor.send(QueryMessage::new( + "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?".to_string(), vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))], QueryType::DeleteEntity(DeleteEntityQuery { entity_id: entity_id.clone(), @@ -464,26 +448,28 @@ impl Sql { block_timestamp: utc_dt_string_from_timestamp(block_timestamp), ty: entity.clone(), }), - ); + ))?; Ok(()) } - pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) { + pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) -> Result<()> { let resource = Argument::FieldElement(*resource); let uri = Argument::String(uri.to_string()); let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) 
ON CONFLICT(id) DO \ UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \ - updated_at=CURRENT_TIMESTAMP", + updated_at=CURRENT_TIMESTAMP" + .to_string(), vec![resource, uri, executed_at], - QueryType::Other, - ); + ))?; + + Ok(()) } - pub async fn update_metadata( + pub fn update_metadata( &mut self, resource: &Felt, uri: &str, @@ -509,7 +495,7 @@ impl Sql { let statement = format!("UPDATE metadata SET {} WHERE id = ?", update.join(",")); arguments.push(Argument::FieldElement(*resource)); - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement, arguments))?; Ok(()) } @@ -539,13 +525,13 @@ impl Sql { transaction: &Transaction, transaction_id: &str, block_timestamp: u64, - ) { + ) -> Result<()> { let id = Argument::String(transaction_id.to_string()); let transaction_type = match transaction { Transaction::Invoke(_) => "INVOKE", Transaction::L1Handler(_) => "L1_HANDLER", - _ => return, + _ => return Ok(()), }; let (transaction_hash, sender_address, calldata, max_fee, signature, nonce) = @@ -566,13 +552,14 @@ impl Sql { Argument::String("".to_string()), // has no signature Argument::FieldElement((l1_handler_transaction.nonce).into()), ), - _ => return, + _ => return Ok(()), }; - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \ max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \ - ?, ?, ?)", + ?, ?, ?)" + .to_string(), vec![ id, transaction_hash, @@ -584,8 +571,9 @@ impl Sql { Argument::String(transaction_type.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], - QueryType::Other, - ); + ))?; + + Ok(()) } pub fn store_event( @@ -594,19 +582,22 @@ impl Sql { event: &Event, transaction_hash: Felt, block_timestamp: u64, - ) { + ) -> Result<()> { let id = Argument::String(event_id.to_string()); let keys = Argument::String(felts_to_sql_string(&event.keys)); let data = Argument::String(felts_to_sql_string(&event.data)); let hash = Argument::FieldElement(transaction_hash); let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); - self.query_queue.enqueue( + self.executor.send(QueryMessage::new( "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ - (?, ?, ?, ?, ?) RETURNING *", + (?, ?, ?, ?, ?) 
RETURNING *" + .to_string(), vec![id, keys, data, hash, executed_at], QueryType::StoreEvent, - ); + ))?; + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -619,11 +610,11 @@ impl Sql { block_timestamp: u64, array_idx: &mut usize, parent_array_idx: &mut usize, - ) { + ) -> Result<()> { if let Ty::Enum(e) = model { if e.options.iter().all(|o| if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }) { - return; + return Ok(()); } } @@ -635,13 +626,13 @@ impl Sql { block_timestamp, *array_idx, *parent_array_idx, - ); + )?; - let mut build_member = |pathname: &str, member: &Ty| { + let mut build_member = |pathname: &str, member: &Ty| -> Result<()> { if let Ty::Primitive(_) = member { - return; + return Ok(()); } else if let Ty::ByteArray(_) = member { - return; + return Ok(()); } let mut path_clone = path.clone(); @@ -655,20 +646,22 @@ impl Sql { block_timestamp, &mut (*array_idx + if let Ty::Array(_) = member { 1 } else { 0 }), &mut (*parent_array_idx + if let Ty::Array(_) = model { 1 } else { 0 }), - ); + )?; + + Ok(()) }; if let Ty::Struct(s) = model { for member in s.children.iter() { - build_member(&member.name, &member.ty); + build_member(&member.name, &member.ty)?; } } else if let Ty::Tuple(t) = model { for (idx, member) in t.iter().enumerate() { - build_member(format!("_{}", idx).as_str(), member); + build_member(format!("_{}", idx).as_str(), member)?; } } else if let Ty::Array(array) = model { let ty = &array[0]; - build_member("data", ty); + build_member("data", ty)?; } else if let Ty::Enum(e) = model { for child in e.options.iter() { // Skip enum options that have no type / member @@ -678,9 +671,11 @@ impl Sql { } } - build_member(&child.name, &child.ty); + build_member(&child.name, &child.ty)?; } } + + Ok(()) } fn build_set_entity_queries_recursive( @@ -692,103 +687,107 @@ impl Sql { entity: (&Ty, IsStoreUpdate), block_timestamp: u64, indexes: &Vec, - ) { + ) -> Result<()> { let (entity_id, is_event_message) = entity_id; let (entity, is_store_update_member) = entity; - let update_members = - |members: &[Member], query_queue: &mut QueryQueue, indexes: &Vec| { - let table_id = path.join("$"); - let mut columns = vec![ - "id".to_string(), - "event_id".to_string(), - "executed_at".to_string(), - "updated_at".to_string(), - if is_event_message { - "event_message_id".to_string() - } else { - "entity_id".to_string() - }, - ]; - - let mut arguments = vec![ - Argument::String(if is_event_message { - "event:".to_string() + entity_id - } else { - entity_id.to_string() - }), - Argument::String(event_id.to_string()), - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - Argument::String(chrono::Utc::now().to_rfc3339()), - Argument::String(entity_id.to_string()), - ]; + let update_members = |members: &[Member], + executor: &mut UnboundedSender, + indexes: &Vec| + -> Result<()> { + let table_id = path.join("$"); + let mut columns = vec![ + "id".to_string(), + "event_id".to_string(), + "executed_at".to_string(), + "updated_at".to_string(), + if is_event_message { + "event_message_id".to_string() + } else { + "entity_id".to_string() + }, + ]; - if !indexes.is_empty() { - columns.push("full_array_id".to_string()); - arguments.push(Argument::String( - std::iter::once(entity_id.to_string()) - .chain(indexes.iter().map(|i| i.to_string())) - .collect::>() - .join(FELT_DELIMITER), - )); - } + let mut arguments = vec![ + Argument::String(if is_event_message { + "event:".to_string() + entity_id + } else { + entity_id.to_string() + }), + Argument::String(event_id.to_string()), + 
Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + Argument::String(chrono::Utc::now().to_rfc3339()), + Argument::String(entity_id.to_string()), + ]; + + if !indexes.is_empty() { + columns.push("full_array_id".to_string()); + arguments.push(Argument::String( + std::iter::once(entity_id.to_string()) + .chain(indexes.iter().map(|i| i.to_string())) + .collect::>() + .join(FELT_DELIMITER), + )); + } - for (column_idx, idx) in indexes.iter().enumerate() { - columns.push(format!("idx_{}", column_idx)); - arguments.push(Argument::Int(*idx)); - } + for (column_idx, idx) in indexes.iter().enumerate() { + columns.push(format!("idx_{}", column_idx)); + arguments.push(Argument::Int(*idx)); + } - for member in members.iter() { - match &member.ty { - Ty::Primitive(ty) => { - columns.push(format!("external_{}", &member.name)); - arguments.push(Argument::String(ty.to_sql_value().unwrap())); - } - Ty::Enum(e) => { - columns.push(format!("external_{}", &member.name)); - arguments.push(Argument::String(e.to_sql_value().unwrap())); - } - Ty::ByteArray(b) => { - columns.push(format!("external_{}", &member.name)); - arguments.push(Argument::String(b.clone())); - } - _ => {} + for member in members.iter() { + match &member.ty { + Ty::Primitive(ty) => { + columns.push(format!("external_{}", &member.name)); + arguments.push(Argument::String(ty.to_sql_value().unwrap())); + } + Ty::Enum(e) => { + columns.push(format!("external_{}", &member.name)); + arguments.push(Argument::String(e.to_sql_value().unwrap())); } + Ty::ByteArray(b) => { + columns.push(format!("external_{}", &member.name)); + arguments.push(Argument::String(b.clone())); + } + _ => {} } + } - let placeholders: Vec<&str> = arguments.iter().map(|_| "?").collect(); - let statement = if is_store_update_member && indexes.is_empty() { - arguments.push(Argument::String(if is_event_message { - "event:".to_string() + entity_id - } else { - entity_id.to_string() - })); - - // row has to exist. update it directly - format!( - "UPDATE [{table_id}] SET {updates} WHERE id = ?", - table_id = table_id, - updates = columns - .iter() - .zip(placeholders.iter()) - .map(|(column, placeholder)| format!("{} = {}", column, placeholder)) - .collect::>() - .join(", ") - ) + let placeholders: Vec<&str> = arguments.iter().map(|_| "?").collect(); + let statement = if is_store_update_member && indexes.is_empty() { + arguments.push(Argument::String(if is_event_message { + "event:".to_string() + entity_id } else { - format!( - "INSERT OR REPLACE INTO [{table_id}] ({}) VALUES ({})", - columns.join(","), - placeholders.join(",") - ) - }; - - query_queue.enqueue(statement, arguments, QueryType::Other); + entity_id.to_string() + })); + + // row has to exist. 
update it directly + format!( + "UPDATE [{table_id}] SET {updates} WHERE id = ?", + table_id = table_id, + updates = columns + .iter() + .zip(placeholders.iter()) + .map(|(column, placeholder)| format!("{} = {}", column, placeholder)) + .collect::>() + .join(", ") + ) + } else { + format!( + "INSERT OR REPLACE INTO [{table_id}] ({}) VALUES ({})", + columns.join(","), + placeholders.join(",") + ) }; + executor.send(QueryMessage::other(statement, arguments))?; + + Ok(()) + }; + match entity { Ty::Struct(s) => { - update_members(&s.children, &mut self.query_queue, indexes); + update_members(&s.children, &mut self.executor, indexes)?; for member in s.children.iter() { let mut path_clone = path.clone(); @@ -800,16 +799,20 @@ impl Sql { (&member.ty, is_store_update_member), block_timestamp, indexes, - ); + )?; } } Ty::Enum(e) => { if e.options.iter().all( |o| { - if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } + if let Ty::Tuple(t) = &o.ty { + t.is_empty() + } else { + false + } }, ) { - return; + return Ok(()); } let option = e.options[e.option.unwrap() as usize].clone(); @@ -819,9 +822,9 @@ impl Sql { Member { name: "option".to_string(), ty: Ty::Enum(e.clone()), key: false }, Member { name: option.name.clone(), ty: option.ty.clone(), key: false }, ], - &mut self.query_queue, + &mut self.executor, indexes, - ); + )?; match &option.ty { // Skip enum options that have no type / member @@ -836,7 +839,7 @@ impl Sql { (&option.ty, is_store_update_member), block_timestamp, indexes, - ); + )?; } } } @@ -851,9 +854,9 @@ impl Sql { }) .collect::>() .as_slice(), - &mut self.query_queue, + &mut self.executor, indexes, - ); + )?; for (idx, member) in t.iter().enumerate() { let mut path_clone = path.clone(); @@ -865,7 +868,7 @@ impl Sql { (member, is_store_update_member), block_timestamp, indexes, - ); + )?; } } Ty::Array(array) => { @@ -881,7 +884,7 @@ impl Sql { let mut arguments = vec![Argument::String(entity_id.to_string())]; arguments.extend(indexes.iter().map(|idx| Argument::Int(*idx))); - self.query_queue.enqueue(query, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(query, arguments))?; // insert the new array elements for (idx, member) in array.iter().enumerate() { @@ -890,9 +893,9 @@ impl Sql { update_members( &[Member { name: "data".to_string(), ty: member.clone(), key: false }], - &mut self.query_queue, + &mut self.executor, &indexes, - ); + )?; let mut path_clone = path.clone(); path_clone.push("data".to_string()); @@ -903,11 +906,13 @@ impl Sql { (member, is_store_update_member), block_timestamp, &indexes, - ); + )?; } } _ => {} } + + Ok(()) } fn build_delete_entity_queries_recursive( @@ -915,20 +920,19 @@ impl Sql { path: Vec, entity_id: &str, entity: &Ty, - ) { + ) -> Result<()> { match entity { Ty::Struct(s) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for member in s.children.iter() { let mut path_clone = path.clone(); path_clone.push(member.name.clone()); - self.build_delete_entity_queries_recursive(path_clone, entity_id, &member.ty); + self.build_delete_entity_queries_recursive(path_clone, entity_id, &member.ty)?; } } Ty::Enum(e) => { @@ -936,16 +940,15 @@ impl Sql { .iter() .all(|o| if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }) { - return; + return Ok(()); } let table_id = path.join("$"); let statement = 
format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for child in e.options.iter() { if let Ty::Tuple(t) = &child.ty { @@ -956,41 +959,41 @@ impl Sql { let mut path_clone = path.clone(); path_clone.push(child.name.clone()); - self.build_delete_entity_queries_recursive(path_clone, entity_id, &child.ty); + self.build_delete_entity_queries_recursive(path_clone, entity_id, &child.ty)?; } } Ty::Array(array) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for member in array.iter() { let mut path_clone = path.clone(); path_clone.push("data".to_string()); - self.build_delete_entity_queries_recursive(path_clone, entity_id, member); + self.build_delete_entity_queries_recursive(path_clone, entity_id, member)?; } } Ty::Tuple(t) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for (idx, member) in t.iter().enumerate() { let mut path_clone = path.clone(); path_clone.push(format!("_{}", idx)); - self.build_delete_entity_queries_recursive(path_clone, entity_id, member); + self.build_delete_entity_queries_recursive(path_clone, entity_id, member)?; } } _ => {} } + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -1003,7 +1006,7 @@ impl Sql { block_timestamp: u64, array_idx: usize, parent_array_idx: usize, - ) { + ) -> Result<()> { let table_id = path.join("$"); let mut indices = Vec::new(); @@ -1095,7 +1098,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } } Ty::Tuple(tuple) => { @@ -1123,7 +1126,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } } Ty::Array(array) => { @@ -1148,7 +1151,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } Ty::Enum(e) => { for (idx, child) in e @@ -1187,7 +1190,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } } _ => {} @@ -1226,18 +1229,18 @@ impl Sql { create_table_query .push_str("FOREIGN KEY (event_message_id) REFERENCES event_messages(id));"); - self.query_queue.enqueue(create_table_query, vec![], QueryType::Other); - - indices.iter().for_each(|s| { - self.query_queue.enqueue(s, vec![], QueryType::Other); - }); - } + self.executor.send(QueryMessage::other(create_table_query, vec![]))?; - /// Execute all queries in the queue - pub async fn execute(&mut self) -> Result<()> { - debug!("Executing {} queries from the queue", self.query_queue.queue.len()); - 
self.query_queue.execute_all().await?; + for s in indices.iter() { + self.executor.send(QueryMessage::other(s.to_string(), vec![]))?; + } Ok(()) } + + pub async fn execute(&self) -> Result<()> { + let (execute, recv) = QueryMessage::execute_recv(); + self.executor.send(execute)?; + recv.await? + } } diff --git a/crates/torii/core/src/sql/test.rs b/crates/torii/core/src/sql/test.rs index e4b9d2a458..bd6fe9208a 100644 --- a/crates/torii/core/src/sql/test.rs +++ b/crates/torii/core/src/sql/test.rs @@ -18,9 +18,11 @@ use starknet::core::utils::{get_contract_address, get_selector_from_name}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; +use tempfile::NamedTempFile; use tokio::sync::broadcast; use crate::engine::{Engine, EngineConfig, Processors}; +use crate::executor::Executor; use crate::sql::Sql; use crate::types::ContractType; @@ -37,7 +39,7 @@ where let world_address = world.address; let mut engine = Engine::new( world, - db, + db.clone(), provider, Processors { ..Processors::default() }, EngineConfig::default(), @@ -49,17 +51,14 @@ where let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); + db.execute().await.unwrap(); + Ok(engine) } #[tokio::test(flavor = "multi_thread")] #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())] async fn test_load_from_remote(sequencer: &RunnerCtx) { - let options = - SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -110,11 +109,36 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + // move + let tx = &account + .execute_v1(vec![Call { + to: actions_address, + selector: get_selector_from_name("move").unwrap(), + calldata: vec![Felt::ONE], + }]) + .send() + .await + .unwrap(); + + TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new( + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true); + let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new( pool.clone(), - world_reader.address, + sender.clone(), &HashMap::from([(world_reader.address, ContractType::WORLD)]), ) .await @@ -172,6 +196,7 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { assert_eq!(unpacked_size, 0); assert_eq!(count_table("entities", &pool).await, 2); + assert_eq!(count_table("event_messages", &pool).await, 2); let (id, keys): (String, String) = sqlx::query_as( format!( @@ -186,18 +211,11 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { assert_eq!(id, format!("{:#x}", 
poseidon_hash_many(&[account.address()]))); assert_eq!(keys, format!("{:#x}/", account.address())); - - db.execute().await.unwrap(); } #[tokio::test(flavor = "multi_thread")] #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())] async fn test_load_from_remote_del(sequencer: &RunnerCtx) { - let options = - SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -275,9 +293,21 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new( + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true); + let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new( pool.clone(), - world_reader.address, + sender.clone(), &HashMap::from([(world_reader.address, ContractType::WORLD)]), ) .await @@ -291,18 +321,11 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { // TODO: check how we can have a test that is more chronological with Torii re-syncing // to ensure we can test intermediate states. - - db.execute().await.unwrap(); } #[tokio::test(flavor = "multi_thread")] #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())] async fn test_update_with_set_record(sequencer: &RunnerCtx) { - let options = - SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -368,17 +391,27 @@ async fn test_update_with_set_record(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new( + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true); + let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new( pool.clone(), - world_reader.address, + sender.clone(), &HashMap::from([(world_reader.address, ContractType::WORLD)]), ) .await .unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); - - db.execute().await.unwrap(); } /// Count the number of rows in a table. 
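Note on the test changes above: every test in this file now wires up the same scaffolding before touching Sql. A minimal sketch of that pattern follows, assuming the torii_core APIs this patch introduces (Executor::new handing back an (executor, sender) pair, Sql::new taking the sender instead of a world address, and db.execute() flushing through the executor); the setup_db helper name and the Felt::ZERO world address are illustrative, not part of the patch.

use std::collections::HashMap;
use std::str::FromStr;

use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use starknet::core::types::Felt;
use tokio::sync::broadcast;
use torii_core::executor::Executor;
use torii_core::sql::Sql;
use torii_core::types::ContractType;

// Shared test scaffolding: run the executor loop on its own task, hand its
// sender to Sql, and flush queued statements later with db.execute().
async fn setup_db(path: &str) -> anyhow::Result<Sql> {
    let options = SqliteConnectOptions::from_str(path)?.create_if_missing(true);
    let pool = SqlitePoolOptions::new().connect_with(options).await?;
    sqlx::migrate!("../migrations").run(&pool).await?;

    let (shutdown_tx, _) = broadcast::channel(1);
    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?;
    tokio::spawn(async move { executor.run().await.unwrap() });

    // Felt::ZERO stands in for a real world address here.
    Sql::new(pool, sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])).await
}

The switch from sqlite::memory: to a NamedTempFile-backed database fits this design: the executor issues writes from its own task and pool connection, and a file-backed database is shared across connections, whereas each fresh in-memory connection would start empty.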
diff --git a/crates/torii/graphql/Cargo.toml b/crates/torii/graphql/Cargo.toml index 459f073091..b4c2d3ced6 100644 --- a/crates/torii/graphql/Cargo.toml +++ b/crates/torii/graphql/Cargo.toml @@ -49,3 +49,4 @@ serial_test = "2.0.0" sozo-ops.workspace = true starknet-crypto.workspace = true starknet.workspace = true +tempfile.workspace = true diff --git a/crates/torii/graphql/src/tests/entities_test.rs b/crates/torii/graphql/src/tests/entities_test.rs index 6138aac846..efd74ab723 100644 --- a/crates/torii/graphql/src/tests/entities_test.rs +++ b/crates/torii/graphql/src/tests/entities_test.rs @@ -5,6 +5,7 @@ mod tests { use serde_json::Value; use starknet::core::types::Felt; use starknet_crypto::poseidon_hash_many; + use tempfile::NamedTempFile; use crate::schema::build_schema; use crate::tests::{ @@ -90,7 +91,9 @@ mod tests { // to run so combine all related tests into one #[tokio::test(flavor = "multi_thread")] async fn entities_test() -> Result<()> { - let pool = spinup_types_test().await?; + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let pool = spinup_types_test(&path).await?; let schema = build_schema(&pool).await.unwrap(); // default without params diff --git a/crates/torii/graphql/src/tests/metadata_test.rs b/crates/torii/graphql/src/tests/metadata_test.rs index 4505ad8c77..24224eb6b0 100644 --- a/crates/torii/graphql/src/tests/metadata_test.rs +++ b/crates/torii/graphql/src/tests/metadata_test.rs @@ -6,6 +6,8 @@ mod tests { use dojo_world::metadata::WorldMetadata; use sqlx::SqlitePool; use starknet::core::types::Felt; + use tokio::sync::broadcast; + use torii_core::executor::Executor; use torii_core::sql::Sql; use torii_core::types::ContractType; @@ -51,8 +53,14 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_metadata(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); let schema = build_schema(&pool).await.unwrap(); @@ -76,9 +84,8 @@ mod tests { // TODO: we may want to store here the namespace and the seed. Check the // implementation to actually add those to the metadata table. 
let world_metadata: WorldMetadata = profile_config.world.into(); - db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP); + db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP).unwrap(); db.update_metadata(&RESOURCE, URI, &world_metadata, &None, &Some(cover_img.to_string())) - .await .unwrap(); db.execute().await.unwrap(); @@ -107,13 +114,19 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_empty_content(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); let schema = build_schema(&pool).await.unwrap(); - db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP); + db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP).unwrap(); db.execute().await.unwrap(); let result = run_graphql_query(&schema, QUERY).await; diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index b03b9a71c6..d12c4c5e80 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -28,6 +28,7 @@ use starknet::providers::{JsonRpcClient, Provider}; use tokio::sync::broadcast; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, Processors}; +use torii_core::executor::Executor; use torii_core::sql::Sql; use torii_core::types::ContractType; @@ -272,11 +273,10 @@ pub async fn model_fixtures(db: &mut Sql) { db.execute().await.unwrap(); } -pub async fn spinup_types_test() -> Result<SqlitePool> { - // change sqlite::memory: to sqlite:~/.test.db to dump database to disk +pub async fn spinup_types_test(path: &str) -> Result<SqlitePool> { let options = - SqliteConnectOptions::from_str("sqlite::memory:")?.create_if_missing(true).with_regexp(); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); + SqliteConnectOptions::from_str(path).unwrap().create_if_missing(true).with_regexp(); + let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap(); sqlx::migrate!("../migrations").run(&pool).await.unwrap(); let setup = CompilerTestSetup::from_paths("../../dojo-core", &["../types-test"]); @@ -348,30 +348,34 @@ pub async fn spinup_types_test() -> Result<SqlitePool> { let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let db = Sql::new( pool.clone(), - strat.world_address, + sender, &HashMap::from([(strat.world_address, ContractType::WORLD)]), ) .await .unwrap(); - let world_address = strat.world_address; let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( world, - db, + db.clone(), Arc::clone(&provider), Processors { ..Processors::default() }, EngineConfig::default(), shutdown_tx, None, - Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), + Arc::new(HashMap::from([(strat.world_address, ContractType::WORLD)])), ); let to = account.provider().block_hash_and_number().await?.block_number; let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); - + db.execute().await.unwrap(); Ok(pool) } diff --git
a/crates/torii/graphql/src/tests/models_ordering_test.rs b/crates/torii/graphql/src/tests/models_ordering_test.rs index 9b4abdf26e..42182182fa 100644 --- a/crates/torii/graphql/src/tests/models_ordering_test.rs +++ b/crates/torii/graphql/src/tests/models_ordering_test.rs @@ -3,6 +3,7 @@ mod tests { use anyhow::Result; use async_graphql::dynamic::Schema; use serde_json::Value; + use tempfile::NamedTempFile; use crate::schema::build_schema; use crate::tests::{run_graphql_query, spinup_types_test, Connection, WorldModel}; @@ -44,7 +45,9 @@ mod tests { // to run so combine all related tests into one #[tokio::test(flavor = "multi_thread")] async fn models_ordering_test() -> Result<()> { - let pool = spinup_types_test().await?; + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let pool = spinup_types_test(&path).await?; let schema = build_schema(&pool).await.unwrap(); // default params, test entity relationship, test nested types diff --git a/crates/torii/graphql/src/tests/models_test.rs b/crates/torii/graphql/src/tests/models_test.rs index 163d9afc41..78cd6f5458 100644 --- a/crates/torii/graphql/src/tests/models_test.rs +++ b/crates/torii/graphql/src/tests/models_test.rs @@ -6,6 +6,7 @@ mod tests { use async_graphql::dynamic::Schema; use serde_json::Value; use starknet::core::types::Felt; + use tempfile::NamedTempFile; use crate::schema::build_schema; use crate::tests::{ @@ -166,7 +167,9 @@ mod tests { #[allow(clippy::get_first)] #[tokio::test(flavor = "multi_thread")] async fn models_test() -> Result<()> { - let pool = spinup_types_test().await?; + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let pool = spinup_types_test(&path).await?; let schema = build_schema(&pool).await.unwrap(); // we need to order all the records because insertions are done in parallel diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index efe9a7fd47..f35b60fcc6 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -13,7 +13,8 @@ mod tests { use sqlx::SqlitePool; use starknet::core::types::Event; use starknet_crypto::{poseidon_hash_many, Felt}; - use tokio::sync::mpsc; + use tokio::sync::{broadcast, mpsc}; + use torii_core::executor::Executor; use torii_core::sql::utils::felts_to_sql_string; use torii_core::sql::Sql; use torii_core::types::ContractType; @@ -24,8 +25,14 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); @@ -162,8 +169,14 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_entity_subscription_with_id(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), 
sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); @@ -280,8 +293,14 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); // 0. Preprocess model value @@ -348,8 +367,14 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_model_subscription_with_id(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); // 0. Preprocess model value @@ -417,8 +442,14 @@ mod tests { #[sqlx::test(migrations = "../migrations")] #[serial] async fn test_event_emitted(pool: SqlitePool) { + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); let block_timestamp: u64 = 1710754478_u64; @@ -441,7 +472,8 @@ mod tests { }, Felt::ZERO, block_timestamp, - ); + ) + .unwrap(); db.execute().await.unwrap(); tx.send(()).await.unwrap(); diff --git a/crates/torii/grpc/Cargo.toml b/crates/torii/grpc/Cargo.toml index 492cc9da34..a9d4b00102 100644 --- a/crates/torii/grpc/Cargo.toml +++ b/crates/torii/grpc/Cargo.toml @@ -36,6 +36,7 @@ dojo-test-utils.workspace = true dojo-utils.workspace = true katana-runner.workspace = true scarb.workspace = true +tempfile.workspace = true [target.'cfg(target_arch = "wasm32")'.dependencies] tonic-web-wasm-client.workspace = true diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index c84e23775a..98388f466a 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -19,8 +19,10 @@ use starknet::core::utils::{get_contract_address, get_selector_from_name}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; +use tempfile::NamedTempFile; use tokio::sync::broadcast; use torii_core::engine::{Engine, EngineConfig, Processors}; +use torii_core::executor::Executor; use torii_core::sql::Sql; use torii_core::types::ContractType; @@ -31,11 +33,17 @@ use crate::types::schema::Entity; #[tokio::test(flavor = "multi_thread")] #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())] async fn test_entities_queries(sequencer: &RunnerCtx) { - let options = SqliteConnectOptions::from_str("sqlite::memory:") - .unwrap() - .create_if_missing(true) - .with_regexp(); - let pool = 
SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = + SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true).with_regexp(); + let pool = SqlitePoolOptions::new() + .min_connections(1) + .idle_timeout(None) + .max_lifetime(None) + .connect_with(options) + .await + .unwrap(); sqlx::migrate!("../migrations").run(&pool).await.unwrap(); let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); @@ -91,14 +99,18 @@ async fn test_entities_queries(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let db = Sql::new( pool.clone(), - strat.world_address, + sender, &HashMap::from([(strat.world_address, ContractType::WORLD)]), ) .await .unwrap(); - let world_address = strat.world_address; let (shutdown_tx, _) = broadcast::channel(1); let mut engine = Engine::new( @@ -109,13 +121,15 @@ async fn test_entities_queries(sequencer: &RunnerCtx) { EngineConfig::default(), shutdown_tx, None, - Arc::new(HashMap::from([(world_address, ContractType::WORLD)])), + Arc::new(HashMap::from([(strat.world_address, ContractType::WORLD)])), ); let to = provider.block_hash_and_number().await.unwrap().block_number; let data = engine.fetch_range(0, to, &HashMap::new()).await.unwrap(); engine.process_range(data).await.unwrap(); + db.execute().await.unwrap(); + let (_, receiver) = tokio::sync::mpsc::channel(1); let grpc = DojoWorld::new(db.pool, receiver, strat.world_address, provider.clone()); diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index aebf341a59..5682c8ac54 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -25,6 +25,7 @@ use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall}; use starknet::core::utils::get_selector_from_name; use starknet::providers::Provider; use starknet_crypto::poseidon_hash_many; +use torii_core::executor::QueryMessage; use torii_core::sql::utils::felts_to_sql_string; use torii_core::sql::Sql; use tracing::{info, warn}; @@ -530,7 +531,7 @@ async fn set_entity( keys: &str, ) -> anyhow::Result<()> { db.set_entity(ty, message_id, block_timestamp, entity_id, model_id, Some(keys)).await?; - db.execute().await?; + db.executor.send(QueryMessage::execute())?; Ok(()) } diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index d81355491a..dcc3af889f 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -1,6 +1,5 @@ #[cfg(test)] mod test { - use std::collections::HashMap; use std::error::Error; use crate::client::RelayClient; @@ -13,15 +12,9 @@ mod test { use crypto_bigint::U256; use dojo_types::primitive::Primitive; use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty}; - use dojo_world::contracts::abi::model::Layout; - use futures::StreamExt; use katana_runner::KatanaRunner; use serde_json::Number; - use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use starknet::core::types::Felt; - use torii_core::simple_broker::SimpleBroker; - use torii_core::sql::Sql; - use torii_core::types::{ContractType, EventMessage}; #[cfg(target_arch = "wasm32")] use wasm_bindgen_test::*; @@ -542,8 +535,11 @@ 
mod test { use starknet::providers::JsonRpcClient; use starknet::signers::SigningKey; use starknet_crypto::Felt; + use tempfile::NamedTempFile; use tokio::select; + use tokio::sync::broadcast; use tokio::time::sleep; + use torii_core::executor::Executor; use torii_core::sql::Sql; use torii_core::types::ContractType; @@ -556,10 +552,18 @@ mod test { .try_init(); // Database - let options = <SqliteConnectOptions as std::str::FromStr>::from_str("sqlite::memory:") + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = <SqliteConnectOptions as std::str::FromStr>::from_str(&path) .unwrap() .create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); + let pool = SqlitePoolOptions::new() + .min_connections(1) + .idle_timeout(None) + .max_lifetime(None) + .connect_with(options) + .await + .unwrap(); sqlx::migrate!("../migrations").run(&pool).await.unwrap(); let sequencer = KatanaRunner::new().expect("Failed to create Katana sequencer"); @@ -568,8 +572,14 @@ mod test { let account = sequencer.account_data(0); + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) + Sql::new(pool.clone(), sender, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) .await .unwrap(); @@ -692,60 +702,6 @@ mod test { } } - - // Test to verify that setting an entity message in the SQL database - // triggers a publish event on the broker - #[tokio::test] - async fn test_entity_message_trigger_publish() -> Result<(), Box<dyn Error>> { - let _ = tracing_subscriber::fmt() - .with_env_filter("torii::relay::client=debug,torii::relay::server=debug") - .try_init(); - - let options = <SqliteConnectOptions as std::str::FromStr>::from_str("sqlite::memory:") - .unwrap() - .create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - - let mut db = - Sql::new(pool.clone(), Felt::ZERO, &HashMap::from([(Felt::ZERO, ContractType::WORLD)])) - .await - .unwrap(); - let mut broker = SimpleBroker::<EventMessage>::subscribe(); - - let entity = Ty::Struct(Struct { name: "Message".to_string(), children: vec![] }); - db.register_model( - "test_namespace", - entity.clone(), - Layout::Fixed(vec![]), - Felt::ZERO, - Felt::ZERO, - 0, - 0, - 0, - ) - .await?; - - // FIXME: register_model and set_event_message handle the name and namespace of entity type - // differently. - let entity = - Ty::Struct(Struct { name: "test_namespace-Message".to_string(), children: vec![] }); - - // Set the event message in the database - db.set_event_message(entity, "some_entity_id", 0).await?; - db.query_queue.execute_all().await?; - - // Check if a message was published to the broker - tokio::select!
{ - Some(message) = broker.next() => { - println!("Received message: {:?}", message); - Ok(()) - }, - _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => { - Err("Timeout: No message received".into()) - } - } - } - #[cfg(target_arch = "wasm32")] #[wasm_bindgen_test] async fn test_client_connection_wasm() -> Result<(), Box<dyn Error>> { diff --git a/examples/spawn-and-move/manifests/dev/deployment/manifest.json b/examples/spawn-and-move/manifests/dev/deployment/manifest.json index c3d4991c46..01e006eb03 100644 --- a/examples/spawn-and-move/manifests/dev/deployment/manifest.json +++ b/examples/spawn-and-move/manifests/dev/deployment/manifest.json @@ -1234,9 +1234,9 @@ ] } ], - "address": "0x46c1fd10836a8426197bf412fc5f26ea10f11a8d5c61474407f03f82c096593", - "transaction_hash": "0x7f540b040b1638b76a7f2a8fc13a33050d1c0556a63814f319a01d022b172cf", - "block_number": 3, + "address": "0x5fedbace16902d9ca4cdc1522f9fe156cd8c69a5d25e1436ee4b7b9933ad997", + "transaction_hash": "0x4c8e0d28e32c21f29f33ff68e245b65fcc91763abf53f284cce8c2274ff6115", + "block_number": 6, "seed": "dojo_examples", "metadata": { "profile_name": "dev", diff --git a/examples/spawn-and-move/manifests/dev/deployment/manifest.toml b/examples/spawn-and-move/manifests/dev/deployment/manifest.toml index 4e1ffec809..0835e3cb36 100644 --- a/examples/spawn-and-move/manifests/dev/deployment/manifest.toml +++ b/examples/spawn-and-move/manifests/dev/deployment/manifest.toml @@ -3,9 +3,9 @@ kind = "WorldContract" class_hash = "0x6f38d5d9507c5d9546290e1a27e309efe5a9af3770b6cc1627db4a1b90a7dce" original_class_hash = "0x6f38d5d9507c5d9546290e1a27e309efe5a9af3770b6cc1627db4a1b90a7dce" abi = "manifests/dev/deployment/abis/dojo-world.json" -address = "0x46c1fd10836a8426197bf412fc5f26ea10f11a8d5c61474407f03f82c096593" -transaction_hash = "0x7f540b040b1638b76a7f2a8fc13a33050d1c0556a63814f319a01d022b172cf" -block_number = 3 +address = "0x5fedbace16902d9ca4cdc1522f9fe156cd8c69a5d25e1436ee4b7b9933ad997" +transaction_hash = "0x4c8e0d28e32c21f29f33ff68e245b65fcc91763abf53f284cce8c2274ff6115" +block_number = 6 seed = "dojo_examples" manifest_name = "dojo-world"
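A note on the executor handshake the hunks above lean on: crates/torii/core/src/executor.rs is added by this patch but not shown in this excerpt, so the sketch below only reconstructs the shape implied by its call sites (QueryMessage::other(statement, arguments), the fire-and-forget QueryMessage::execute() used by the libp2p server, and QueryMessage::execute_recv() whose receiver Sql::execute awaits). The field names, the QueryType variants, and the Argument stand-in are assumptions.

use tokio::sync::oneshot;

// Stand-in for the patch's Argument enum; the real type also carries ints,
// felts, and the other SQL parameter kinds seen in the hunks above.
type Argument = String;

enum QueryType {
    Execute, // flush everything queued so far, optionally acknowledging the caller
    Other,   // plain statement, no acknowledgement
}

struct QueryMessage {
    statement: String,
    arguments: Vec<Argument>,
    query_type: QueryType,
    tx: Option<oneshot::Sender<anyhow::Result<()>>>,
}

impl QueryMessage {
    // Matches the `QueryMessage::other(statement, arguments)` call sites.
    fn other(statement: String, arguments: Vec<Argument>) -> Self {
        Self { statement, arguments, query_type: QueryType::Other, tx: None }
    }

    // Matches the libp2p server's fire-and-forget `QueryMessage::execute()`.
    fn execute() -> Self {
        Self { statement: String::new(), arguments: vec![], query_type: QueryType::Execute, tx: None }
    }

    // Matches `Sql::execute`: returns the message plus the receiver the caller
    // awaits; the executor answers on `tx` once the queued statements commit.
    fn execute_recv() -> (Self, oneshot::Receiver<anyhow::Result<()>>) {
        let (tx, rx) = oneshot::channel();
        let msg = Self {
            statement: String::new(),
            arguments: vec![],
            query_type: QueryType::Execute,
            tx: Some(tx),
        };
        (msg, rx)
    }
}

This split explains the two flush styles in the diff: the libp2p server sends QueryMessage::execute() and moves on, while the new Sql::execute sends the execute_recv() variant and blocks on the oneshot receiver until the executor acknowledges the commit.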