Skip to content

Commit

Permalink
Merge pull request #58 from dojoengine/tps
Browse files Browse the repository at this point in the history
feat: add indexer update subscriptions
  • Loading branch information
Larkooo authored Oct 9, 2024
2 parents c9578c5 + 5a0cda4 commit 5dac7d0
Show file tree
Hide file tree
Showing 12 changed files with 515 additions and 589 deletions.
755 changes: 257 additions & 498 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
[package]
edition = "2021"
name = "dojo-c"
version = "1.0.0-alpha.11"
version = "1.0.0-alpha.14"

[lib]
crate-type = ["cdylib", "rlib", "staticlib"]

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
dojo-world = { git = "https://github.com/dojoengine/dojo", rev = "d039c6d46f819edc3f2161c0520b8c8fecec0092", features = ["metadata"]}

[dependencies]
dojo-types = { git = "https://github.com/dojoengine/dojo", rev = "91a0fd0eecf02227528a74df4eae3e9e0458338e" }
torii-client = { git = "https://github.com/dojoengine/dojo", rev = "91a0fd0eecf02227528a74df4eae3e9e0458338e" }
dojo-types = { git = "https://github.com/dojoengine/dojo", rev = "d039c6d46f819edc3f2161c0520b8c8fecec0092" }
torii-client = { git = "https://github.com/dojoengine/dojo", rev = "d039c6d46f819edc3f2161c0520b8c8fecec0092" }
torii-grpc = { git = "https://github.com/dojoengine/dojo", features = [
"client",
], rev = "91a0fd0eecf02227528a74df4eae3e9e0458338e" }
torii-relay = { git = "https://github.com/dojoengine/dojo", rev = "91a0fd0eecf02227528a74df4eae3e9e0458338e" }
dojo-world = { git = "https://github.com/dojoengine/dojo", rev = "91a0fd0eecf02227528a74df4eae3e9e0458338e" }

], rev = "d039c6d46f819edc3f2161c0520b8c8fecec0092" }
torii-relay = { git = "https://github.com/dojoengine/dojo", rev = "d039c6d46f819edc3f2161c0520b8c8fecec0092" }

starknet = "0.11.0"
starknet-crypto = "0.7.1"
starknet-crypto = "0.7.2"

parking_lot = "0.12.1"
tokio = { version = "1.39.2", default-features = false, features = ["rt"] }
Expand All @@ -34,6 +35,7 @@ cainome = { git = "https://github.com/cartridge-gg/cainome", tag = "v0.3.2" }

# WASM
[target.'cfg(target_arch = "wasm32")'.dependencies]
dojo-world = { git = "https://github.com/dojoengine/dojo", rev = "d039c6d46f819edc3f2161c0520b8c8fecec0092", features = []}
serde-wasm-bindgen = "0.6.3"
wasm-bindgen-futures = "0.4.39"
js-sys = "0.3.70"
Expand Down
11 changes: 11 additions & 0 deletions dojo.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,13 @@ typedef struct Resultbool {
};
} Resultbool;

typedef struct IndexerUpdate {
int64_t head;
int64_t tps;
int64_t last_block_timestamp;
struct FieldElement contract_address;
} IndexerUpdate;

typedef enum ResultCArrayFieldElement_Tag {
OkCArrayFieldElement,
ErrCArrayFieldElement,
Expand Down Expand Up @@ -644,6 +651,10 @@ struct Resultbool client_update_event_message_subscription(struct ToriiClient *c
const struct EntityKeysClause *clauses,
uintptr_t clauses_len);

struct ResultSubscription on_indexer_update(struct ToriiClient *client,
const struct FieldElement *contract_address,
void (*callback)(struct IndexerUpdate));

struct ResultCArrayFieldElement bytearray_serialize(const char *str);

struct Resultc_char bytearray_deserialize(const struct FieldElement *felts, uintptr_t felts_len);
Expand Down
11 changes: 11 additions & 0 deletions dojo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,13 @@ struct EntityKeysClause {
}
};

struct IndexerUpdate {
int64_t head;
int64_t tps;
int64_t last_block_timestamp;
FieldElement contract_address;
};

