From 0f28fb3c10affb07b11e81b245c4d84ce5006f82 Mon Sep 17 00:00:00 2001 From: Dominic Gunther Bauer <46312751+DominicGBauer@users.noreply.github.com> Date: Thu, 30 Jan 2025 12:44:07 +0200 Subject: [PATCH] feat: add throttle and delay to connect (#478) --- .changeset/slow-crews-watch.md | 6 + .../src/client/AbstractPowerSyncDatabase.ts | 40 +++-- .../AbstractStreamingSyncImplementation.ts | 37 ++++- .../react-native/src/db/PowerSyncDatabase.ts | 8 +- packages/web/src/db/PowerSyncDatabase.ts | 11 +- .../src/db/AbstractPowerSyncDatabase.test.ts | 151 ++++++++++++++++++ .../tests/src/db/PowersyncDatabase.test.ts | 65 ++++++++ 7 files changed, 290 insertions(+), 28 deletions(-) create mode 100644 .changeset/slow-crews-watch.md create mode 100644 packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts create mode 100644 packages/web/tests/src/db/PowersyncDatabase.test.ts diff --git a/.changeset/slow-crews-watch.md b/.changeset/slow-crews-watch.md new file mode 100644 index 000000000..afa4124ff --- /dev/null +++ b/.changeset/slow-crews-watch.md @@ -0,0 +1,6 @@ +--- +'@powersync/react-native': minor +'@powersync/common': minor +--- + +Add `retryDelayMs` and `crudUploadThrottleMs` to `connect` so that the values can be dynamically changed upon reconnecting. diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 8dc50477a..8a7507b18 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -24,9 +24,12 @@ import { CrudEntry, CrudEntryJSON } from './sync/bucket/CrudEntry.js'; import { CrudTransaction } from './sync/bucket/CrudTransaction.js'; import { DEFAULT_CRUD_UPLOAD_THROTTLE_MS, - PowerSyncConnectionOptions, + type AdditionalConnectionOptions, + type PowerSyncConnectionOptions, StreamingSyncImplementation, - StreamingSyncImplementationListener + StreamingSyncImplementationListener, + DEFAULT_RETRY_DELAY_MS, + type RequiredAdditionalConnectionOptions } from './sync/stream/AbstractStreamingSyncImplementation.js'; import { runOnSchemaChange } from './runOnSchemaChange.js'; @@ -35,21 +38,13 @@ export interface DisconnectAndClearOptions { clearLocal?: boolean; } -export interface BasePowerSyncDatabaseOptions { +export interface BasePowerSyncDatabaseOptions extends AdditionalConnectionOptions { /** Schema used for the local database. */ schema: Schema; - /** - * Delay for retrying sync streaming operations - * from the PowerSync backend after an error occurs. + * @deprecated Use {@link retryDelayMs} instead as this will be removed in future releases. */ retryDelay?: number; - /** - * Backend Connector CRUD operations are throttled - * to occur at most every `crudUploadThrottleMs` - * milliseconds. - */ - crudUploadThrottleMs?: number; logger?: ILogger; } @@ -129,7 +124,7 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = { export const DEFAULT_WATCH_THROTTLE_MS = 30; export const DEFAULT_POWERSYNC_DB_OPTIONS = { - retryDelay: 5000, + retryDelayMs: 5000, logger: Logger.get('PowerSyncDatabase'), crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS }; @@ -243,7 +238,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { this.currentStatus = new SyncStatus({ diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 40553d459..7ac17fcd7 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -37,10 +37,9 @@ export interface LockOptions { signal?: AbortSignal; } -export interface AbstractStreamingSyncImplementationOptions { +export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions { adapter: BucketStorageAdapter; uploadCrud: () => Promise; - crudUploadThrottleMs?: number; /** * An identifier for which PowerSync DB this sync implementation is * linked to. Most commonly DB name, but not restricted to DB name. @@ -48,7 +47,6 @@ export interface AbstractStreamingSyncImplementationOptions { identifier?: string; logger?: ILogger; remote: AbstractRemote; - retryDelayMs?: number; } export interface StreamingSyncImplementationListener extends BaseListener { @@ -67,7 +65,10 @@ export interface StreamingSyncImplementationListener extends BaseListener { * Configurable options to be used when connecting to the PowerSync * backend instance. */ -export interface PowerSyncConnectionOptions { +export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {} + + /** @internal */ +export interface BaseConnectionOptions { /** * The connection method to use when streaming updates from * the PowerSync backend instance. @@ -81,6 +82,25 @@ export interface PowerSyncConnectionOptions { params?: Record; } + /** @internal */ +export interface AdditionalConnectionOptions { + /** + * Delay for retrying sync streaming operations + * from the PowerSync backend after an error occurs. + */ + retryDelayMs?: number; + /** + * Backend Connector CRUD operations are throttled + * to occur at most every `crudUploadThrottleMs` + * milliseconds. + */ + crudUploadThrottleMs?: number; +} + + +/** @internal */ +export type RequiredAdditionalConnectionOptions = Required + export interface StreamingSyncImplementation extends BaseObserver, Disposable { /** * Connects to the sync service @@ -102,14 +122,17 @@ export interface StreamingSyncImplementation extends BaseObserver = { +export type RequiredPowerSyncConnectionOptions = Required; + +export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = { connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET, params: {} }; @@ -427,7 +450,7 @@ The next upload iteration will be delayed.`); type: LockType.SYNC, signal, callback: async () => { - const resolvedOptions: Required = { + const resolvedOptions: RequiredPowerSyncConnectionOptions = { ...DEFAULT_STREAM_CONNECTION_OPTIONS, ...(options ?? {}) }; diff --git a/packages/react-native/src/db/PowerSyncDatabase.ts b/packages/react-native/src/db/PowerSyncDatabase.ts index 7f7db0c74..3d37a7e9b 100644 --- a/packages/react-native/src/db/PowerSyncDatabase.ts +++ b/packages/react-native/src/db/PowerSyncDatabase.ts @@ -5,6 +5,7 @@ import { DBAdapter, PowerSyncBackendConnector, PowerSyncDatabaseOptionsWithSettings, + type RequiredAdditionalConnectionOptions, SqliteBucketStorage } from '@powersync/common'; import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote'; @@ -42,7 +43,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { } protected generateSyncStreamImplementation( - connector: PowerSyncBackendConnector + connector: PowerSyncBackendConnector, + options: RequiredAdditionalConnectionOptions ): AbstractStreamingSyncImplementation { const remote = new ReactNativeRemote(connector); @@ -53,8 +55,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { await this.waitForReady(); await connector.uploadData(this); }, - retryDelayMs: this.options.retryDelay, - crudUploadThrottleMs: this.options.crudUploadThrottleMs, + retryDelayMs: options.retryDelayMs, + crudUploadThrottleMs: options.crudUploadThrottleMs, identifier: this.database.name }); } diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index ddc2a76a8..e2c112ce0 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -3,6 +3,7 @@ import { type PowerSyncBackendConnector, type PowerSyncCloseOptions, type PowerSyncConnectionOptions, + type RequiredAdditionalConnectionOptions, AbstractPowerSyncDatabase, DBAdapter, DEFAULT_POWERSYNC_CLOSE_OPTIONS, @@ -13,7 +14,7 @@ import { PowerSyncDatabaseOptionsWithOpenFactory, PowerSyncDatabaseOptionsWithSettings, SqliteBucketStorage, - StreamingSyncImplementation + StreamingSyncImplementation, } from '@powersync/common'; import { Mutex } from 'async-mutex'; import { getNavigatorLocks } from '../shared/navigator'; @@ -194,11 +195,15 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase { return getNavigatorLocks().request(`lock-${this.database.name}`, cb); } - protected generateSyncStreamImplementation(connector: PowerSyncBackendConnector): StreamingSyncImplementation { + protected generateSyncStreamImplementation( + connector: PowerSyncBackendConnector, + options: RequiredAdditionalConnectionOptions + ): StreamingSyncImplementation { const remote = new WebRemote(connector); - const syncOptions: WebStreamingSyncImplementationOptions = { ...(this.options as {}), + retryDelayMs: options.retryDelayMs, + crudUploadThrottleMs: options.crudUploadThrottleMs, flags: this.resolvedFlags, adapter: this.bucketStorageAdapter, remote, diff --git a/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts b/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts new file mode 100644 index 000000000..01d34f75b --- /dev/null +++ b/packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts @@ -0,0 +1,151 @@ +import { describe, it, expect, vi } from 'vitest'; +import { AbstractPowerSyncDatabase, DEFAULT_RETRY_DELAY_MS, DEFAULT_CRUD_UPLOAD_THROTTLE_MS, BucketStorageAdapter, DBAdapter, PowerSyncBackendConnector, PowerSyncDatabaseOptionsWithSettings, RequiredAdditionalConnectionOptions, StreamingSyncImplementation } from '@powersync/common'; +import { testSchema } from '../../utils/testDb'; + +class TestPowerSyncDatabase extends AbstractPowerSyncDatabase { + protected openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter { + return {} as any + } + protected generateSyncStreamImplementation(connector: PowerSyncBackendConnector, options: RequiredAdditionalConnectionOptions): StreamingSyncImplementation { + return undefined as any; + } + protected generateBucketStorageAdapter(): BucketStorageAdapter { + return { + init: vi.fn() + } as any + } + _initialize(): Promise { + return Promise.resolve(); + } + + get database() { + return { + get: vi.fn().mockResolvedValue({ version: '0.3.0'}), + execute: vi.fn(), + refreshSchema: vi.fn(), + } as any + } + // Expose protected method for testing + public testResolvedConnectionOptions(options?: any) { + return this.resolvedConnectionOptions(options); + } +} + +describe('AbstractPowerSyncDatabase', () => { + describe('resolvedConnectionOptions', () => { + it('should use connect options when provided', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' } + }); + + const result = db.testResolvedConnectionOptions({ + retryDelayMs: 1000, + crudUploadThrottleMs: 2000 + }); + + expect(result).toEqual({ + retryDelayMs: 1000, + crudUploadThrottleMs: 2000 + }); + }); + + it('should fallback to constructor options when connect options not provided', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' }, + retryDelayMs: 3000, + crudUploadThrottleMs: 4000 + }); + + const result = db.testResolvedConnectionOptions(); + + expect(result).toEqual({ + retryDelayMs: 3000, + crudUploadThrottleMs: 4000 + }); + }); + + it('should convert retryDelay to retryDelayMs', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' }, + retryDelay: 5000 + }); + + const result = db.testResolvedConnectionOptions(); + + expect(result).toEqual({ + retryDelayMs: 5000, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS + }); + }); + + it('should prioritize retryDelayMs over retryDelay in constructor options', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' }, + retryDelay: 5000, + retryDelayMs: 6000 + }); + + const result = db.testResolvedConnectionOptions(); + + expect(result).toEqual({ + retryDelayMs: 6000, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS + }); + }); + + it('should prioritize connect options over constructor options', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' }, + retryDelayMs: 5000, + crudUploadThrottleMs: 6000 + }); + + const result = db.testResolvedConnectionOptions({ + retryDelayMs: 7000, + crudUploadThrottleMs: 8000 + }); + + expect(result).toEqual({ + retryDelayMs: 7000, + crudUploadThrottleMs: 8000 + }); + }); + + it('should use default values when no options provided', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' } + }); + + const result = db.testResolvedConnectionOptions(); + + expect(result).toEqual({ + retryDelayMs: DEFAULT_RETRY_DELAY_MS, + crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS + }); + }); + + it('should handle partial connect options', () => { + const db = new TestPowerSyncDatabase({ + schema: testSchema, + database: { dbFilename: 'test.db' }, + retryDelayMs: 5000, + crudUploadThrottleMs: 6000 + }); + + const result = db.testResolvedConnectionOptions({ + retryDelayMs: 7000 + }); + + expect(result).toEqual({ + retryDelayMs: 7000, + crudUploadThrottleMs: 6000 + }); + }); + }); +}); diff --git a/packages/web/tests/src/db/PowersyncDatabase.test.ts b/packages/web/tests/src/db/PowersyncDatabase.test.ts new file mode 100644 index 000000000..345ec029f --- /dev/null +++ b/packages/web/tests/src/db/PowersyncDatabase.test.ts @@ -0,0 +1,65 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { AbstractPowerSyncDatabase, PowerSyncDatabase, SyncStreamConnectionMethod } from '../../../src'; +import { testSchema } from '../../utils/testDb'; + +describe('PowerSyncDatabase', () => { + let db: PowerSyncDatabase; + let mockConnector: any; + let mockLogger: any; + let mockSyncImplementation: any; + + beforeEach(() => { + mockLogger = { + debug: vi.fn(), + }; + + // Initialize with minimal required options + db = new PowerSyncDatabase({ + schema: testSchema, + database: { + dbFilename: 'test.db' + }, + logger: mockLogger + }); + + vi.spyOn(db as any, 'runExclusive').mockImplementation((cb: any) => cb()); + + vi.spyOn(AbstractPowerSyncDatabase.prototype, 'connect') + .mockResolvedValue(undefined); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe('connect', () => { + it('should log debug message when attempting to connect', async () => { + await db.connect(mockConnector); + expect(mockLogger.debug).toHaveBeenCalledWith('Attempting to connect to PowerSync instance'); + expect(db['runExclusive']).toHaveBeenCalled(); + }); + + it('should use connect with correct options', async () => { + await db.connect(mockConnector, { + retryDelayMs: 1000, + crudUploadThrottleMs: 2000, + params: { + param1: 1 + }, + connectionMethod: SyncStreamConnectionMethod.HTTP + }); + + expect(AbstractPowerSyncDatabase.prototype.connect).toHaveBeenCalledWith( + mockConnector, + { + retryDelayMs: 1000, + crudUploadThrottleMs: 2000, + connectionMethod: "http", + params: { + param1: 1, + }, + } + ); + }); + }); +});