Skip to content

Commit

Permalink
feat: add throttle and delay to connect (#478)
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicGBauer authored Jan 30, 2025
1 parent a4895cc commit 0f28fb3
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 28 deletions.
6 changes: 6 additions & 0 deletions .changeset/slow-crews-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
---

Add `retryDelayMs` and `crudUploadThrottleMs` to `connect` so that the values can be dynamically changed upon reconnecting.
40 changes: 25 additions & 15 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import { CrudEntry, CrudEntryJSON } from './sync/bucket/CrudEntry.js';
import { CrudTransaction } from './sync/bucket/CrudTransaction.js';
import {
DEFAULT_CRUD_UPLOAD_THROTTLE_MS,
PowerSyncConnectionOptions,
type AdditionalConnectionOptions,
type PowerSyncConnectionOptions,
StreamingSyncImplementation,
StreamingSyncImplementationListener
StreamingSyncImplementationListener,
DEFAULT_RETRY_DELAY_MS,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';

Expand All @@ -35,21 +38,13 @@ export interface DisconnectAndClearOptions {
clearLocal?: boolean;
}

export interface BasePowerSyncDatabaseOptions {
export interface BasePowerSyncDatabaseOptions extends AdditionalConnectionOptions {
/** Schema used for the local database. */
schema: Schema;

/**
* Delay for retrying sync streaming operations
* from the PowerSync backend after an error occurs.
* @deprecated Use {@link retryDelayMs} instead as this will be removed in future releases.
*/
retryDelay?: number;
/**
* Backend Connector CRUD operations are throttled
* to occur at most every `crudUploadThrottleMs`
* milliseconds.
*/
crudUploadThrottleMs?: number;
logger?: ILogger;
}

Expand Down Expand Up @@ -129,7 +124,7 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
retryDelay: 5000,
retryDelayMs: 5000,
logger: Logger.get('PowerSyncDatabase'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};
Expand Down Expand Up @@ -243,7 +238,8 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected abstract openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter;

protected abstract generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
): StreamingSyncImplementation;

protected abstract generateBucketStorageAdapter(): BucketStorageAdapter;
Expand Down Expand Up @@ -376,6 +372,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
return this.waitForReady();
}

// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
return {
retryDelayMs: options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
crudUploadThrottleMs:
options?.crudUploadThrottleMs ?? this.options.crudUploadThrottleMs ?? DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};
}

/**
* Connects to stream of events from the PowerSync instance.
*/
Expand All @@ -388,7 +393,12 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('Cannot connect using a closed client');
}

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector);
const { retryDelayMs, crudUploadThrottleMs } = this.resolvedConnectionOptions(options);

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
retryDelayMs,
crudUploadThrottleMs,
});
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@ export interface LockOptions<T> {
signal?: AbortSignal;
}