struct Signature {
/// The `r` value of a signature
FieldElement r;
Expand Down Expand Up @@ -900,6 +907,10 @@ Result<bool> client_update_event_message_subscription(ToriiClient *client,
const EntityKeysClause *clauses,
uintptr_t clauses_len);

Result<Subscription*> on_indexer_update(ToriiClient *client,
const FieldElement *contract_address,
void (*callback)(IndexerUpdate));

Result<CArray<FieldElement>> bytearray_serialize(const char *str);

Result<const char*> bytearray_deserialize(const FieldElement *felts, uintptr_t felts_len);
Expand Down
10 changes: 10 additions & 0 deletions dojo.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ cdef extern from *:
bool ok;
Error err;

cdef struct IndexerUpdate:
int64_t head;
int64_t tps;
int64_t last_block_timestamp;
FieldElement contract_address;

cdef enum ResultCArrayFieldElement_Tag:
OkCArrayFieldElement,
ErrCArrayFieldElement,
Expand Down Expand Up @@ -409,6 +415,10 @@ cdef extern from *:
const EntityKeysClause *clauses,
uintptr_t clauses_len);

ResultSubscription on_indexer_update(ToriiClient *client,
const FieldElement *contract_address,
void (*callback)(IndexerUpdate));

ResultCArrayFieldElement bytearray_serialize(const char *str);

Resultc_char bytearray_deserialize(const FieldElement *felts, uintptr_t felts_len);
Expand Down
2 changes: 1 addition & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ imports_granularity = "Module"
normalize_comments = true
normalize_doc_attributes = true
unstable_features = true
version = "Two"
style_edition = "2024"
wrap_comments = true

# To use these settings in vscode, add the following line to your settings.json file:
Expand Down
103 changes: 63 additions & 40 deletions src/c/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod types;

use std::ffi::{c_void, CStr, CString};
use std::ffi::{CStr, CString, c_void};
use std::ops::Deref;
use std::os::raw::c_char;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use cainome::cairo_serde::{self, ByteArray, CairoSerde};
Expand All @@ -17,14 +17,14 @@ use starknet::core::utils::get_contract_address;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Provider as _};
use starknet::signers::{LocalWallet, SigningKey, VerifyingKey};
use starknet_crypto::{poseidon_hash_many, Felt};
use starknet_crypto::{Felt, poseidon_hash_many};
use stream_cancel::{StreamExt as _, Tripwire};
use tokio::time::sleep;
use tokio_stream::StreamExt;
use torii_client::client::Client as TClient;
use torii_relay::typed_data::TypedData;
use torii_relay::types::Message;
use types::{EntityKeysClause, Struct};
use types::{EntityKeysClause, IndexerUpdate, Struct};

