From c90e5be99539ad6ae3c71ab1c349900009d5c2d9 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 5 Feb 2025 15:52:44 +0200 Subject: [PATCH] detect dropped databases --- modules/module-mongodb/src/common/MongoLSN.ts | 29 +++++++++++++-- .../src/replication/ChangeStream.ts | 36 +++++++++++++++---- .../replication/ChangeStreamReplicationJob.ts | 4 +-- .../module-mongodb/test/src/resume.test.ts | 2 +- 4 files changed, 59 insertions(+), 12 deletions(-) diff --git a/modules/module-mongodb/src/common/MongoLSN.ts b/modules/module-mongodb/src/common/MongoLSN.ts index 7d012ebd..7238dcdf 100644 --- a/modules/module-mongodb/src/common/MongoLSN.ts +++ b/modules/module-mongodb/src/common/MongoLSN.ts @@ -1,26 +1,43 @@ import { mongo } from '@powersync/lib-service-mongodb'; +import { storage } from '@powersync/service-core'; export type MongoLSNSpecification = { timestamp: mongo.Timestamp; + /** + * The ResumeToken type here is an alias for `unknown`. + * The docs mention the contents should be of the form. + * We use BSON serialization to store the resume token. + * ```typescript + * { + * "_data" : + * } + * ``` + */ resume_token?: mongo.ResumeToken; }; export const ZERO_LSN = '0000000000000000'; +const DELIMINATOR = '|'; + +/** + * Represent a Logical Sequence Number (LSN) for MongoDB replication sources. + * This stores a combination of the cluster timestamp and optional change stream resume token. + */ export class MongoLSN { static fromSerialized(comparable: string): MongoLSN { return new MongoLSN(MongoLSN.deserialize(comparable)); } private static deserialize(comparable: string): MongoLSNSpecification { - const [timestampString, resumeString] = comparable.split('|'); + const [timestampString, resumeString] = comparable.split(DELIMINATOR); const a = parseInt(timestampString.substring(0, 8), 16); const b = parseInt(timestampString.substring(8, 16), 16); return { timestamp: mongo.Timestamp.fromBits(b, a), - resume_token: resumeString ? 
JSON.parse(resumeString) : null + resume_token: resumeString ? storage.deserializeBson(Buffer.from(resumeString, 'base64')).resumeToken : null }; } @@ -42,7 +59,13 @@ export class MongoLSN { const a = timestamp.high.toString(16).padStart(8, '0'); const b = timestamp.low.toString(16).padStart(8, '0'); - return `${a}${b}|${resumeToken ? JSON.stringify(resumeToken) : ''}`; + const segments = [`${a}${b}`]; + + if (resumeToken) { + segments.push(storage.serializeBson({ resumeToken }).toString('base64')); + } + + return segments.join(DELIMINATOR); } toString() { diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 1dfdc0f8..0087e7f1 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -218,9 +218,9 @@ export class ChangeStream { await touch(); } - const lsn = new MongoLSN({ timestamp: snapshotTime }); + const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime }); logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`); - await batch.commit(lsn.comparable); + await batch.commit(lsn); } ); } finally { @@ -524,7 +524,7 @@ export class ChangeStream { const startAfter = lastLsn?.timestamp; const resumeAfter = lastLsn?.resumeToken; - logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}}`); + logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); const filters = this.getSourceNamespaceFilters(); @@ -553,6 +553,9 @@ export class ChangeStream { fullDocument: fullDocument }; + /** + * Only one of these options can be supplied at a time. 
+ */ if (resumeAfter) { streamOptions.resumeAfter = resumeAfter; } else { @@ -591,6 +594,11 @@ export class ChangeStream { const originalChangeDocument = await stream.tryNext(); + // The stream was closed, we will only ever receive `null` from it + if (!originalChangeDocument && stream.closed) { + break; + } + if (originalChangeDocument == null || this.abort_signal.aborted) { continue; } @@ -625,14 +633,30 @@ export class ChangeStream { throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); } - // console.log('event', changeDocument); - if ( (changeDocument.operationType == 'insert' || changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace') && + changeDocument.operationType == 'replace' || + changeDocument.operationType == 'drop') && changeDocument.ns.coll == CHECKPOINTS_COLLECTION ) { + /** + * Dropping the database does not provide an `invalidate` event. + * We typically would receive `drop` events for the collection which we + * would process below. + * + * However we don't commit the LSN after collections are dropped. + * This prevents the `startAfter` or `resumeToken` from advancing past the drop events. + * The stream also closes after the drop events. + * This causes an infinite loop of processing the collection drop events. + * + * This check here invalidates the change stream if our `_checkpoints` collection + * is dropped. This allows for detecting when the DB is dropped. 
+ */ + if (changeDocument.operationType == 'drop') { + throw new ChangeStreamInvalidatedError('_checkpoints collection was dropped'); + } + const { comparable: lsn } = new MongoLSN({ timestamp: changeDocument.clusterTime!, resume_token: changeDocument._id diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index cc6f8e11..3d5b252c 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -1,4 +1,4 @@ -import { mongo } from '@powersync/lib-service-mongodb'; +import { isMongoServerError } from '@powersync/lib-service-mongodb'; import { container } from '@powersync/lib-services-framework'; import { replication } from '@powersync/service-core'; @@ -85,7 +85,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ } if (e instanceof ChangeStreamInvalidatedError) { throw e; - } else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) { + } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { throw new ChangeStreamInvalidatedError(e.message); } else { // Report the error if relevant, before retrying diff --git a/modules/module-mongodb/test/src/resume.test.ts b/modules/module-mongodb/test/src/resume.test.ts index 82c4a9e9..e56c3453 100644 --- a/modules/module-mongodb/test/src/resume.test.ts +++ b/modules/module-mongodb/test/src/resume.test.ts @@ -2,7 +2,7 @@ import { MongoLSN, ZERO_LSN } from '@module/common/MongoLSN.js'; import { mongo } from '@powersync/lib-service-mongodb'; import { describe, expect, test } from 'vitest'; -describe('mongo lsn resume tokens', () => { +describe('mongo lsn', () => { test('LSN with resume tokens should be comparable', () => { // Values without a resume token should be comparable expect(