export interface AbstractStreamingSyncImplementationOptions {
export interface AbstractStreamingSyncImplementationOptions extends AdditionalConnectionOptions {
adapter: BucketStorageAdapter;
uploadCrud: () => Promise<void>;
crudUploadThrottleMs?: number;
/**
* An identifier for which PowerSync DB this sync implementation is
* linked to. Most commonly DB name, but not restricted to DB name.
*/
identifier?: string;
logger?: ILogger;
remote: AbstractRemote;
retryDelayMs?: number;
}

export interface StreamingSyncImplementationListener extends BaseListener {
Expand All @@ -67,7 +65,10 @@ export interface StreamingSyncImplementationListener extends BaseListener {
* Configurable options to be used when connecting to the PowerSync
* backend instance.
*/
export interface PowerSyncConnectionOptions {
export interface PowerSyncConnectionOptions extends BaseConnectionOptions, AdditionalConnectionOptions {}

/** @internal */
export interface BaseConnectionOptions {
/**
* The connection method to use when streaming updates from
* the PowerSync backend instance.
Expand All @@ -81,6 +82,25 @@ export interface PowerSyncConnectionOptions {
params?: Record<string, StreamingSyncRequestParameterType>;
}

/** @internal */
export interface AdditionalConnectionOptions {
/**
* Delay for retrying sync streaming operations
* from the PowerSync backend after an error occurs.
*/
retryDelayMs?: number;
/**
* Backend Connector CRUD operations are throttled
* to occur at most every `crudUploadThrottleMs`
* milliseconds.
*/
crudUploadThrottleMs?: number;
}


/** @internal */
export type RequiredAdditionalConnectionOptions = Required<AdditionalConnectionOptions>

export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncImplementationListener>, Disposable {
/**
* Connects to the sync service
Expand All @@ -102,14 +122,17 @@ export interface StreamingSyncImplementation extends BaseObserver<StreamingSyncI
}

export const DEFAULT_CRUD_UPLOAD_THROTTLE_MS = 1000;
export const DEFAULT_RETRY_DELAY_MS = 5000;

export const DEFAULT_STREAMING_SYNC_OPTIONS = {
retryDelayMs: 5000,
retryDelayMs: DEFAULT_RETRY_DELAY_MS,
logger: Logger.get('PowerSyncStream'),
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
};

export const DEFAULT_STREAM_CONNECTION_OPTIONS: Required<PowerSyncConnectionOptions> = {
export type RequiredPowerSyncConnectionOptions = Required<BaseConnectionOptions>;

export const DEFAULT_STREAM_CONNECTION_OPTIONS: RequiredPowerSyncConnectionOptions = {
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
params: {}
};
Expand Down Expand Up @@ -427,7 +450,7 @@ The next upload iteration will be delayed.`);
type: LockType.SYNC,
signal,
callback: async () => {
const resolvedOptions: Required<PowerSyncConnectionOptions> = {
const resolvedOptions: RequiredPowerSyncConnectionOptions = {
...DEFAULT_STREAM_CONNECTION_OPTIONS,
...(options ?? {})
};
Expand Down
8 changes: 5 additions & 3 deletions packages/react-native/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
DBAdapter,
PowerSyncBackendConnector,
PowerSyncDatabaseOptionsWithSettings,
type RequiredAdditionalConnectionOptions,
SqliteBucketStorage
} from '@powersync/common';
import { ReactNativeRemote } from '../sync/stream/ReactNativeRemote';
Expand Down Expand Up @@ -42,7 +43,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
): AbstractStreamingSyncImplementation {
const remote = new ReactNativeRemote(connector);

Expand All @@ -53,8 +55,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
await this.waitForReady();
await connector.uploadData(this);
},
retryDelayMs: this.options.retryDelay,
crudUploadThrottleMs: this.options.crudUploadThrottleMs,
retryDelayMs: options.retryDelayMs,
crudUploadThrottleMs: options.crudUploadThrottleMs,
identifier: this.database.name
});
}
Expand Down
11 changes: 8 additions & 3 deletions packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
type PowerSyncBackendConnector,
type PowerSyncCloseOptions,
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions,
AbstractPowerSyncDatabase,
DBAdapter,
DEFAULT_POWERSYNC_CLOSE_OPTIONS,
Expand All @@ -13,7 +14,7 @@ import {
PowerSyncDatabaseOptionsWithOpenFactory,
PowerSyncDatabaseOptionsWithSettings,
SqliteBucketStorage,
StreamingSyncImplementation
StreamingSyncImplementation,
} from '@powersync/common';
import { Mutex } from 'async-mutex';
import { getNavigatorLocks } from '../shared/navigator';
Expand Down Expand Up @@ -194,11 +195,15 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return getNavigatorLocks().request(`lock-${this.database.name}`, cb);
}

protected generateSyncStreamImplementation(connector: PowerSyncBackendConnector): StreamingSyncImplementation {
protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector,
options: RequiredAdditionalConnectionOptions
): StreamingSyncImplementation {
const remote = new WebRemote(connector);

const syncOptions: WebStreamingSyncImplementationOptions = {
...(this.options as {}),
retryDelayMs: options.retryDelayMs,
crudUploadThrottleMs: options.crudUploadThrottleMs,
flags: this.resolvedFlags,
adapter: this.bucketStorageAdapter,
remote,
Expand Down
151 changes: 151 additions & 0 deletions packages/web/tests/src/db/AbstractPowerSyncDatabase.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { describe, it, expect, vi } from 'vitest';
import { AbstractPowerSyncDatabase, DEFAULT_RETRY_DELAY_MS, DEFAULT_CRUD_UPLOAD_THROTTLE_MS, BucketStorageAdapter, DBAdapter, PowerSyncBackendConnector, PowerSyncDatabaseOptionsWithSettings, RequiredAdditionalConnectionOptions, StreamingSyncImplementation } from '@powersync/common';
import { testSchema } from '../../utils/testDb';

class TestPowerSyncDatabase extends AbstractPowerSyncDatabase {
protected openDBAdapter(options: PowerSyncDatabaseOptionsWithSettings): DBAdapter {
return {} as any
}
protected generateSyncStreamImplementation(connector: PowerSyncBackendConnector, options: RequiredAdditionalConnectionOptions): StreamingSyncImplementation {
return undefined as any;
}
protected generateBucketStorageAdapter(): BucketStorageAdapter {
return {
init: vi.fn()
} as any
}
_initialize(): Promise<void> {
return Promise.resolve();
}

get database() {
return {
get: vi.fn().mockResolvedValue({ version: '0.3.0'}),
execute: vi.fn(),
refreshSchema: vi.fn(),
} as any
}
// Expose protected method for testing
public testResolvedConnectionOptions(options?: any) {
return this.resolvedConnectionOptions(options);
}
}

describe('AbstractPowerSyncDatabase', () => {
describe('resolvedConnectionOptions', () => {
it('should use connect options when provided', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' }
});

const result = db.testResolvedConnectionOptions({
retryDelayMs: 1000,
crudUploadThrottleMs: 2000
});

expect(result).toEqual({
retryDelayMs: 1000,
crudUploadThrottleMs: 2000
});
});

it('should fallback to constructor options when connect options not provided', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
retryDelayMs: 3000,
crudUploadThrottleMs: 4000
});

const result = db.testResolvedConnectionOptions();

expect(result).toEqual({
retryDelayMs: 3000,
crudUploadThrottleMs: 4000
});
});

it('should convert retryDelay to retryDelayMs', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
retryDelay: 5000
});

const result = db.testResolvedConnectionOptions();

expect(result).toEqual({
retryDelayMs: 5000,
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
});
});

it('should prioritize retryDelayMs over retryDelay in constructor options', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
retryDelay: 5000,
retryDelayMs: 6000
});

const result = db.testResolvedConnectionOptions();

expect(result).toEqual({
retryDelayMs: 6000,
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
});
});

it('should prioritize connect options over constructor options', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
retryDelayMs: 5000,
crudUploadThrottleMs: 6000
});

const result = db.testResolvedConnectionOptions({
retryDelayMs: 7000,
crudUploadThrottleMs: 8000
});

expect(result).toEqual({
retryDelayMs: 7000,
crudUploadThrottleMs: 8000
});
});

it('should use default values when no options provided', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' }
});

const result = db.testResolvedConnectionOptions();

expect(result).toEqual({
retryDelayMs: DEFAULT_RETRY_DELAY_MS,
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
});
});

it('should handle partial connect options', () => {
const db = new TestPowerSyncDatabase({
schema: testSchema,
database: { dbFilename: 'test.db' },
retryDelayMs: 5000,
crudUploadThrottleMs: 6000
});

const result = db.testResolvedConnectionOptions({
retryDelayMs: 7000
});

expect(result).toEqual({
retryDelayMs: 7000,
crudUploadThrottleMs: 6000
});
});
});
});
Loading

0 comments on commit 0f28fb3

Please sign in to comment.