Skip to content

Commit

Permalink
Configurable exit_on_error for sync rule validation errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Feb 10, 2025
1 parent 7429939 commit fc3ecc8
Show file tree
Hide file tree
Showing 17 changed files with 82 additions and 45 deletions.
31 changes: 20 additions & 11 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,19 @@ export class MongoBucketStorage
};
}

async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) {
async configureSyncRules(options: storage.UpdateSyncRulesOptions) {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();

if (next?.sync_rules_content == sync_rules) {
if (next?.sync_rules_content == options.content) {
logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else if (next == null && active?.sync_rules_content == sync_rules) {
} else if (next == null && active?.sync_rules_content == options.content) {
logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else {
logger.info('Sync rules updated from configuration');
const persisted_sync_rules = await this.updateSyncRules({
content: sync_rules,
lock: options?.lock
});
const persisted_sync_rules = await this.updateSyncRules(options);
return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined };
}
}
Expand All @@ -130,7 +127,8 @@ export class MongoBucketStorage
if (next != null && next.slot_name == slot_name) {
// We need to redo the "next" sync rules
await this.updateSyncRules({
content: next.sync_rules_content
content: next.sync_rules_content,
validate: false
});
// Pro-actively stop replicating
await this.db.sync_rules.updateOne(
Expand All @@ -147,7 +145,8 @@ export class MongoBucketStorage
} else if (next == null && active?.slot_name == slot_name) {
// Slot removed for "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
content: active.sync_rules_content
content: active.sync_rules_content,
validate: false
});

// Pro-actively stop replicating
Expand All @@ -166,8 +165,18 @@ export class MongoBucketStorage
}

async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<MongoPersistedSyncRulesContent> {
// We do not validate sync rules at this point.
// That is done when using the sync rules, so that the diagnostics API can report the errors.
if (options.validate) {
// Parse and validate before applying any changes
SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
} else {
// We do not validate sync rules at this point.
// That is done when using the sync rules, so that the diagnostics API can report the errors.
}

let rules: MongoPersistedSyncRulesContent | undefined = undefined;

Expand Down
2 changes: 1 addition & 1 deletion modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class ChangeStreamTestContext {
}

async updateSyncRules(content: string) {
const syncRules = await this.factory.updateSyncRules({ content: content });
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/test/src/BinlogStreamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class BinlogStreamTestContext {
}

async updateSyncRules(content: string): Promise<SyncRulesBucketStorage> {
const syncRules = await this.factory.updateSyncRules({ content: content });
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as framework from '@powersync/lib-services-framework';
import { storage, sync, utils } from '@powersync/service-core';
import { storage, sync, UpdateSyncRulesOptions, utils } from '@powersync/service-core';
import * as pg_wire from '@powersync/service-jpgwire';
import * as sync_rules from '@powersync/service-sync-rules';
import crypto from 'crypto';
Expand Down Expand Up @@ -169,42 +169,40 @@ export class PostgresBucketStorageFactory
}

// TODO possibly share implementation in abstract class
async configureSyncRules(
sync_rules: string,
options?: { lock?: boolean }
): Promise<{
async configureSyncRules(options: UpdateSyncRulesOptions): Promise<{
updated: boolean;
persisted_sync_rules?: storage.PersistedSyncRulesContent;
lock?: storage.ReplicationLock;
}> {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();

if (next?.sync_rules_content == sync_rules) {
if (next?.sync_rules_content == options.content) {
framework.logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else if (next == null && active?.sync_rules_content == sync_rules) {
} else if (next == null && active?.sync_rules_content == options.content) {
framework.logger.info('Sync rules from configuration unchanged');
return { updated: false };
} else {
framework.logger.info('Sync rules updated from configuration');
const persisted_sync_rules = await this.updateSyncRules({
content: sync_rules,
lock: options?.lock
});
const persisted_sync_rules = await this.updateSyncRules(options);
return { updated: true, persisted_sync_rules, lock: persisted_sync_rules.current_lock ?? undefined };
}
}

async updateSyncRules(options: storage.UpdateSyncRulesOptions): Promise<PostgresPersistedSyncRulesContent> {
// TODO some shared implementation for this might be nice
// Parse and validate before applying any changes
sync_rules.SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
if (options.validate) {
// Parse and validate before applying any changes
sync_rules.SqlSyncRules.fromYaml(options.content, {
// No schema-based validation at this point
schema: undefined,
defaultSchema: 'not_applicable', // Not needed for validation
throwOnError: true
});
} else {
// Apply unconditionally. Any errors will be reported via the diagnostics API.
}

return this.db.transaction(async (db) => {
await db.sql`
Expand Down Expand Up @@ -266,7 +264,8 @@ export class PostgresBucketStorageFactory
if (next != null && next.slot_name == slot_name) {
// We need to redo the "next" sync rules
await this.updateSyncRules({
content: next.sync_rules_content
content: next.sync_rules_content,
validate: false
});
// Pro-actively stop replicating
await this.db.sql`
Expand All @@ -280,7 +279,8 @@ export class PostgresBucketStorageFactory
} else if (next == null && active?.slot_name == slot_name) {
// Slot removed for "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
content: active.sync_rules_content
content: active.sync_rules_content,
validate: false
});

// Pro-actively stop replicating
Expand Down
2 changes: 1 addition & 1 deletion modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class WalStreamTestContext implements AsyncDisposable {
}

async updateSyncRules(content: string) {
const syncRules = await this.factory.updateSyncRules({ content: content });
const syncRules = await this.factory.updateSyncRules({ content: content, validate: true });
this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ bucket_definitions:
replication_size_bytes: 0
});

const r = await f.configureSyncRules('bucket_definitions: {}');
const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false });
const storage = f.getInstance(r.persisted_sync_rules!);
await storage.autoActivate();

Expand Down
12 changes: 9 additions & 3 deletions packages/service-core/src/replication/AbstractReplicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,27 @@ export abstract class AbstractReplicator<T extends AbstractReplicationJob = Abst

private async runLoop() {
const syncRules = await this.syncRuleProvider.get();

let configuredLock: storage.ReplicationLock | undefined = undefined;
if (syncRules != null) {
this.logger.info('Loaded sync rules');
try {
// Configure new sync rules, if they have changed.
// In that case, also immediately take out a lock, so that another process doesn't start replication on it.
const { lock } = await this.storage.configureSyncRules(syncRules, {
lock: true

const { lock } = await this.storage.configureSyncRules({
content: syncRules,
lock: true,
validate: this.syncRuleProvider.exitOnError
});
if (lock) {
configuredLock = lock;
}
} catch (e) {
// Log, but continue with previous sync rules
// Log and re-raise to exit.
// Should only reach this due to validation errors if exit_on_error is true.
this.logger.error(`Failed to update sync rules from configuration`, e);
throw e;
}
} else {
this.logger.info('No sync rules configured - configure via API');
Expand Down
5 changes: 4 additions & 1 deletion packages/service-core/src/routes/endpoints/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ export const reprocess = routeDefinition({
}

const new_rules = await activeBucketStorage.updateSyncRules({
content: active.sync_rules.content
content: active.sync_rules.content,
// These sync rules already passed validation. But if the rules are not valid anymore due
// to a service change, we do want to report the error here.
validate: true
});

const baseConfig = await apiHandler.getSourceConfig();
Expand Down
9 changes: 7 additions & 2 deletions packages/service-core/src/routes/endpoints/sync-rules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ export const deploySyncRules = routeDefinition({
}

const sync_rules = await storageEngine.activeBucketStorage.updateSyncRules({
content: content
content: content,
// Aready validated above
validate: false
});

return {
Expand Down Expand Up @@ -167,7 +169,10 @@ export const reprocessSyncRules = routeDefinition({
}

const new_rules = await activeBucketStorage.updateSyncRules({
content: sync_rules.sync_rules.content
content: sync_rules.sync_rules.content,
// These sync rules already passed validation. But if the rules are not valid anymore due
// to a service change, we do want to report the error here.
validate: true
});
return {
slot_name: new_rules.slot_name
Expand Down
6 changes: 4 additions & 2 deletions packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient<Buck
* Update sync rules from configuration, if changed.
*/
configureSyncRules(
sync_rules: string,
options?: { lock?: boolean }
options: UpdateSyncRulesOptions
): Promise<{ updated: boolean; persisted_sync_rules?: PersistedSyncRulesContent; lock?: ReplicationLock }>;

/**
Expand All @@ -90,6 +89,8 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient<Buck

/**
* Deploy new sync rules.
*
* Similar to configureSyncRules, but applies the update unconditionally.
*/
updateSyncRules(options: UpdateSyncRulesOptions): Promise<PersistedSyncRulesContent>;

Expand Down Expand Up @@ -232,6 +233,7 @@ export interface PersistedSyncRules {
export interface UpdateSyncRulesOptions {
content: string;
lock?: boolean;
validate?: boolean;
}

export interface SyncRulesBucketStorageOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ export class CompoundConfigCollector {
}
}
return {
present: false
present: false,
exit_on_error: true
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class Base64SyncRulesCollector extends SyncRulesCollector {

return {
present: true,
exit_on_error: baseConfig.sync_rules?.exit_on_error ?? true,
content: Buffer.from(sync_rules_base64, 'base64').toString()
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class FileSystemSyncRulesCollector extends SyncRulesCollector {
// Only persist the path here, and load on demand using `loadSyncRules()`.
return {
present: true,
exit_on_error: baseConfig.sync_rules?.exit_on_error ?? true,
path: config_path ? path.resolve(path.dirname(config_path), sync_path) : sync_path
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class InlineSyncRulesCollector extends SyncRulesCollector {

return {
present: true,
exit_on_error: true,
...baseConfig.sync_rules
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import fs from 'fs/promises';

export interface SyncRulesProvider {
get(): Promise<string | undefined>;

readonly exitOnError: boolean;
}

export class ConfigurationFileSyncRulesProvider implements SyncRulesProvider {
Expand All @@ -15,4 +17,8 @@ export class ConfigurationFileSyncRulesProvider implements SyncRulesProvider {
return await fs.readFile(this.config.path, 'utf-8');
}
}

get exitOnError() {
return this.config.exit_on_error;
}
}
1 change: 1 addition & 0 deletions packages/service-core/src/util/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type SyncRulesConfig = {
present: boolean;
content?: string;
path?: string;
exit_on_error: boolean;
};

export type ResolvedPowerSyncConfig = {
Expand Down
3 changes: 2 additions & 1 deletion packages/types/src/config/PowerSyncConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ export const powerSyncConfig = t.object({
sync_rules: t
.object({
path: t.string.optional(),
content: t.string.optional()
content: t.string.optional(),
exit_on_error: t.boolean.optional()
})
.optional(),

Expand Down

0 comments on commit fc3ecc8

Please sign in to comment.