From 5df24a01082023c7f00820ff3e60fb6230e3dcd0 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Feb 2025 18:15:50 +0200 Subject: [PATCH] Add initial test for BucketChecksumState. --- .../src/storage/BucketStorageFactory.ts | 8 ++ .../src/storage/StorageProvider.ts | 3 + .../src/storage/SyncRulesBucketStorage.ts | 78 +++++++++++-------- packages/service-core/src/sync/sync.ts | 36 +++++---- .../test/src/sync/checksum_state.test.ts | 74 ++++++++++++++++++ 5 files changed, 153 insertions(+), 46 deletions(-) create mode 100644 packages/service-core/test/src/sync/checksum_state.test.ts diff --git a/packages/service-core/src/storage/BucketStorageFactory.ts b/packages/service-core/src/storage/BucketStorageFactory.ts index a01dce39..a253c643 100644 --- a/packages/service-core/src/storage/BucketStorageFactory.ts +++ b/packages/service-core/src/storage/BucketStorageFactory.ts @@ -4,6 +4,14 @@ import { ReplicationEventPayload } from './ReplicationEventPayload.js'; import { ReplicationLock } from './ReplicationLock.js'; import { SyncRulesBucketStorage } from './SyncRulesBucketStorage.js'; +/** + * Represents a configured storage provider. + * + * The provider can handle multiple copies of sync rules concurrently, each with their own storage. + * This is to handle replication of a new version of sync rules, while the old version is still active. + * + * Storage APIs for a specific copy of sync rules are provided by the `SyncRulesBucketStorage` instances. + */ export interface BucketStorageFactory extends AsyncDisposableObserverClient { /** * Update sync rules from configuration, if changed. diff --git a/packages/service-core/src/storage/StorageProvider.ts b/packages/service-core/src/storage/StorageProvider.ts index f8adb74b..6db6b346 100644 --- a/packages/service-core/src/storage/StorageProvider.ts +++ b/packages/service-core/src/storage/StorageProvider.ts @@ -16,6 +16,9 @@ export interface GetStorageOptions { resolvedConfig: util.ResolvedPowerSyncConfig; } +/** + * Represents a provider that can create a storage instance for a specific storage type from configuration. + */ export interface BucketStorageProvider { /** * The storage type that this provider provides. diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 73b58153..5aa3c18d 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -8,6 +8,9 @@ import { SourceEntityDescriptor } from './SourceEntity.js'; import { SourceTable } from './SourceTable.js'; import { SyncStorageWriteCheckpointAPI } from './WriteCheckpointAPI.js'; +/** + * Storage for a specific copy of sync rules. + */ export interface SyncRulesBucketStorage extends DisposableObserverClient, SyncStorageWriteCheckpointAPI { @@ -16,17 +19,58 @@ export interface SyncRulesBucketStorage readonly factory: BucketStorageFactory; + /** + * Resolve a table, keeping track of it internally. + */ resolveTable(options: ResolveTableOptions): Promise; + /** + * Use this to get access to update storage data. + */ startBatch( options: StartBatchOptions, callback: (batch: BucketStorageBatch) => Promise ): Promise; - getCheckpoint(): Promise; - getParsedSyncRules(options: ParseSyncRulesOptions): SqlSyncRules; + /** + * Terminate the sync rules. + * + * This clears the storage, and sets state to TERMINATED. + * + * Must only be called on stopped sync rules. + */ + terminate(options?: TerminateOptions): Promise; + + getStatus(): Promise; + + /** + * Clear the storage, without changing state. + */ + clear(): Promise; + + autoActivate(): Promise; + + /** + * Record a replication error. + * + * This could be a recoverable error (e.g. temporary network failure), + * or a permanent error (e.g. missing toast data). + * + * Errors are cleared on commit. + */ + reportError(e: any): Promise; + + compact(options?: CompactOptions): Promise; + + // ## Read operations + + getCheckpoint(): Promise; + + /** + * Used to resolve "dynamic" parameter queries. + */ getParameterSets(checkpoint: util.OpId, lookups: SqliteJsonValue[][]): Promise; /** @@ -59,36 +103,6 @@ export interface SyncRulesBucketStorage * Returns zero checksums for any buckets not found. */ getChecksums(checkpoint: util.OpId, buckets: string[]): Promise; - - /** - * Terminate the sync rules. - * - * This clears the storage, and sets state to TERMINATED. - * - * Must only be called on stopped sync rules. - */ - terminate(options?: TerminateOptions): Promise; - - getStatus(): Promise; - - /** - * Clear the storage, without changing state. - */ - clear(): Promise; - - autoActivate(): Promise; - - /** - * Record a replication error. - * - * This could be a recoverable error (e.g. temporary network failure), - * or a permanent error (e.g. missing toast data). - * - * Errors are cleared on commit. - */ - reportError(e: any): Promise; - - compact(options?: CompactOptions): Promise; } export interface SyncRulesBucketStorageListener extends DisposableListener { diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 486cb839..b2e2d95d 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -100,19 +100,19 @@ async function* streamResponseInner( ): AsyncGenerator { // Bucket state of bucket id -> op_id. // This starts with the state from the client. May contain buckets that the user do not have access to (anymore). - let initialBuckets = new Map(); + let initialBucketState = new Map(); const { raw_data, binary_data } = params; if (params.buckets) { for (let { name, after: start } of params.buckets) { - initialBuckets.set(name, { start_op_id: start }); + initialBucketState.set(name, { start_op_id: start }); } } const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id); - const checksumState = new BucketChecksumState(bucketStorage, syncRules, syncParams, initialBuckets); + const checksumState = new BucketChecksumState({ bucketStorage, syncRules, syncParams, initialBucketState }); const stream = bucketStorage.watchWriteCheckpoint({ user_id: checkpointUserId, signal, @@ -436,24 +436,32 @@ interface CheckpointLine { bucketsToFetch: BucketDescription[]; } +export interface BucketChecksumStateOptions { + bucketStorage: BucketChecksumStateStorage; + syncRules: SqlSyncRules; + syncParams: RequestParameters; + initialBucketState: Map; +} + +// Use a more specific type to simplify testing +export type BucketChecksumStateStorage = Pick; + export class BucketChecksumState { + private readonly bucketStorage: BucketChecksumStateStorage; + // Bucket state of bucket id -> op_id. // This starts with the state from the client. May contain buckets that the user do not have access to (anymore). - private dataBuckets = new Map(); + public dataBuckets = new Map(); private lastChecksums: util.ChecksumMap | null = null; private lastWriteCheckpoint: bigint | null = null; private readonly parameterState: BucketParameterState; - constructor( - private bucketStorage: storage.SyncRulesBucketStorage, - syncRules: SqlSyncRules, - syncParams: RequestParameters, - initialBucketState: Map - ) { - this.parameterState = new BucketParameterState(bucketStorage, syncRules, syncParams); - this.dataBuckets = initialBucketState; + constructor(options: BucketChecksumStateOptions) { + this.bucketStorage = options.bucketStorage; + this.parameterState = new BucketParameterState(options.bucketStorage, options.syncRules, options.syncParams); + this.dataBuckets = options.initialBucketState; } async buildNextCheckpointLine(next: storage.WriteCheckpoint): Promise { @@ -617,7 +625,7 @@ export interface CheckpointUpdate { } class BucketParameterState { - public readonly bucketStorage: storage.SyncRulesBucketStorage; + public readonly bucketStorage: BucketChecksumStateStorage; public readonly syncRules: SqlSyncRules; public readonly syncParams: RequestParameters; private readonly querier: BucketParameterQuerier; @@ -627,7 +635,7 @@ class BucketParameterState { // First time we're called, we need to fetch all buckets. private invalidated: boolean = true; - constructor(bucketStorage: storage.SyncRulesBucketStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) { + constructor(bucketStorage: BucketChecksumStateStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) { this.bucketStorage = bucketStorage; this.syncRules = syncRules; this.syncParams = syncParams; diff --git a/packages/service-core/test/src/sync/checksum_state.test.ts b/packages/service-core/test/src/sync/checksum_state.test.ts new file mode 100644 index 00000000..3befb07d --- /dev/null +++ b/packages/service-core/test/src/sync/checksum_state.test.ts @@ -0,0 +1,74 @@ +import { BucketChecksum, BucketChecksumState, BucketChecksumStateStorage } from '@/index.js'; +import { RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules'; +import { describe, expect, test } from 'vitest'; + +describe('BucketChecksumState', () => { + test('global bucket', async () => { + const storage: BucketChecksumStateStorage = { + async getChecksums(checkpoint, buckets) { + return new Map( + buckets.map((b, i) => { + return [ + b, + { + bucket: b, + checksum: Number(checkpoint), + count: i + } + ]; + }) + ); + }, + getParameterSets(checkpoint, lookups) { + throw new Error('Method not implemented.'); + } + }; + + const syncRules = SqlSyncRules.fromYaml( + ` +bucket_definitions: + global: + data: [] + `, + { defaultSchema: 'public' } + ); + const state = new BucketChecksumState({ + initialBucketState: new Map(), + syncParams: new RequestParameters({ sub: '' }, {}), + syncRules: syncRules, + bucketStorage: storage + }); + + const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!; + expect(line.checkpointLine).toEqual({ + checkpoint: { + buckets: [{ bucket: 'global[]', checksum: 1, count: 0, priority: 3 }], + last_op_id: '1', + write_checkpoint: undefined + } + }); + expect(line.bucketsToFetch).toEqual([ + { + bucket: 'global[]', + priority: 3 + } + ]); + + state.checkpointFilter({ + invalidate: true + }); + + const line2 = (await state.buildNextCheckpointLine({ + base: { checkpoint: '2', lsn: '2' }, + writeCheckpoint: null + }))!; + expect(line2.checkpointLine).toEqual({ + checkpoint_diff: { + removed_buckets: [], + updated_buckets: [{ bucket: 'global[]', checksum: 2, count: 0, priority: 3 }], + last_op_id: '2', + write_checkpoint: undefined + } + }); + }); +});