Skip to content

Commit

Permalink
opt(torii): indexing model schema retrieval (#2319)
Browse files Browse the repository at this point in the history
* opt(torii): indexing model schema retrieval

* :fix and optimize grpc

* chore: clippy
  • Loading branch information
Larkooo authored Aug 21, 2024
1 parent 0955638 commit 54a891e
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 79 deletions.
75 changes: 61 additions & 14 deletions crates/torii/core/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,87 @@
use std::collections::HashMap;

use dojo_types::schema::Ty;
use dojo_world::contracts::abi::model::Layout;
use sqlx::SqlitePool;
use starknet_crypto::Felt;
use tokio::sync::RwLock;

use crate::error::{Error, QueryError};
use crate::error::{Error, ParseError, QueryError};
use crate::model::{parse_sql_model_members, SqlModelMember};

#[derive(Debug, Clone)]
pub struct Model {
/// Namespace of the model
pub namespace: String,
/// The name of the model
pub name: String,
/// The selector of the model
pub selector: Felt,
/// The class hash of the model
pub class_hash: Felt,
/// The contract address of the model
pub contract_address: Felt,
pub packed_size: u32,
pub unpacked_size: u32,
pub layout: Layout,
pub schema: Ty,
}

#[derive(Debug)]
pub struct ModelCache {
pool: SqlitePool,
cache: RwLock<HashMap<Felt, Ty>>,
cache: RwLock<HashMap<Felt, Model>>,
}

impl ModelCache {
pub fn new(pool: SqlitePool) -> Self {
Self { pool, cache: RwLock::new(HashMap::new()) }
}

pub async fn schemas(&self, selectors: &[Felt]) -> Result<Vec<Ty>, Error> {
pub async fn models(&self, selectors: &[Felt]) -> Result<Vec<Model>, Error> {
let mut schemas = Vec::with_capacity(selectors.len());
for selector in selectors {
schemas.push(self.schema(selector).await?);
schemas.push(self.model(selector).await?);
}

Ok(schemas)
}

pub async fn schema(&self, selector: &Felt) -> Result<Ty, Error> {
pub async fn model(&self, selector: &Felt) -> Result<Model, Error> {
{
let cache = self.cache.read().await;
if let Some(model) = cache.get(selector).cloned() {
return Ok(model);
}
}

self.update_schema(selector).await
self.update_model(selector).await
}

async fn update_schema(&self, selector: &Felt) -> Result<Ty, Error> {
async fn update_model(&self, selector: &Felt) -> Result<Model, Error> {
let formatted_selector = format!("{:#x}", selector);

let (namespace, name): (String, String) =
sqlx::query_as("SELECT namespace, name FROM models WHERE id = ?")
.bind(formatted_selector.clone())
.fetch_one(&self.pool)
.await?;
let (namespace, name, class_hash, contract_address, packed_size, unpacked_size, layout): (
String,
String,
String,
String,
u32,
u32,
String,
) = sqlx::query_as(
"SELECT namespace, name, class_hash, contract_address, packed_size, unpacked_size, \
layout FROM models WHERE id = ?",
)
.bind(format!("{:#x}", selector))
.fetch_one(&self.pool)
.await?;

let class_hash = Felt::from_hex(&class_hash).map_err(ParseError::FromStr)?;
let contract_address = Felt::from_hex(&contract_address).map_err(ParseError::FromStr)?;

let layout = serde_json::from_str(&layout).map_err(ParseError::FromJsonStr)?;

let model_members: Vec<SqlModelMember> = sqlx::query_as(
"SELECT id, model_idx, member_idx, name, type, type_enum, enum_options, key FROM \
model_members WHERE model_id = ? ORDER BY model_idx ASC, member_idx ASC",
Expand All @@ -61,9 +96,21 @@ impl ModelCache {

let schema = parse_sql_model_members(&namespace, &name, &model_members);
let mut cache = self.cache.write().await;
cache.insert(*selector, schema.clone());

Ok(schema)
let model = Model {
namespace,
name,
selector: *selector,
class_hash,
contract_address,
packed_size,
unpacked_size,
layout,
schema,
};
cache.insert(*selector, model.clone());

Ok(model)
}

pub async fn clear(&self) {
Expand Down
5 changes: 2 additions & 3 deletions crates/torii/core/src/processors/event_message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceiptWithBlockInfo};
use starknet::providers::Provider;
Expand Down Expand Up @@ -54,7 +53,7 @@ where

info!(
target: LOG_TARGET,
model = %model.name(),
model = %model.name,
"Store event message."
);

Expand All @@ -63,7 +62,7 @@ where
let mut keys_and_unpacked =
[event.keys[1..event.keys.len() - 1].to_vec(), event.data.clone()].concat();

let mut entity = model.schema().await?;
let mut entity = model.schema.clone();
entity.deserialize(&mut keys_and_unpacked)?;

db.set_event_message(entity, event_id, block_timestamp).await?;
Expand Down
5 changes: 2 additions & 3 deletions crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use starknet::core::types::{Event, TransactionReceiptWithBlockInfo};
use starknet::providers::Provider;
Expand Down Expand Up @@ -53,12 +52,12 @@ where

info!(
target: LOG_TARGET,
name = %model.name(),
name = %model.name,
"Store delete record."
);

let entity_id = event.data[ENTITY_ID_INDEX];
let entity = model.schema().await?;
let entity = model.schema;

db.delete_entity(entity_id, entity, event_id, block_timestamp).await?;

Expand Down
5 changes: 2 additions & 3 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Context, Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
use starknet::core::types::{Event, TransactionReceiptWithBlockInfo};
Expand Down Expand Up @@ -54,7 +53,7 @@ where

info!(
target: LOG_TARGET,
name = %model.name(),
name = %model.name,
"Store set record.",
);

Expand All @@ -72,7 +71,7 @@ where
let values = event.data[values_start..values_end].to_vec();
let mut keys_and_unpacked = [keys, values].concat();

let mut entity = model.schema().await?;
let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_entity(entity, event_id, block_timestamp).await?;
Expand Down
7 changes: 3 additions & 4 deletions crates/torii/core/src/processors/store_update_member.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Context, Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::naming;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
Expand Down Expand Up @@ -57,7 +56,7 @@ where
let member_selector = event.data[MEMBER_INDEX];

let model = db.model(selector).await?;
let schema = model.schema().await?;
let schema = model.schema;

let mut member = schema
.as_struct()
Expand All @@ -73,7 +72,7 @@ where

info!(
target: LOG_TARGET,
name = %model.name(),
name = %model.name,
entity_id = format!("{:#x}", entity_id),
member = %member.name,
"Store update member.",
Expand All @@ -86,7 +85,7 @@ where
// Skip the length to only get the values as they will be deserialized.
let mut values = event.data[values_start + 1..=values_end].to_vec();

let tag = naming::get_tag(model.namespace(), model.name());
let tag = naming::get_tag(&model.namespace, &model.name);

if !db.does_entity_exist(tag.clone(), entity_id).await? {
warn!(
Expand Down
7 changes: 3 additions & 4 deletions crates/torii/core/src/processors/store_update_record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Context, Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::model::ModelReader;
use dojo_world::contracts::naming;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
Expand Down Expand Up @@ -56,7 +55,7 @@ where

info!(
target: LOG_TARGET,
name = %model.name(),
name = %model.name,
entity_id = format!("{:#x}", entity_id),
"Store update record.",
);
Expand All @@ -68,14 +67,14 @@ where
// Skip the length to only get the values as they will be deserialized.
let values = event.data[values_start + 1..=values_end].to_vec();

let tag = naming::get_tag(model.namespace(), model.name());
let tag = naming::get_tag(&model.namespace, &model.name);

// Keys are read from the db, since we don't have access to them when only
// the entity id is passed.
let keys = db.get_entity_keys(entity_id, &tag).await?;
let mut keys_and_unpacked = [keys, values].concat();

let mut entity = model.schema().await?;
let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_entity(entity, event_id, block_timestamp).await?;
Expand Down
20 changes: 11 additions & 9 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use chrono::Utc;
Expand All @@ -13,7 +14,7 @@ use sqlx::{Pool, Row, Sqlite};
use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
use starknet_crypto::poseidon_hash_many;

use crate::model::ModelSQLReader;
use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, QueryQueue};
use crate::simple_broker::SimpleBroker;
use crate::types::{
Expand All @@ -37,6 +38,7 @@ pub struct Sql {
world_address: Felt,
pub pool: Pool<Sqlite>,
query_queue: QueryQueue,
model_cache: Arc<ModelCache>,
}

impl Sql {
Expand All @@ -54,7 +56,12 @@ impl Sql {

query_queue.execute_all().await?;

Ok(Self { pool, world_address, query_queue })
Ok(Self {
pool: pool.clone(),
world_address,
query_queue,
model_cache: Arc::new(ModelCache::new(pool)),
})
}

pub async fn head(&self) -> Result<(u64, Option<Felt>)> {
Expand Down Expand Up @@ -411,13 +418,8 @@ impl Sql {
Ok(())
}

pub async fn model(&self, selector: Felt) -> Result<ModelSQLReader> {
match ModelSQLReader::new(selector, self.pool.clone()).await {
Ok(reader) => Ok(reader),
Err(e) => {
Err(anyhow::anyhow!("Failed to get model from db for selector {selector:#x}: {e}"))
}
}
pub async fn model(&self, selector: Felt) -> Result<Model> {
self.model_cache.model(&selector).await.map_err(|e| e.into())
}

/// Retrieves the keys definition for a given model.
Expand Down
Loading

0 comments on commit 54a891e

Please sign in to comment.