Skip to content

Commit

Permalink
Implement incremental subscriptions on the client (#2111)
Browse files Browse the repository at this point in the history
Co-authored-by: Zeke Foppa <[email protected]>
  • Loading branch information
jsdt and bfops authored Jan 31, 2025
1 parent 6aa75bd commit e2ffc07
Show file tree
Hide file tree
Showing 19 changed files with 892 additions and 204 deletions.
22 changes: 22 additions & 0 deletions crates/cli/src/subcommands/generate/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,7 @@ impl __sdk::EventContext for EventContext {{
/// A handle on a subscribed query.
// TODO: Document this better after implementing the new subscription API.
#[derive(Clone)]
pub struct SubscriptionHandle {{
imp: __sdk::SubscriptionHandleImpl<RemoteModule>,
}}
Expand All @@ -1438,6 +1439,27 @@ impl __sdk::SubscriptionHandle for SubscriptionHandle {{
fn new(imp: __sdk::SubscriptionHandleImpl<RemoteModule>) -> Self {{
Self {{ imp }}
}}
/// Returns true if this subscription has been terminated due to an unsubscribe call or an error.
fn is_ended(&self) -> bool {{
self.imp.is_ended()
}}
/// Returns true if this subscription has been applied and has not yet been unsubscribed.
fn is_active(&self) -> bool {{
self.imp.is_active()
}}
/// Unsubscribe from the query controlled by this `SubscriptionHandle`,
/// then run `on_end` when its rows are removed from the client cache.
fn unsubscribe_then(self, on_end: __sdk::OnEndedCallback<RemoteModule>) -> __anyhow::Result<()> {{
self.imp.unsubscribe_then(Some(on_end))
}}
fn unsubscribe(self) -> __anyhow::Result<()> {{
self.imp.unsubscribe_then(None)
}}
}}
/// Alias trait for a [`__sdk::DbContext`] connected to this module,
Expand Down
22 changes: 22 additions & 0 deletions crates/cli/tests/snapshots/codegen__codegen_rust.snap
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,7 @@ impl __sdk::EventContext for EventContext {

/// A handle on a subscribed query.
// TODO: Document this better after implementing the new subscription API.
#[derive(Clone)]
pub struct SubscriptionHandle {
imp: __sdk::SubscriptionHandleImpl<RemoteModule>,
}
Expand All @@ -1556,6 +1557,27 @@ impl __sdk::SubscriptionHandle for SubscriptionHandle {
fn new(imp: __sdk::SubscriptionHandleImpl<RemoteModule>) -> Self {
Self { imp }
}

/// Returns true if this subscription has been terminated due to an unsubscribe call or an error.
fn is_ended(&self) -> bool {
self.imp.is_ended()
}

/// Returns true if this subscription has been applied and has not yet been unsubscribed.
fn is_active(&self) -> bool {
self.imp.is_active()
}

/// Unsubscribe from the query controlled by this `SubscriptionHandle`,
/// then run `on_end` when its rows are removed from the client cache.
fn unsubscribe_then(self, on_end: __sdk::OnEndedCallback<RemoteModule>) -> __anyhow::Result<()> {
self.imp.unsubscribe_then(Some(on_end))
}

fn unsubscribe(self) -> __anyhow::Result<()> {
self.imp.unsubscribe_then(None)
}

}

/// Alias trait for a [`__sdk::DbContext`] connected to this module,
Expand Down
23 changes: 19 additions & 4 deletions crates/sdk/examples/quickstart-chat/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![allow(clippy::disallowed_macros)]
mod module_bindings;
use std::sync::{atomic::AtomicU8, Arc};

use module_bindings::*;

use spacetimedb_client_api_messages::websocket::Compression;
Expand Down Expand Up @@ -116,7 +118,6 @@ fn on_sub_applied(ctx: &EventContext) {
print_message(ctx, &message);
}
}

// ## Warn if set_name failed

/// Our `on_set_name` callback: print a warning if the reducer failed.
Expand Down Expand Up @@ -178,12 +179,26 @@ fn connect_to_db() -> DbConnection {
}

// # Subscribe to queries
fn subscribe_to_queries(ctx: &DbConnection, queries: &[&str], callback: fn(&EventContext)) {
if queries.is_empty() {
panic!("No queries to subscribe to.");
}
let remaining_queries = Arc::new(AtomicU8::new(queries.len() as u8));
for query in queries {
let remaining_queries = remaining_queries.clone();
ctx.subscription_builder()
.on_applied(move |ctx| {
if remaining_queries.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) == 1 {
callback(ctx);
}
})
.subscribe(query);
}
}

/// Register subscriptions for all rows of both tables.
fn subscribe_to_tables(ctx: &DbConnection) {
ctx.subscription_builder()
.on_applied(on_sub_applied)
.subscribe(["SELECT * FROM user;", "SELECT * FROM message;"]);
subscribe_to_queries(ctx, &["SELECT * FROM user", "SELECT * FROM message"], on_sub_applied);
}

// # Handle user input
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 34 additions & 10 deletions crates/sdk/examples/quickstart-chat/module_bindings/mod.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

2 comments on commit e2ffc07

@github-actions
Copy link

@github-actions github-actions bot commented on e2ffc07 Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

@github-actions
Copy link

@github-actions github-actions bot commented on e2ffc07 Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.