Fix handling of multi-tables (#229)
## Description of Changes

Unfortunately, none of our tests currently cover this, but while working
on the V9 upgrade, I noticed that this code still relies on `typeof(Row)`
as a unique table identifier.

That no longer holds with multi-tables, as several tables can share the
same `Row` type. In that case, subscription updates would be grouped
incorrectly and always applied to the first table that uses `Row` for
its data storage.

This PR fixes that by using the table handle itself as a key (compared
by reference).
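
To illustrate the difference, here is a minimal standalone sketch (the `Row`, `TableHandle`, and table names below are hypothetical stand-ins, not the SDK's actual types): keying the per-table dictionaries by `System.Type` collapses every table that stores `Row` into one bucket, while keying by the handle itself (default reference equality) keeps them distinct.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the SDK's row type and table handle.
class Row { }

class TableHandle
{
    public string Name { get; }
    public Type ClientTableType => typeof(Row);
    public TableHandle(string name) => Name = name;
}

static class MultiTableKeyDemo
{
    static void Main()
    {
        // Two distinct tables that both use `Row` for their data storage.
        var players = new TableHandle("players");
        var loggedOutPlayers = new TableHandle("logged_out_players");

        // Old scheme: keyed by typeof(Row), so both tables map to the same entry
        // and the second table's inserts land in the first table's set.
        var byType = new Dictionary<Type, HashSet<byte[]>>
        {
            [players.ClientTableType] = new HashSet<byte[]>(),
        };
        Console.WriteLine(byType.ContainsKey(loggedOutPlayers.ClientTableType)); // True (collision)

        // New scheme: keyed by the table handle itself (reference equality),
        // so each table gets its own bucket.
        var byHandle = new Dictionary<TableHandle, HashSet<byte[]>>
        {
            [players] = new HashSet<byte[]>(),
        };
        Console.WriteLine(byHandle.ContainsKey(loggedOutPlayers)); // False (distinct tables)
    }
}
```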

If transaction updates are already grouped uniquely by table, it should
be possible to simplify this code much further, but I'm not sure whether
such a guarantee exists, so I'm leaving that untouched.
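
For reference, a rough sketch of what that simplification could look like if the guarantee did exist: the pk-merge dictionary would move inside the per-table loop and be keyed by the primary key alone. This is not part of this PR, and `RowOp`, `MergeByPrimaryKey`, and `PerTableMergeSketch` are simplified, hypothetical stand-ins rather than the SDK's real `DbOp`/handle types.

```csharp
using System.Collections.Generic;

// Simplified stand-in for the SDK's DbOp; the real struct carries decoded row values.
struct RowOp
{
    public object? Insert;
    public object? Delete;
}

static class PerTableMergeSketch
{
    // If each table appeared at most once per transaction update, deletes and
    // inserts sharing a primary key could be merged with a per-table map keyed
    // only by the primary key, instead of a global (table, pk) map.
    public static List<RowOp> MergeByPrimaryKey(IEnumerable<(object pk, RowOp op)> rowOps)
    {
        var byPk = new Dictionary<object, RowOp>();
        foreach (var (pk, op) in rowOps)
        {
            if (byPk.TryGetValue(pk, out var old))
            {
                // Pair a delete and an insert for the same row into one update.
                var merged = op;
                merged.Insert ??= old.Insert;
                merged.Delete ??= old.Delete;
                byPk[pk] = merged;
            }
            else
            {
                byPk[pk] = op;
            }
        }
        return new List<RowOp>(byPk.Values);
    }
}
```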

## API

 - [ ] This is an API breaking change to the SDK

*If the API is breaking, please state below what will break*

## Requires SpacetimeDB PRs
*List any PRs here that are required for this SDK change to work*

## Testsuite
*If you would like to run your SDK changes in this PR against a
specific SpacetimeDB branch, specify that here. This can be a branch
name or a link to a PR.*

SpacetimeDB branch name: master

## Testing
*Write instructions for a test that you performed for this PR*

- [x] I ran `dotnet test`, but as mentioned in the description, this
change still needs tests for multi-table subscriptions, which I'm afraid I
won't have time to add, so I have to leave that to follow-up devs.

---------

Co-authored-by: james gilles <[email protected]>
RReverser and kazimuth authored Feb 6, 2025
1 parent 20c6480 commit 7680a73
Showing 2 changed files with 31 additions and 47 deletions.
2 changes: 0 additions & 2 deletions src/RemoteTablesBase.cs
@@ -22,7 +22,5 @@ protected void AddTable(IRemoteTableHandle table)
Log.Error($"We don't know that this table is: {name}");
return null;
}

internal IEnumerable<IRemoteTableHandle> GetTables() => tables.Values;
}
}
76 changes: 31 additions & 45 deletions src/SpacetimeDBClient.cs
Expand Up @@ -4,13 +4,13 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using SpacetimeDB.BSATN;
using SpacetimeDB.Internal;
using SpacetimeDB.ClientApi;
using Thread = System.Threading.Thread;
using System.Diagnostics;

namespace SpacetimeDB
{
@@ -232,7 +232,7 @@ struct ProcessedMessage
struct PreProcessedMessage
{
public ProcessedMessage processed;
public Dictionary<Type, HashSet<byte[]>>? subscriptionInserts;
public Dictionary<IRemoteTableHandle, HashSet<byte[]>>? subscriptionInserts;
}

private readonly BlockingCollection<UnprocessedMessage> _messageQueue =
@@ -366,28 +366,28 @@ void PreProcessMessages()
}
}

(List<DbOp>, Dictionary<System.Type, HashSet<byte[]>>) PreProcessLegacySubscription(InitialSubscription initSub)
(List<DbOp>, Dictionary<IRemoteTableHandle, HashSet<byte[]>>) PreProcessLegacySubscription(InitialSubscription initSub)
{
var dbOps = new List<DbOp>();
// This is all of the inserts
int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows);
// FIXME: shouldn't this be `new(initSub.DatabaseUpdate.Tables.Length)` ?
Dictionary<System.Type, HashSet<byte[]>> subscriptionInserts = new(capacity: cap);
Dictionary<IRemoteTableHandle, HashSet<byte[]>> subscriptionInserts = new(capacity: cap);

HashSet<byte[]> GetInsertHashSet(System.Type tableType, int tableSize)
HashSet<byte[]> GetInsertHashSet(IRemoteTableHandle table, int tableSize)
{
if (!subscriptionInserts.TryGetValue(tableType, out var hashSet))
if (!subscriptionInserts.TryGetValue(table, out var hashSet))
{
hashSet = new HashSet<byte[]>(capacity: tableSize, comparer: ByteArrayComparer.Instance);
subscriptionInserts[tableType] = hashSet;
subscriptionInserts[table] = hashSet;
}
return hashSet;
}

// First apply all of the state
foreach (var (table, update) in GetTables(initSub.DatabaseUpdate))
{
var hashSet = GetInsertHashSet(table.ClientTableType, (int)update.NumRows);
var hashSet = GetInsertHashSet(table, (int)update.NumRows);

PreProcessInsertOnlyTable(table, update, dbOps, hashSet);
}
@@ -398,22 +398,27 @@ HashSet<byte[]> GetInsertHashSet(System.Type tableType, int tableSize)
/// TODO: the dictionary is here for backwards compatibility and can be removed
/// once we get rid of legacy subscriptions.
/// </summary>
(List<DbOp>, Dictionary<System.Type, HashSet<byte[]>>) PreProcessSubscribeApplied(SubscribeApplied subscribeApplied)
(List<DbOp>, Dictionary<IRemoteTableHandle, HashSet<byte[]>>) PreProcessSubscribeApplied(SubscribeApplied subscribeApplied)
{
var table = Db.GetTable(subscribeApplied.Rows.TableName) ?? throw new Exception($"Unknown table name: {subscribeApplied.Rows.TableName}");
var dbOps = new List<DbOp>();
HashSet<byte[]> inserts = new();
HashSet<byte[]> inserts = new(comparer: ByteArrayComparer.Instance);

PreProcessInsertOnlyTable(table, subscribeApplied.Rows.TableRows, dbOps, inserts);

var result = new Dictionary<System.Type, HashSet<byte[]>>();
result[table.ClientTableType] = inserts;
var result = new Dictionary<IRemoteTableHandle, HashSet<byte[]>>
{
[table] = inserts
};

return (dbOps, result);
}

void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, List<DbOp> dbOps, HashSet<byte[]> inserts)
{
// In debug mode, make sure we use a byte array comparer in HashSet and not a reference-equal `byte[]` by accident.
Debug.Assert(inserts.Comparer is ByteArrayComparer);

foreach (var cqu in update.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
@@ -437,7 +442,6 @@ void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, Lis
dbOps.Add(op);
}
}

}


@@ -479,8 +483,10 @@ List<DbOp> PreProcessDatabaseUpdate(DatabaseUpdate updates)
{
var dbOps = new List<DbOp>();

// All row updates that have a primary key, this contains inserts, deletes and updates
var primaryKeyChanges = new Dictionary<(System.Type tableType, object primaryKeyValue), DbOp>();
// All row updates that have a primary key, this contains inserts, deletes and updates.
// TODO: is there any guarantee that transaction update contains each table only once, aka updates are already grouped by table?
// If so, we could simplify this and other methods by moving the dictionary inside the main loop and using only the primary key as key.
var primaryKeyChanges = new Dictionary<(IRemoteTableHandle table, object primaryKeyValue), DbOp>();

// First apply all of the state
foreach (var (table, update) in GetTables(updates))
@@ -494,26 +500,20 @@ List<DbOp> PreProcessDatabaseUpdate(DatabaseUpdate updates)
if (pk != null)
{
// Compound key that we use for lookup.
// Consists of type of the table (for faster comparison that string names) + actual primary key of the row.
var key = (table.ClientTableType, pk);
// Consists of the table handle (for faster comparison that string names) + actual primary key of the row.
var key = (table, pk);

if (primaryKeyChanges.TryGetValue(key, out var oldOp))
{
if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null))
if (oldOp.insert is not null)
{
Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}");
// TODO(jdetter): Is this a correctable error? This would be a major error on the
// SpacetimeDB side.
continue;
}

var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op);
op = new DbOp
{
table = insertOp.table,
delete = deleteOp.delete,
insert = insertOp.insert,
};
op.delete = oldOp.delete;
}
primaryKeyChanges[key] = op;
}
@@ -529,26 +529,20 @@ List<DbOp> PreProcessDatabaseUpdate(DatabaseUpdate updates)
if (pk != null)
{
// Compound key that we use for lookup.
// Consists of type of the table (for faster comparison that string names) + actual primary key of the row.
var key = (table.ClientTableType, pk);
// Consists of the table handle (for faster comparison that string names) + actual primary key of the row.
var key = (table, pk);

if (primaryKeyChanges.TryGetValue(key, out var oldOp))
{
if ((op.insert is not null && oldOp.insert is not null) || (op.delete is not null && oldOp.delete is not null))
if (oldOp.delete is not null)
{
Log.Warn($"Update with the same primary key was applied multiple times! tableName={update.TableName}");
// TODO(jdetter): Is this a correctable error? This would be a major error on the
// SpacetimeDB side.
continue;
}

var (insertOp, deleteOp) = op.insert is not null ? (op, oldOp) : (oldOp, op);
op = new DbOp
{
table = insertOp.table,
delete = deleteOp.delete,
insert = insertOp.insert,
};
op.insert = oldOp.insert;
}
primaryKeyChanges[key] = op;
}
@@ -587,7 +581,7 @@ PreProcessedMessage PreProcessMessage(UnprocessedMessage unprocessed)
ReducerEvent<Reducer>? reducerEvent = default;

// This is all of the inserts, used for updating the stale but un-cleared client cache.
Dictionary<System.Type, HashSet<byte[]>>? subscriptionInserts = null;
Dictionary<IRemoteTableHandle, HashSet<byte[]>>? subscriptionInserts = null;

switch (message)
{
@@ -659,16 +653,8 @@ ProcessedMessage CalculateStateDiff(PreProcessedMessage preProcessedMessage)
// the client cache.
if (preProcessedMessage.subscriptionInserts is { } subscriptionInserts)
{
foreach (var table in Db.GetTables())
foreach (var (table, hashSet) in subscriptionInserts)
{
if (!subscriptionInserts.TryGetValue(table.ClientTableType, out var hashSet))
{
// We don't know if the user is waiting for subscriptions on other tables.
// Leave the stale data for untouched tables in the cache; this is
// the best we can do.
continue;
}

foreach (var (rowBytes, oldValue) in table.IterEntries().Where(kv => !hashSet.Contains(kv.Key)))
{
processed.dbOps.Add(new DbOp
