Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): POW inter torii messaging through libp2p eg. for replicas #2868

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Client {
) -> Result<Self, Error> {
let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?;

let relay_client = torii_relay::client::RelayClient::new(relay_url)?;
let relay_client = torii_relay::client::RelayClient::new(relay_url, None)?;

let metadata = grpc_client.metadata().await?;

Expand Down
8 changes: 4 additions & 4 deletions crates/torii/graphql/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ lazy_static! {
Name::new("contractAddress"),
TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())),
),
(
Name::new("transactionHash"),
TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())),
),
// (
// Name::new("transactionHash"),
// TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())),
// ),
(
Name::new("executedAt"),
TypeData::Simple(TypeRef::named(GraphqlType::DateTime.to_string())),
Expand Down
13 changes: 8 additions & 5 deletions crates/torii/graphql/src/object/model.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::str::FromStr;

use async_graphql::dynamic::indexmap::IndexMap;
use async_graphql::dynamic::{
Enum, Field, InputObject, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef,
};
use async_graphql::{Name, Value};
use starknet_crypto::Felt;
use tokio_stream::StreamExt;
use torii_sqlite::simple_broker::SimpleBroker;
use torii_sqlite::types::Model;
Expand Down Expand Up @@ -80,7 +83,7 @@ impl ResolvableObject for ModelObject {
{
SubscriptionFieldFuture::new(async move {
let id = match ctx.args.get("id") {
Some(id) => Some(id.string()?.to_string()),
Some(id) => Some(Felt::from_str(&id.string()?.to_string())?),
None => None,
};
// if id is None, then subscribe to all models
Expand All @@ -104,12 +107,12 @@ impl ResolvableObject for ModelObject {
impl ModelObject {
pub fn value_mapping(model: Model) -> ValueMapping {
IndexMap::from([
(Name::new("id"), Value::from(model.id)),
(Name::new("id"), Value::from(format!("{:#x}", model.id))),
(Name::new("name"), Value::from(model.name)),
(Name::new("namespace"), Value::from(model.namespace)),
(Name::new("classHash"), Value::from(model.class_hash)),
(Name::new("contractAddress"), Value::from(model.contract_address)),
(Name::new("transactionHash"), Value::from(model.transaction_hash)),
(Name::new("classHash"), Value::from(format!("{:#x}", model.class_hash))),
(Name::new("contractAddress"), Value::from(format!("{:#x}", model.contract_address))),
// (Name::new("transactionHash"), Value::from(format!("{:#x}", model.transaction_hash))),
(
Name::new("createdAt"),
Value::from(model.created_at.format(DATETIME_FORMAT).to_string()),
Expand Down
3 changes: 1 addition & 2 deletions crates/torii/graphql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ async fn build_objects(pool: &SqlitePool) -> Result<(Vec<ObjectVariant>, Vec<Uni

// model data objects
for model in models {
let schema: Ty = serde_json::from_str(&model.schema)
.map_err(|e| anyhow::anyhow!(format!("Failed to parse model schema: {e}")))?;
let schema: Ty = model.schema;
let type_mapping = build_type_mapping(&model.namespace, &schema);

if !type_mapping.is_empty() {
Expand Down
3 changes: 1 addition & 2 deletions crates/torii/grpc/src/server/subscriptions/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ impl Service {
update: &ContractUpdated,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let contract_address =
Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?;
let contract_address = update.contract_address;

for (idx, sub) in subs.subscribers.read().await.iter() {
if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address {
Expand Down
109 changes: 105 additions & 4 deletions crates/torii/libp2p/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use dojo_types::naming::compute_selector_from_tag;
use dojo_world::contracts::abigen::model::Layout;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::lock::Mutex;
Expand All @@ -12,13 +15,17 @@ use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
#[cfg(not(target_arch = "wasm32"))]
use libp2p::tcp;
use libp2p::{identify, identity, noise, ping, yamux, Multiaddr, PeerId};
use starknet::core::types::Event;
use starknet_crypto::Felt;
use torii_sqlite::executor::QueryMessage;
use torii_sqlite::Sql;
use tracing::info;

pub mod events;
use crate::client::events::ClientEvent;
use crate::constants;
use crate::error::Error;
use crate::types::Message;
use crate::types::{Message, Update};

pub(crate) const LOG_TARGET: &str = "torii::relay::client";

Expand All @@ -39,6 +46,7 @@ pub struct RelayClient {
#[allow(missing_debug_implementations)]
pub struct EventLoop {
swarm: Swarm<Behaviour>,
sql: Option<Sql>,
command_receiver: UnboundedReceiver<Command>,
}

Expand All @@ -49,7 +57,7 @@ enum Command {

impl RelayClient {
#[cfg(not(target_arch = "wasm32"))]
pub fn new(relay_addr: String) -> Result<Self, Error> {
pub fn new(relay_addr: String, replica_db: Option<Sql>) -> Result<Self, Error> {
let local_key = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(local_key.public());

Expand Down Expand Up @@ -87,18 +95,30 @@ impl RelayClient {
})
.build();

if replica_db.is_some() {
swarm
.behaviour_mut()
.gossipsub
.subscribe(&IdentTopic::new(constants::UPDATE_MESSAGING_TOPIC))?;
}

info!(target: LOG_TARGET, addr = %relay_addr, "Dialing relay.");
swarm.dial(relay_addr.parse::<Multiaddr>()?)?;

let (command_sender, command_receiver) = futures::channel::mpsc::unbounded();
Ok(Self {
command_sender: CommandSender::new(command_sender),
event_loop: Arc::new(Mutex::new(EventLoop { swarm, command_receiver })),
event_loop: Arc::new(Mutex::new(EventLoop {
swarm,
command_receiver,
sql: replica_db,
})),
})
}

#[cfg(target_arch = "wasm32")]
pub fn new(relay_addr: String) -> Result<Self, Error> {
// We are never gonna be a replica in the browser.
pub fn new(relay_addr: String, _replica_db: Option<Sql>) -> Result<Self, Error> {
let local_key = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(local_key.public());

Expand Down Expand Up @@ -195,6 +215,82 @@ impl EventLoop {
}
}

async fn handle_update(&mut self, update: Update) {
// TODO: Implement update handling.
info!(target: LOG_TARGET, update = ?update, "Received update.");
// We can safely unwrap because we subscribe to updates only if replica_db is provided.
let sql = self.sql.as_mut().unwrap();

match update {
Update::Head(cursor) => {
sql.set_head(
cursor.head as u64,
cursor.last_block_timestamp as u64,
0,
cursor.contract_address,
)
.await
.unwrap();
}
Update::Model(model) => {
sql.register_model(
&model.namespace,
&model.schema,
model.layout,
model.class_hash,
model.contract_address,
model.packed_size,
model.unpacked_size,
model.executed_at.timestamp() as u64,
None,
)
.await
.unwrap();
}
Update::Entity(entity) => {
let id = Felt::from_str(&entity.id).unwrap();
let model = entity.updated_model.unwrap();
let model_id = compute_selector_from_tag(&model.name());
if entity.deleted {
sql.delete_entity(
id,
model_id,
model,
&entity.event_id,
entity.executed_at.timestamp() as u64,
)
.await
.unwrap();
} else {
sql.set_entity(
model,
&entity.event_id,
entity.executed_at.timestamp() as u64,
id,
model_id,
Some(&entity.keys),
)
.await
.unwrap();
}
}
Update::EventMessage(event_message) => {
let model = event_message.updated_model.unwrap();
sql.set_event_message(
model,
&event_message.event_id,
event_message.executed_at.timestamp() as u64,
event_message.historical,
)
.await
.unwrap();
}
Update::Event(event) => {
// TODO
}
}
}

pub async fn run(&mut self) {
let mut is_relay_ready = false;
let commands_queue = Arc::new(Mutex::new(Vec::new()));
Expand All @@ -207,6 +303,11 @@ impl EventLoop {
},
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Message { message, .. })) => {
if let Ok(update) = serde_json::from_slice::<Update>(&message.data) {
self.handle_update(update).await;
}
},
SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Subscribed { topic, .. })) => {
// Handle behaviour events.
info!(target: LOG_TARGET, topic = ?topic, "Relay ready. Received subscription confirmation.");
Expand Down
1 change: 1 addition & 0 deletions crates/torii/libp2p/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) const GOSSIPSUB_HEARTBEAT_INTERVAL_SECS: u64 = 10;
pub(crate) const MESSAGING_TOPIC: &str = "message";
pub(crate) const UPDATE_MESSAGING_TOPIC: &str = "update";
pub(crate) const IDLE_CONNECTION_TIMEOUT_SECS: u64 = 60;
2 changes: 1 addition & 1 deletion crates/torii/libp2p/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn test_client_messaging() -> Result<(), Box<dyn Error>> {
});

// Initialize the first client (listener)
let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string())?;
let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string(), None)?;
tokio::spawn(async move {
client.event_loop.lock().await.run().await;
});
Expand Down
16 changes: 16 additions & 0 deletions crates/torii/libp2p/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
use serde::{Deserialize, Serialize};
use starknet::core::types::Felt;
use torii_sqlite::types::{ContractCursor, Entity, Event, EventMessage, Model};
use torii_typed_data::TypedData;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
pub message: TypedData,
pub signature: Vec<Felt>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Update {
// Latest contract head
Head(ContractCursor),
// Registered model
Model(Model),
// Updated entity state
Entity(Entity),
// Indexed event message
EventMessage(EventMessage),
// Indexed raw event
Event(Event),
// TODO: Add more types of updates here.
}
57 changes: 5 additions & 52 deletions crates/torii/sqlite/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,14 @@
use std::collections::{HashMap, HashSet};

use dojo_types::schema::Ty;
use dojo_world::contracts::abigen::model::Layout;
use sqlx::{Pool, Sqlite, SqlitePool};
use starknet_crypto::Felt;
use tokio::sync::RwLock;

use crate::constants::TOKEN_BALANCE_TABLE;
use crate::error::{Error, ParseError};
use crate::types::ContractType;
use crate::error::Error;
use crate::types::{ContractType, Model};
use crate::utils::I256;

#[derive(Debug, Clone)]
pub struct Model {
/// Namespace of the model
pub namespace: String,
/// The name of the model
pub name: String,
/// The selector of the model
pub selector: Felt,
/// The class hash of the model
pub class_hash: Felt,
/// The contract address of the model
pub contract_address: Felt,
pub packed_size: u32,
pub unpacked_size: u32,
pub layout: Layout,
pub schema: Ty,
}

#[derive(Debug)]
pub struct ModelCache {
pool: SqlitePool,
Expand Down Expand Up @@ -65,42 +45,15 @@ impl ModelCache {
}

async fn update_model(&self, selector: &Felt) -> Result<Model, Error> {
let (
namespace,
name,
class_hash,
contract_address,
packed_size,
unpacked_size,
layout,
schema,
): (String, String, String, String, u32, u32, String, String) = sqlx::query_as(
"SELECT namespace, name, class_hash, contract_address, packed_size, unpacked_size, \
layout, schema FROM models WHERE id = ?",
let model: Model = sqlx::query_as(
"SELECT id, namespace, name, class_hash, contract_address, transaction_hash, packed_size, unpacked_size, \
layout, schema, executed_at, created_at FROM models WHERE id = ?",
)
.bind(format!("{:#x}", selector))
.fetch_one(&self.pool)
.await?;

let class_hash = Felt::from_hex(&class_hash).map_err(ParseError::FromStr)?;
let contract_address = Felt::from_hex(&contract_address).map_err(ParseError::FromStr)?;

let layout = serde_json::from_str(&layout).map_err(ParseError::FromJsonStr)?;
let schema = serde_json::from_str(&schema).map_err(ParseError::FromJsonStr)?;

let mut cache = self.model_cache.write().await;

let model = Model {
namespace,
name,
selector: *selector,
class_hash,
contract_address,
packed_size,
unpacked_size,
layout,
schema,
};
cache.insert(*selector, model.clone());

Ok(model)
Expand Down
Loading
Loading