Skip to content

Commit

Permalink
detect dropped databases
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Feb 5, 2025
1 parent 0da4c50 commit c90e5be
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 12 deletions.
29 changes: 26 additions & 3 deletions modules/module-mongodb/src/common/MongoLSN.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { storage } from '@powersync/service-core';

export type MongoLSNSpecification = {
timestamp: mongo.Timestamp;
/**
* The ResumeToken type here is an alias for `unknown`.
* The docs mention the contents should be of the form.
* We use BSON serialization to store the resume token.
* ```typescript
* {
* "_data" : <BinData|string>
* }
* ```
*/
resume_token?: mongo.ResumeToken;
};

export const ZERO_LSN = '0000000000000000';

const DELIMINATOR = '|';

/**
* Represent a Logical Sequence Number (LSN) for MongoDB replication sources.
* This stores a combination of the cluster timestamp and optional change stream resume token.
*/
export class MongoLSN {
static fromSerialized(comparable: string): MongoLSN {
return new MongoLSN(MongoLSN.deserialize(comparable));
}

private static deserialize(comparable: string): MongoLSNSpecification {
const [timestampString, resumeString] = comparable.split('|');
const [timestampString, resumeString] = comparable.split(DELIMINATOR);

const a = parseInt(timestampString.substring(0, 8), 16);
const b = parseInt(timestampString.substring(8, 16), 16);

return {
timestamp: mongo.Timestamp.fromBits(b, a),
resume_token: resumeString ? JSON.parse(resumeString) : null
resume_token: resumeString ? storage.deserializeBson(Buffer.from(resumeString, 'base64')).resumeToken : null
};
}

Expand All @@ -42,7 +59,13 @@ export class MongoLSN {
const a = timestamp.high.toString(16).padStart(8, '0');
const b = timestamp.low.toString(16).padStart(8, '0');

return `${a}${b}|${resumeToken ? JSON.stringify(resumeToken) : ''}`;
const segments = [`${a}${b}`];

if (resumeToken) {
segments.push(storage.serializeBson({ resumeToken }).toString('base64'));
}

return segments.join(DELIMINATOR);
}

toString() {
Expand Down
36 changes: 30 additions & 6 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ export class ChangeStream {
await touch();
}

const lsn = new MongoLSN({ timestamp: snapshotTime });
const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn.comparable);
await batch.commit(lsn);
}
);
} finally {
Expand Down Expand Up @@ -524,7 +524,7 @@ export class ChangeStream {
const startAfter = lastLsn?.timestamp;
const resumeAfter = lastLsn?.resumeToken;

logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}}`);
logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);

const filters = this.getSourceNamespaceFilters();

Expand Down Expand Up @@ -553,6 +553,9 @@ export class ChangeStream {
fullDocument: fullDocument
};

/**
* Only one of these options can be supplied at a time.
*/
if (resumeAfter) {
streamOptions.resumeAfter = resumeAfter;
} else {
Expand Down Expand Up @@ -591,6 +594,11 @@ export class ChangeStream {

const originalChangeDocument = await stream.tryNext();

// The stream was closed, we will only ever receive `null` from it
if (!originalChangeDocument && stream.closed) {
break;
}

if (originalChangeDocument == null || this.abort_signal.aborted) {
continue;
}
Expand Down Expand Up @@ -625,14 +633,30 @@ export class ChangeStream {
throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
}

// console.log('event', changeDocument);

if (
(changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace') &&
changeDocument.operationType == 'replace' ||
changeDocument.operationType == 'drop') &&
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
) {
/**
* Dropping the database does not provide an `invalidate` event.
* We typically would receive `drop` events for the collection which we
* would process below.
*
* However we don't commit the LSN after collections are dropped.
* The prevents the `startAfter` or `resumeToken` from advancing past the drop events.
* The stream also closes after the drop events.
* This causes an infinite loop of processing the collection drop events.
*
* This check here invalidates the change stream if our `_checkpoints` collection
* is dropped. This allows for detecting when the DB is dropped.
*/
if (changeDocument.operationType == 'drop') {
throw new ChangeStreamInvalidatedError('_checkpoints collection was dropped');
}

const { comparable: lsn } = new MongoLSN({
timestamp: changeDocument.clusterTime!,
resume_token: changeDocument._id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { isMongoServerError } from '@powersync/lib-service-mongodb';
import { container } from '@powersync/lib-services-framework';
import { replication } from '@powersync/service-core';

Expand Down Expand Up @@ -85,7 +85,7 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
}
if (e instanceof ChangeStreamInvalidatedError) {
throw e;
} else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
throw new ChangeStreamInvalidatedError(e.message);
} else {
// Report the error if relevant, before retrying
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mongodb/test/src/resume.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { MongoLSN, ZERO_LSN } from '@module/common/MongoLSN.js';
import { mongo } from '@powersync/lib-service-mongodb';
import { describe, expect, test } from 'vitest';

describe('mongo lsn resume tokens', () => {
describe('mongo lsn', () => {
test('LSN with resume tokens should be comparable', () => {
// Values without a resume token should be comparable
expect(
Expand Down

0 comments on commit c90e5be

Please sign in to comment.