From 92a921f2deceaa463466350343c4386cbb0b4536 Mon Sep 17 00:00:00 2001 From: Conrad Hofmeyr Date: Mon, 15 Jan 2024 11:57:48 -0700 Subject: [PATCH] Batched data uploads using supabase RPC. --- .../database.sql | 12 +++++ .../library/powersync/SupabaseConnector.ts | 44 ++++++++++--------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/demos/powersync-supabase-yjs-text-collab-demo/database.sql b/demos/powersync-supabase-yjs-text-collab-demo/database.sql index 3213819..ecc41e4 100644 --- a/demos/powersync-supabase-yjs-text-collab-demo/database.sql +++ b/demos/powersync-supabase-yjs-text-collab-demo/database.sql @@ -24,3 +24,15 @@ CREATE OR REPLACE FUNCTION get_document_update_data(document_id uuid) RETURNS te SELECT JSON_AGG(update_data) as updates FROM document_updates WHERE document_id=$1; $$ LANGUAGE SQL; +CREATE OR REPLACE FUNCTION insert_document_updates(batch TEXT) +RETURNS VOID AS $$ +BEGIN + INSERT INTO document_updates (id, document_id, update_data) + SELECT + (elem->>'id')::UUID, + (elem->>'document_id')::UUID, + decode(elem->>'update_b64', 'base64') + FROM json_array_elements(batch::json) AS elem + ON CONFLICT (id) DO NOTHING; +END; +$$ LANGUAGE plpgsql; diff --git a/demos/powersync-supabase-yjs-text-collab-demo/src/library/powersync/SupabaseConnector.ts b/demos/powersync-supabase-yjs-text-collab-demo/src/library/powersync/SupabaseConnector.ts index ad31c52..f86841a 100644 --- a/demos/powersync-supabase-yjs-text-collab-demo/src/library/powersync/SupabaseConnector.ts +++ b/demos/powersync-supabase-yjs-text-collab-demo/src/library/powersync/SupabaseConnector.ts @@ -6,16 +6,7 @@ import { UpdateType } from '@journeyapps/powersync-sdk-web'; -import { - SupabaseClient, - createClient, - PostgrestError, - FunctionsHttpError, - FunctionsRelayError, - FunctionsFetchError -} from '@supabase/supabase-js'; - -import { b64ToHex } from '@/library/binary-utils'; +import { SupabaseClient, createClient, PostgrestError, FunctionsHttpError, FunctionsRelayError, FunctionsFetchError } from '@supabase/supabase-js'; export type SupabaseConfig = { supabaseUrl: string; @@ -100,28 +91,33 @@ export class SupabaseConnector extends BaseObserver i } async uploadData(database: AbstractPowerSyncDatabase): Promise { - const transaction = await database.getNextCrudTransaction(); + const batch = await database.getCrudBatch(200); - if (!transaction) { + if (!batch) { return; } + let updateBatch: any[] = []; + let lastOp: CrudEntry | null = null; try { // Note: If transactional consistency is important, use database functions // or edge functions to process the entire transaction in a single call. - for (let op of transaction.crud) { + for (let op of batch.crud) { lastOp = op; const table = this.client.from(op.table); let result: any; let record: any; if (op.op == UpdateType.PUT || op.op == UpdateType.PATCH) { record = { ...op.opData }; - if (op.table == 'document_updates') { - // need to use hex with postgrest/supabase-js https://discord.com/channels/839993398554656828/1192604644048904222/1192604644048904222 - record.update_data = b64ToHex(record.update_b64); - delete record.update_b64; - } + } + + if (op.op == UpdateType.PUT && op.table == 'document_updates') { + updateBatch.push({ + ...record, + id: op.id + }); + continue; } switch (op.op) { @@ -143,7 +139,15 @@ export class SupabaseConnector extends BaseObserver i } } - await transaction.complete(); + if (updateBatch.length > 0) { + console.log('inserting batch of size', updateBatch.length); + const result = await this.client.rpc('insert_document_updates', { batch: JSON.stringify(updateBatch) }); + if (result.error) { + throw new Error(`Could not update Supabase. Received error: ${result.error.message}`); + } + } + + await batch.complete(); } catch (ex: any) { console.debug(ex); if (typeof ex.code == 'string' && FATAL_RESPONSE_CODES.some((regex) => regex.test(ex.code))) { @@ -155,7 +159,7 @@ export class SupabaseConnector extends BaseObserver i * elsewhere instead of discarding, and/or notify the user. */ console.error(`Data upload error - discarding ${lastOp}`, ex); - await transaction.complete(); + await batch.complete(); } else { // Error may be retryable - e.g. network error or temporary server error. // Throwing an error here causes this call to be retried after a delay.