use self::types::{
BlockId, CArray, Call, Entity, Error, Query, Result, Signature, ToriiClient, Ty, WorldMetadata,
Expand Down Expand Up @@ -159,24 +159,6 @@ pub unsafe extern "C" fn client_on_entity_state_update(

let subscription = Subscription { id: Arc::clone(&subscription_id), trigger };

// Create the first subscription and get the ID on the main thread
let entity_stream = client.inner.on_entity_updated(clauses.clone());
let mut rcv = match client.runtime.block_on(entity_stream) {
Ok(rcv) => rcv,
Err(e) => return Result::Err(e.into()),
};

match client.runtime.block_on(rcv.next()) {
Some(Ok((id, _))) => {
subscription_id.store(id, Ordering::SeqCst);
}
_ => {
return Result::Err(Error {
message: CString::new("failed to get initial subscription id").unwrap().into_raw(),
});
}
}

// Spawn a new thread to handle the stream and reconnections
let client_clone = client.clone();
let subscription_id_clone = Arc::clone(&subscription_id);
Expand Down Expand Up @@ -260,24 +242,6 @@ pub unsafe extern "C" fn client_on_event_message_update(

let subscription = Subscription { id: Arc::clone(&subscription_id), trigger };

// Create the first subscription and get the ID on the main thread
let entity_stream = client.inner.on_event_message_updated(clauses.clone());
let mut rcv = match client.runtime.block_on(entity_stream) {
Ok(rcv) => rcv,
Err(e) => return Result::Err(e.into()),
};

match client.runtime.block_on(rcv.next()) {
Some(Ok((id, _))) => {
subscription_id.store(id, Ordering::SeqCst);
}
_ => {
return Result::Err(Error {
message: CString::new("failed to get initial subscription id").unwrap().into_raw(),
});
}
}

// Spawn a new thread to handle the stream and reconnections
let client_clone = client.clone();
let subscription_id_clone = Arc::clone(&subscription_id);
Expand Down Expand Up @@ -344,6 +308,65 @@ pub unsafe extern "C" fn client_update_event_message_subscription(
}
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn on_indexer_update(
client: *mut ToriiClient,
contract_address: *const types::FieldElement,
callback: unsafe extern "C" fn(IndexerUpdate),
) -> Result<*mut Subscription> {
let client = Arc::from_raw(client);
let contract_address = if contract_address.is_null() {
None
} else {
Some(unsafe { (&*contract_address).into() })
};

let subscription_id = Arc::new(AtomicU64::new(0));
let (trigger, tripwire) = Tripwire::new();

let subscription = Subscription { id: Arc::clone(&subscription_id), trigger };

// Spawn a new thread to handle the stream and reconnections
let client_clone = client.clone();
client.runtime.spawn(async move {
let mut backoff = Duration::from_secs(1);
let max_backoff = Duration::from_secs(60);

loop {
let rcv = client_clone.inner.on_indexer_updated(contract_address).await;

match rcv {
Ok(rcv) => {
backoff = Duration::from_secs(1); // Reset backoff on successful connection

let mut rcv = rcv.take_until_if(tripwire.clone());

while let Some(Ok(update)) = rcv.next().await {
callback((&update).into());
}
}
Err(_) => {
// Check if the tripwire has been triggered before attempting to reconnect
if tripwire.clone().await {
break; // Exit the loop if the subscription has been cancelled
}
}
}

// If we've reached this point, the stream has ended (possibly due to disconnection)
// We'll try to reconnect after a delay, unless the tripwire has been triggered
if tripwire.clone().await {
break; // Exit the loop if the subscription has been cancelled
}
sleep(backoff).await;
backoff = std::cmp::min(backoff * 2, max_backoff);
}
});

Result::Ok(Box::into_raw(Box::new(subscription)))
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn bytearray_serialize(
Expand Down
51 changes: 44 additions & 7 deletions src/c/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::ffi::{c_char, CStr, CString};
use std::ffi::{CStr, CString, c_char};

use starknet::core::utils::get_selector_from_name;
use torii_client::client::Client;
Expand Down Expand Up @@ -51,6 +51,37 @@ impl<T> From<COption<T>> for Option<T> {
}
}

#[derive(Debug, Clone)]
#[repr(C)]
pub struct IndexerUpdate {
pub head: i64,
pub tps: i64,
pub last_block_timestamp: i64,
pub contract_address: FieldElement,
}

impl From<&IndexerUpdate> for torii_grpc::types::IndexerUpdate {
fn from(val: &IndexerUpdate) -> Self {
torii_grpc::types::IndexerUpdate {
head: val.head,
tps: val.tps,
last_block_timestamp: val.last_block_timestamp,
contract_address: (&val.contract_address).into(),
}
}
}

impl From<&torii_grpc::types::IndexerUpdate> for IndexerUpdate {
fn from(val: &torii_grpc::types::IndexerUpdate) -> Self {
IndexerUpdate {
head: val.head,
tps: val.tps,
last_block_timestamp: val.last_block_timestamp,
contract_address: (&val.contract_address).into(),
}
}
}

#[derive(Debug, Clone)]
#[repr(C)]
pub struct Signature {
Expand Down Expand Up @@ -271,23 +302,29 @@ pub enum MemberValue {
impl From<&MemberValue> for torii_grpc::types::MemberValue {
fn from(val: &MemberValue) -> Self {
match val {
MemberValue::Primitive(primitive) => torii_grpc::types::MemberValue::Primitive((&primitive.clone()).into()),
MemberValue::String(string) => torii_grpc::types::MemberValue::String(unsafe { CStr::from_ptr(*string).to_string_lossy().to_string() }),
MemberValue::Primitive(primitive) => {
torii_grpc::types::MemberValue::Primitive((&primitive.clone()).into())
}
MemberValue::String(string) => torii_grpc::types::MemberValue::String(unsafe {
CStr::from_ptr(*string).to_string_lossy().to_string()
}),
}
}
}

impl From<&torii_grpc::types::MemberValue> for MemberValue {
fn from(val: &torii_grpc::types::MemberValue) -> Self {
match val {
torii_grpc::types::MemberValue::Primitive(primitive) => MemberValue::Primitive((&primitive.clone()).into()),
torii_grpc::types::MemberValue::String(string) => MemberValue::String(CString::new(string.clone()).unwrap().into_raw()),
torii_grpc::types::MemberValue::Primitive(primitive) => {
MemberValue::Primitive((&primitive.clone()).into())
}
torii_grpc::types::MemberValue::String(string) => {
MemberValue::String(CString::new(string.clone()).unwrap().into_raw())
}
}
}
}



#[derive(Clone, Debug)]
#[repr(C)]
pub struct MemberClause {
Expand Down
4 changes: 2 additions & 2 deletions src/types.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::ffi::c_char;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use starknet::accounts::SingleOwnerAccount;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::signers::LocalWallet;
use stream_cancel::Trigger;
use torii_client::client::Client;
Expand Down
Loading

0 comments on commit 5dac7d0

Please sign in to comment.