Add initial test for BucketChecksumState.

rkistner committed Feb 10, 2025
1 parent 0124b29 commit 5df24a0
Showing 5 changed files with 153 additions and 46 deletions.
8 changes: 8 additions & 0 deletions packages/service-core/src/storage/BucketStorageFactory.ts
@@ -4,6 +4,14 @@ import { ReplicationEventPayload } from './ReplicationEventPayload.js';
import { ReplicationLock } from './ReplicationLock.js';
import { SyncRulesBucketStorage } from './SyncRulesBucketStorage.js';

/**
* Represents a configured storage provider.
*
* The provider can handle multiple copies of sync rules concurrently, each with its own storage.
* This allows a new version of sync rules to be replicated while the old version is still active.
*
* Storage APIs for a specific copy of sync rules are provided by the `SyncRulesBucketStorage` instances.
*/
export interface BucketStorageFactory extends AsyncDisposableObserverClient<BucketStorageFactoryListener> {
/**
* Update sync rules from configuration, if changed.
3 changes: 3 additions & 0 deletions packages/service-core/src/storage/StorageProvider.ts
@@ -16,6 +16,9 @@ export interface GetStorageOptions {
resolvedConfig: util.ResolvedPowerSyncConfig;
}

/**
* Represents a provider that can create a storage instance for a specific storage type from configuration.
*/
export interface BucketStorageProvider {
/**
* The storage type that this provider provides.
78 changes: 46 additions & 32 deletions packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -8,6 +8,9 @@ import { SourceEntityDescriptor } from './SourceEntity.js';
import { SourceTable } from './SourceTable.js';
import { SyncStorageWriteCheckpointAPI } from './WriteCheckpointAPI.js';

/**
* Storage for a specific copy of sync rules.
*/
export interface SyncRulesBucketStorage
extends DisposableObserverClient<SyncRulesBucketStorageListener>,
SyncStorageWriteCheckpointAPI {
@@ -16,17 +19,58 @@ export interface SyncRulesBucketStorage

readonly factory: BucketStorageFactory;

/**
* Resolve a table, keeping track of it internally.
*/
resolveTable(options: ResolveTableOptions): Promise<ResolveTableResult>;

/**
* Use this to get access for updating storage data.
*/
startBatch(
options: StartBatchOptions,
callback: (batch: BucketStorageBatch) => Promise<void>
): Promise<FlushedResult | null>;

getCheckpoint(): Promise<ReplicationCheckpoint>;

getParsedSyncRules(options: ParseSyncRulesOptions): SqlSyncRules;

/**
* Terminate the sync rules.
*
* This clears the storage, and sets state to TERMINATED.
*
* Must only be called on stopped sync rules.
*/
terminate(options?: TerminateOptions): Promise<void>;

getStatus(): Promise<SyncRuleStatus>;

/**
* Clear the storage, without changing state.
*/
clear(): Promise<void>;

autoActivate(): Promise<void>;

/**
* Record a replication error.
*
* This could be a recoverable error (e.g. temporary network failure),
* or a permanent error (e.g. missing toast data).
*
* Errors are cleared on commit.
*/
reportError(e: any): Promise<void>;

compact(options?: CompactOptions): Promise<void>;

// ## Read operations

getCheckpoint(): Promise<ReplicationCheckpoint>;

/**
* Used to resolve "dynamic" parameter queries.
*/
getParameterSets(checkpoint: util.OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]>;

/**
@@ -59,36 +103,6 @@ export interface SyncRulesBucketStorage
* Returns zero checksums for any buckets not found.
*/
getChecksums(checkpoint: util.OpId, buckets: string[]): Promise<util.ChecksumMap>;

/**
* Terminate the sync rules.
*
* This clears the storage, and sets state to TERMINATED.
*
* Must only be called on stopped sync rules.
*/
terminate(options?: TerminateOptions): Promise<void>;

getStatus(): Promise<SyncRuleStatus>;

/**
* Clear the storage, without changing state.
*/
clear(): Promise<void>;

autoActivate(): Promise<void>;

/**
* Record a replication error.
*
* This could be a recoverable error (e.g. temporary network failure),
* or a permanent error (e.g. missing toast data).
*
* Errors are cleared on commit.
*/
reportError(e: any): Promise<void>;

compact(options?: CompactOptions): Promise<void>;
}

export interface SyncRulesBucketStorageListener extends DisposableListener {
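For orientation, here is a rough sketch of how the write half (startBatch, reportError) and the read half (getCheckpoint, getChecksums) of the interface above fit together. It is only a sketch: the import path, the storage and util namespace exports, the shape of StartBatchOptions, and the checkpoint field on ReplicationCheckpoint are assumptions rather than something this diff shows.

import { storage, util } from '@powersync/service-core'; // assumed package entry point

// Hypothetical helper: write one replication batch, then read back checksums.
async function replicateAndVerify(
  bucketStorage: storage.SyncRulesBucketStorage,
  batchOptions: storage.StartBatchOptions
): Promise<util.ChecksumMap> {
  // Write side: all storage updates go through a batch callback.
  // Errors recorded via reportError() are cleared once a batch commits.
  try {
    await bucketStorage.startBatch(batchOptions, async (batch) => {
      // ... write replicated rows through `batch` here ...
    });
  } catch (e) {
    await bucketStorage.reportError(e);
    throw e;
  }

  // Read side: resolve the current checkpoint, then fetch checksums for some buckets.
  const checkpoint = await bucketStorage.getCheckpoint();
  return await bucketStorage.getChecksums(checkpoint.checkpoint, ['global[]']);
}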
36 changes: 22 additions & 14 deletions packages/service-core/src/sync/sync.ts
@@ -100,19 +100,19 @@ async function* streamResponseInner(
): AsyncGenerator<util.StreamingSyncLine | string | null> {
// Bucket state of bucket id -> op_id.
// This starts with the state from the client. May contain buckets that the user does not have access to (anymore).
let initialBuckets = new Map<string, BucketSyncState>();
let initialBucketState = new Map<string, BucketSyncState>();

const { raw_data, binary_data } = params;

if (params.buckets) {
for (let { name, after: start } of params.buckets) {
initialBuckets.set(name, { start_op_id: start });
initialBucketState.set(name, { start_op_id: start });
}
}

const checkpointUserId = util.checkpointUserId(syncParams.token_parameters.user_id as string, params.client_id);

const checksumState = new BucketChecksumState(bucketStorage, syncRules, syncParams, initialBuckets);
const checksumState = new BucketChecksumState({ bucketStorage, syncRules, syncParams, initialBucketState });
const stream = bucketStorage.watchWriteCheckpoint({
user_id: checkpointUserId,
signal,
@@ -436,24 +436,32 @@ interface CheckpointLine {
bucketsToFetch: BucketDescription[];
}

export interface BucketChecksumStateOptions {
bucketStorage: BucketChecksumStateStorage;
syncRules: SqlSyncRules;
syncParams: RequestParameters;
initialBucketState: Map<string, BucketSyncState>;
}

// Use a more specific type to simplify testing
export type BucketChecksumStateStorage = Pick<storage.SyncRulesBucketStorage, 'getChecksums' | 'getParameterSets'>;

export class BucketChecksumState {
private readonly bucketStorage: BucketChecksumStateStorage;

// Bucket state of bucket id -> op_id.
// This starts with the state from the client. May contain buckets that the user does not have access to (anymore).
private dataBuckets = new Map<string, BucketSyncState>();
public dataBuckets = new Map<string, BucketSyncState>();

private lastChecksums: util.ChecksumMap | null = null;
private lastWriteCheckpoint: bigint | null = null;

private readonly parameterState: BucketParameterState;

constructor(
private bucketStorage: storage.SyncRulesBucketStorage,
syncRules: SqlSyncRules,
syncParams: RequestParameters,
initialBucketState: Map<string, BucketSyncState>
) {
this.parameterState = new BucketParameterState(bucketStorage, syncRules, syncParams);
this.dataBuckets = initialBucketState;
constructor(options: BucketChecksumStateOptions) {
this.bucketStorage = options.bucketStorage;
this.parameterState = new BucketParameterState(options.bucketStorage, options.syncRules, options.syncParams);
this.dataBuckets = options.initialBucketState;
}

async buildNextCheckpointLine(next: storage.WriteCheckpoint): Promise<CheckpointLine | null> {
@@ -617,7 +625,7 @@ export interface CheckpointUpdate {
}

class BucketParameterState {
public readonly bucketStorage: storage.SyncRulesBucketStorage;
public readonly bucketStorage: BucketChecksumStateStorage;
public readonly syncRules: SqlSyncRules;
public readonly syncParams: RequestParameters;
private readonly querier: BucketParameterQuerier;
@@ -627,7 +635,7 @@ class BucketParameterState {
// First time we're called, we need to fetch all buckets.
private invalidated: boolean = true;

constructor(bucketStorage: storage.SyncRulesBucketStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) {
constructor(bucketStorage: BucketChecksumStateStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) {
this.bucketStorage = bucketStorage;
this.syncRules = syncRules;
this.syncParams = syncParams;
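The BucketChecksumStateStorage alias above is what enables the new test below: it narrows the storage dependency to the two read methods BucketChecksumState actually uses, so a test can pass a plain object instead of a full SyncRulesBucketStorage. Roughly, the Pick expands to the following two-method slice (the interface name is made up for illustration; the signatures are copied from the SyncRulesBucketStorage diff above):

// Illustrative expansion of Pick<storage.SyncRulesBucketStorage, 'getChecksums' | 'getParameterSets'>.
// The name MinimalChecksumStorage is hypothetical; the signatures match the interface above.
interface MinimalChecksumStorage {
  getChecksums(checkpoint: util.OpId, buckets: string[]): Promise<util.ChecksumMap>;
  getParameterSets(checkpoint: util.OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]>;
}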
74 changes: 74 additions & 0 deletions packages/service-core/test/src/sync/checksum_state.test.ts
@@ -0,0 +1,74 @@
import { BucketChecksum, BucketChecksumState, BucketChecksumStateStorage } from '@/index.js';
import { RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
import { describe, expect, test } from 'vitest';

describe('BucketChecksumState', () => {
test('global bucket', async () => {
const storage: BucketChecksumStateStorage = {
async getChecksums(checkpoint, buckets) {
return new Map<string, BucketChecksum>(
buckets.map((b, i) => {
return [
b,
{
bucket: b,
checksum: Number(checkpoint),
count: i
}
];
})
);
},
getParameterSets(checkpoint, lookups) {
throw new Error('Method not implemented.');
}
};

const syncRules = SqlSyncRules.fromYaml(
`
bucket_definitions:
global:
data: []
`,
{ defaultSchema: 'public' }
);
const state = new BucketChecksumState({
initialBucketState: new Map(),
syncParams: new RequestParameters({ sub: '' }, {}),
syncRules: syncRules,
bucketStorage: storage
});

const line = (await state.buildNextCheckpointLine({ base: { checkpoint: '1', lsn: '1' }, writeCheckpoint: null }))!;
expect(line.checkpointLine).toEqual({
checkpoint: {
buckets: [{ bucket: 'global[]', checksum: 1, count: 0, priority: 3 }],
last_op_id: '1',
write_checkpoint: undefined
}
});
expect(line.bucketsToFetch).toEqual([
{
bucket: 'global[]',
priority: 3
}
]);

state.checkpointFilter({
invalidate: true
});

const line2 = (await state.buildNextCheckpointLine({
base: { checkpoint: '2', lsn: '2' },
writeCheckpoint: null
}))!;
expect(line2.checkpointLine).toEqual({
checkpoint_diff: {
removed_buckets: [],
updated_buckets: [{ bucket: 'global[]', checksum: 2, count: 0, priority: 3 }],
last_op_id: '2',
write_checkpoint: undefined
}
});
});
});
