Commit f98ef57

simolus3 committed Feb 3, 2025
1 parent fd152bf
Showing 18 changed files with 198 additions and 119 deletions.

docs/compacting-operations.md (9 additions, 3 deletions)

@@ -75,8 +75,10 @@ The second part is compacting to CLEAR operations. For each bucket, we keep trac
For an initial workaround, defragmenting can be performed outside powersync by touching all rows in a bucket:

```sql
-update mytable set id = id
--- Repeat the above for other tables in the same bucket if relevant
+UPDATE mytable
+SET
+  id = id
+-- Repeat the above for other tables in the same bucket if relevant
```

After this, the normal MOVE + CLEAR compacting will compact the bucket to only have a single operation per active row.
@@ -86,7 +88,11 @@ This would cause existing clients to re-sync every row, while reducing the numbe
If an updated_at column or similar is present, we can use this to defragment more incrementally:

```sql
update mytable set id = id where updated_at < now() - interval '1 week'
UPDATE mytable
SET
id = id
WHERE
updated_at < now() - interval '1 week'
```

This version avoids unnecessary defragmentation of rows modified recently.
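To make the workaround concrete, here is a minimal sketch of a scheduled defragmentation job using node-postgres; the table name, column, and interval are placeholders taken from the doc's example, not part of this commit:

```typescript
import { Client } from 'pg';

// Illustrative only: touch rows untouched for a week so that the next
// MOVE + CLEAR compacting pass reduces the bucket to one op per active row.
// 'mytable' and 'updated_at' are placeholders from the doc above.
async function defragmentStaleRows(connectionString: string): Promise<void> {
  const client = new Client({ connectionString });
  await client.connect();
  try {
    const result = await client.query(
      `UPDATE mytable SET id = id WHERE updated_at < now() - interval '1 week'`
    );
    console.log(`defragmented ${result.rowCount} rows`);
  } finally {
    await client.end();
  }
}
```

Run on a cron-style schedule, each pass stays small while still letting compacting converge.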

docs/postgres-initial-replication.md (1 addition, 1 deletion)

@@ -11,7 +11,7 @@ This is our first approach.
We start by creating a logical replication slot, exporting a snapshot:

```sql
-CREATE_REPLICATION_SLOT <slot> LOGICAL pgoutput EXPORT_SNAPSHOT
+CREATE_REPLICATION_SLOT < slot > LOGICAL pgoutput EXPORT_SNAPSHOT
```

While that connection stays open, we create another connection with a transaction, and read each table:
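For context, a hedged sketch of that snapshot hand-off follows. Note that CREATE_REPLICATION_SLOT is a replication-protocol command, so the first connection must be opened with replication=database (which plain node-postgres connections may not support); the slot and table names are placeholders:

```typescript
import { Client } from 'pg';

// Sketch under the assumptions above; not the service's actual implementation.
async function initialSync(replicationConn: Client, snapshotConn: Client) {
  // 1. Create the slot; Postgres returns the exported snapshot's name
  //    in the result row (column: snapshot_name).
  const created = await replicationConn.query(
    `CREATE_REPLICATION_SLOT powersync_slot LOGICAL pgoutput EXPORT_SNAPSHOT`
  );
  const snapshotName: string = created.rows[0].snapshot_name;

  // 2. While the slot connection stays open, import the snapshot on a second
  //    connection so table reads see exactly the state the slot starts from.
  await snapshotConn.query('BEGIN ISOLATION LEVEL REPEATABLE READ');
  await snapshotConn.query(`SET TRANSACTION SNAPSHOT '${snapshotName}'`);
  const rows = await snapshotConn.query('SELECT * FROM mytable'); // repeat per table
  await snapshotConn.query('COMMIT');
  return rows.rows;
}
```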

packages/service-core-tests/README.md (1 addition, 1 deletion)

@@ -2,4 +2,4 @@

A small helper package which exposes common unit tests and test utility functions.

-This package is used in various modules for their unit tests.
\ No newline at end of file
+This package is used in various modules for their unit tests.

@@ -394,13 +394,13 @@ bucket_definitions:
const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups);
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]);

-const buckets = await sync_rules.queryBucketIds({
+const buckets = await sync_rules.queryBucketDescriptions({
getParameterSets(lookups) {
return bucketStorage.getParameterSets(checkpoint, lookups);
},
parameters
});
-expect(buckets).toEqual(['by_workspace["workspace1"]']);
+expect(buckets).toEqual([{ bucket: 'by_workspace["workspace1"]', priority: 1 }]);
});

test('save and load parameters with dynamic global buckets', async () => {
@@ -466,14 +466,17 @@ bucket_definitions:
parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]);

-const buckets = await sync_rules.queryBucketIds({
+const buckets = await sync_rules.queryBucketDescriptions({
getParameterSets(lookups) {
return bucketStorage.getParameterSets(checkpoint, lookups);
},
parameters
});
-buckets.sort();
-expect(buckets).toEqual(['by_public_workspace["workspace1"]', 'by_public_workspace["workspace3"]']);
+buckets.sort((a, b) => a.bucket.localeCompare(b.bucket));
+expect(buckets).toEqual([
+  { bucket: 'by_public_workspace["workspace1"]', priority: 1 },
+  { bucket: 'by_public_workspace["workspace3"]', priority: 1 }
+]);
});

test('multiple parameter queries', async () => {
@@ -562,12 +565,14 @@ bucket_definitions:
expect(parameter_sets2).toEqual([{ workspace_id: 'workspace3' }]);

// Test final values - the important part
-const buckets = await sync_rules.queryBucketIds({
-  getParameterSets(lookups) {
-    return bucketStorage.getParameterSets(checkpoint, lookups);
-  },
-  parameters
-});
+const buckets = (
+  await sync_rules.queryBucketDescriptions({
+    getParameterSets(lookups) {
+      return bucketStorage.getParameterSets(checkpoint, lookups);
+    },
+    parameters
+  })
+).map((e) => e.bucket);
buckets.sort();
expect(buckets).toEqual(['by_workspace["workspace1"]', 'by_workspace["workspace3"]']);
});
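These tests track the commit's central API change: queryBucketIds, which resolved to plain bucket-id strings, becomes queryBucketDescriptions, which also carries each bucket's sync priority. A minimal sketch of the shape change, with stub implementations standing in for the real sync-rules API:

```typescript
interface BucketDescription {
  bucket: string;
  priority: 0 | 1 | 2 | 3;
}

// Stub standing in for the new API: resolves to full descriptions.
async function queryBucketDescriptions(): Promise<BucketDescription[]> {
  return [{ bucket: 'by_workspace["workspace1"]', priority: 1 }];
}

// Callers that only need ids (the old queryBucketIds shape) can map the
// descriptions down, exactly as the last test above does:
async function queryBucketIds(): Promise<string[]> {
  return (await queryBucketDescriptions()).map((e) => e.bucket);
}
```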

packages/service-core/src/sync/sync.ts (17 additions, 10 deletions)

@@ -146,7 +146,7 @@ async function* streamResponseInner(
for (let bucket of allBuckets) {
dataBucketsNew.set(bucket.bucket, {
description: bucket,
-start_op_id: dataBuckets.get(bucket.bucket)?.start_op_id ?? '0',
+start_op_id: dataBuckets.get(bucket.bucket)?.start_op_id ?? '0'
});
}
dataBuckets = dataBucketsNew;
@@ -167,7 +167,10 @@
// No changes - don't send anything to the client
continue;
}
-const updatedBucketDescriptions = diff.updatedBuckets.map((e) => ({...e, priority: dataBuckets.get(e.bucket)!.description!.priority}));
+const updatedBucketDescriptions = diff.updatedBuckets.map((e) => ({
+  ...e,
+  priority: dataBuckets.get(e.bucket)!.description!.priority
+}));
bucketsToFetch = updatedBucketDescriptions;

let message = `Updated checkpoint: ${checkpoint} | `;
@@ -188,7 +191,7 @@
last_op_id: checkpoint,
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
removed_buckets: diff.removedBuckets,
-updated_buckets: updatedBucketDescriptions,
+updated_buckets: updatedBucketDescriptions
}
};

@@ -202,7 +205,10 @@
checkpoint: {
last_op_id: checkpoint,
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
-buckets: [...checksumMap.values()].map((e) => ({...e, priority: dataBuckets.get(e.bucket)!.description!.priority})),
+buckets: [...checksumMap.values()].map((e) => ({
+  ...e,
+  priority: dataBuckets.get(e.bucket)!.description!.priority
+}))
}
};
yield checksum_line;
@@ -229,9 +235,9 @@
signal,
tracker,
user_id: syncParams.user_id,
-forPriority: currentPriority !== lowestPriority ? currentPriority : undefined,
+forPriority: currentPriority !== lowestPriority ? currentPriority : undefined
});
-}
+};

for (let i = 0; i < bucketsToFetch.length; i++) {
if (bucketsToFetch[i].priority == currentPriority) {
@@ -326,7 +332,9 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
}
// Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint
// For the first batch, this will be all buckets.
-const filteredBuckets = new Map(bucketsToFetch.map((bucket) => [bucket.bucket, dataBuckets.get(bucket.bucket)?.start_op_id!]));
+const filteredBuckets = new Map(
+  bucketsToFetch.map((bucket) => [bucket.bucket, dataBuckets.get(bucket.bucket)?.start_op_id!])
+);
const data = storage.getBucketDataBatch(checkpoint, filteredBuckets);

let has_more = false;
@@ -388,7 +396,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
const line: util.StreamingSyncCheckpointPartiallyComplete = {
partial_checkpoint_complete: {
last_op_id: checkpoint,
-priority: request.forPriority,
+priority: request.forPriority
}
};
yield { data: line, done: true };
@@ -400,7 +408,6 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
};
yield { data: line, done: true };
}
-
}
}
} finally {
@@ -428,7 +435,7 @@ function transformLegacyResponse(bucketData: util.SyncBucketData): any {
};
}

-function limitedBuckets(buckets: string[] | {bucket: string}[], limit: number) {
+function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number) {
buckets = buckets.map((b) => {
if (typeof b != 'string') {
return b.bucket;
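Condensing the control flow these sync.ts hunks modify: buckets are fetched one priority level at a time, and each level except the lowest-priority one ends with a partial_checkpoint_complete line. A simplified sketch with hypothetical helpers; the real streamResponseInner also handles checksums, write checkpoints, and keepalives:

```typescript
type BucketPriority = 0 | 1 | 2 | 3;
interface BucketDescription {
  bucket: string;
  priority: BucketPriority;
}

async function* syncByPriority(
  bucketsToFetch: BucketDescription[],
  checkpoint: string,
  fetchBucketData: (bucket: BucketDescription) => AsyncIterable<unknown>
): AsyncGenerator<unknown> {
  // Distinct priorities in ascending numeric order (lower number = higher priority).
  const priorities = [...new Set(bucketsToFetch.map((b) => b.priority))].sort((a, b) => a - b);
  const lowestPriority = priorities[priorities.length - 1];

  for (const currentPriority of priorities) {
    for (const bucket of bucketsToFetch) {
      if (bucket.priority == currentPriority) {
        yield* fetchBucketData(bucket);
      }
    }
    if (currentPriority !== lowestPriority) {
      // All buckets at this priority are consistent up to the checkpoint;
      // clients may surface their data before lower-priority buckets finish.
      yield { partial_checkpoint_complete: { last_op_id: checkpoint, priority: currentPriority } };
    }
  }
}
```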

packages/service-core/src/util/protocol-types.ts (2 additions, 3 deletions)

@@ -78,7 +78,7 @@ export interface StreamingSyncCheckpointPartiallyComplete {
partial_checkpoint_complete: {
last_op_id: OpId;
priority: BucketPriority;
-}
+};
}

export interface StreamingSyncKeepalive {
@@ -153,5 +153,4 @@ export interface BucketChecksum {
count: number;
}

-export interface BucketChecksumWithDescription extends BucketChecksum, BucketDescription {
-}
+export interface BucketChecksumWithDescription extends BucketChecksum, BucketDescription {}
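On the receiving side, a client can dispatch on the new line shape. A hypothetical sketch, not part of this commit; the consistency reading follows the BucketDescription doc comment in the next file:

```typescript
type OpId = string;
type BucketPriority = 0 | 1 | 2 | 3;

interface StreamingSyncCheckpointPartiallyComplete {
  partial_checkpoint_complete: {
    last_op_id: OpId;
    priority: BucketPriority;
  };
}

// Hypothetical client-side handler for a parsed sync-stream line.
function handleLine(line: object): void {
  if ('partial_checkpoint_complete' in line) {
    const { last_op_id, priority } = (line as StreamingSyncCheckpointPartiallyComplete)
      .partial_checkpoint_complete;
    // Buckets synced at this priority form a consistent view up to last_op_id,
    // so their data may be published before the full checkpoint completes.
    console.log(`priority ${priority} consistent at op ${last_op_id}`);
  }
}
```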

packages/sync-rules/src/BucketDescription.ts (12 additions, 12 deletions)

@@ -1,13 +1,13 @@
/**
 * The priority in which to synchronize buckets.
- *
+ *
 * Lower numbers represent higher priorities.
 * Generally, the sync service _may_ synchronize buckets with higher priorities first.
 * Priorities also refine the consistency notion by the sync service in the way that clients
 * may choose to publish data when all buckets of a certain priority have been synchronized.
 * So, when clients are synchronizing buckets with different priorities, they will only get
 * consistent views within each priority.
- *
+ *
 * Additionally, data from buckets with priority 0 may be made visible when clients still
 * have data in their upload queue.
 */
@@ -16,13 +16,13 @@ export type BucketPriority = 0 | 1 | 2 | 3;
export const defaultBucketPriority: BucketPriority = 1;

export interface BucketDescription {
  /**
   * The id of the bucket, which is derived from the name of the bucket's definition
   * in the sync rules as well as the values returned by the parameter queries.
   */
  bucket: string;
  /**
   * The priority used to synchronize this bucket, derived from its definition.
   */
  priority: BucketPriority;
-};
+}

packages/sync-rules/src/SqlParameterQuery.ts (4 additions, 2 deletions)

@@ -106,7 +106,9 @@ export class SqlParameterQuery {
const where = q.where;
const filter = tools.compileWhereClause(where);

-const bucket_parameters = (q.columns ?? []).map((column) => tools.getOutputName(column));
+const bucket_parameters = (q.columns ?? [])
+  .map((column) => tools.getOutputName(column))
+  .filter((c) => !tools.isBucketPriorityParameter(c));
rows.sourceTable = sourceTable;
rows.table = alias;
rows.sql = sql;
@@ -279,7 +281,7 @@

return {
bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result),
-priority: this.priority ?? defaultBucketPriority,
+priority: this.priority ?? defaultBucketPriority
};
})
.filter((lookup) => lookup != null);

packages/sync-rules/src/StaticSqlParameterQuery.ts (9 additions, 5 deletions)

@@ -28,7 +28,9 @@

const filter = tools.compileParameterValueExtractor(where);
const columns = q.columns ?? [];
-const bucket_parameters = columns.map((column) => tools.getOutputName(column));
+const bucket_parameters = (q.columns ?? [])
+  .map((column) => tools.getOutputName(column))
+  .filter((c) => !tools.isBucketPriorityParameter(c));

query.sql = sql;
query.descriptor_name = descriptor_name;
@@ -106,10 +108,12 @@
}
}

-return [{
-  bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result),
-  priority: this.priority ?? defaultBucketPriority,
-}];
+return [
+  {
+    bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result),
+    priority: this.priority ?? defaultBucketPriority
+  }
+];
}

get hasAuthenticatedBucketParameters(): boolean {

packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts

@@ -130,7 +130,7 @@ export class TableValuedFunctionSqlParameterQuery {
for (let row of rows) {
const description = this.getIndividualBucketDescription(row, parameters);
if (description !== null) {
-total.push(description);
+total.push(description);
}
}
return total;
@@ -166,7 +166,7 @@

return {
bucket: getBucketId(this.descriptor_name!, this.bucket_parameters!, result),
-priority: this.priority ?? defaultBucketPriority,
+priority: this.priority ?? defaultBucketPriority
};
}


packages/sync-rules/src/sql_filters.ts (1 addition, 1 deletion)

@@ -741,7 +741,7 @@ export class SqlTools {
}

isBucketPriorityParameter(name: string): boolean {
-return name == "_priority";
+return name == '_priority';
}

extractBucketPriority(expr: Expr): BucketPriority | undefined {

packages/sync-rules/src/sql_functions.ts (1 addition, 1 deletion)

@@ -260,7 +260,7 @@ const iif: DocumentedSqlFunction = {
parameters: [
{ name: 'x', type: ExpressionType.ANY, optional: false },
{ name: 'y', type: ExpressionType.ANY, optional: false },
-{ name: 'z', type: ExpressionType.ANY, optional: false },
+{ name: 'z', type: ExpressionType.ANY, optional: false }
],
getReturnType() {
return ExpressionType.ANY;