Skip to content

Commit

Permalink
ping/pong msg types, txid util, actuall process forwarded messages
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Aug 28, 2023
1 parent b8f0d95 commit 5f4c92b
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 25 deletions.
5 changes: 5 additions & 0 deletions packages/ws-common/src/decode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ export function decode(msg: Uint8Array): Msg {
decoding.readVarInt(decoder),
],
} satisfies ForwardedChanges;
case tags.Ping:
case tags.Pong:
return {
_tag: tag,
};
default:
tag as never;
}
Expand Down
3 changes: 3 additions & 0 deletions packages/ws-common/src/encode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export function encode(msg: Msg): Uint8Array {
encoding.writeBigInt64(encoder, msg.newLastSeen[0]);
encoding.writeVarInt(encoder, msg.newLastSeen[1]);
return encoding.toUint8Array(encoder);
case tags.Ping:
case tags.Pong:
return encoding.toUint8Array(encoder);
}
}

Expand Down
9 changes: 8 additions & 1 deletion packages/ws-common/src/msgTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ export type Msg =
| RejectChanges
| StartStreaming
| ForwardedAnnouncePresence
| ForwardedChanges;
| ForwardedChanges
| Ping
| Pong;

export const tags = {
AnnouncePresence: 1,
Expand All @@ -13,6 +15,8 @@ export const tags = {
StartStreaming: 4,
ForwardedAnnouncePresence: 5,
ForwardedChanges: 6,
Ping: 7,
Pong: 8,
} as const;

export type Tags = typeof tags;
Expand Down Expand Up @@ -76,3 +80,6 @@ export type ForwardedChanges = Omit<Changes, "_tag"> &
room: string;
newLastSeen: readonly [bigint, number];
}>;

export type Ping = { _tag: Tags["Ping"] };
export type Pong = { _tag: Tags["Pong"] };
70 changes: 48 additions & 22 deletions packages/ws-litefs/src/LiteFSWriteService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { Config, IDB, PresenceResponse, internal } from "@vlcn.io/ws-server";
import net from "net";
import { AnnouncePresence, Change, Changes } from "@vlcn.io/ws-common";
import fs from "fs";
import {
ForwardedAnnouncePresence,
ForwardedChanges,
decode,
encode,
tags,
} from "@vlcn.io/ws-common";
import DBCache from "@vlcn.io/ws-server/src/DBCache.js";
import { util } from "./internal/util.js";

export const port = 9000;

Expand Down Expand Up @@ -34,22 +40,32 @@ class EstablishedConnection {
conn.on("error", this.close);
}

#handleMessage = (data: Buffer) => {
#handleMessage = async (data: Buffer) => {
if (this.#closed) {
return;
}

// - presence
// - apply changes
// - close a given room? We have idle callbacks so that should cover the close case.

// decodes the binary-encoded message
// processes it by passing it to #writer
const msg = decode(data);
switch (msg._tag) {
case tags.ForwardedAnnouncePresence:
await this.#presenceAnnounced(msg.room, msg);
return;
case tags.ForwardedChanges:
await this.#changesReceived(msg.room, msg, msg.newLastSeen);
return;
case tags.Ping:
this.#pingReceived();
return;
default:
throw new Error(
`Unexpected message type on forwarded write service: ${msg._tag}`
);
}
};

async #presenceAnnounced(
room: string,
msg: AnnouncePresence
msg: ForwardedAnnouncePresence
): Promise<PresenceResponse> {
this.#schemaNamesAndVersions.set(room, [msg.schemaName, msg.schemaVersion]);
const dbEntry = this.#getDB(room);
Expand All @@ -64,22 +80,12 @@ class EstablishedConnection {
};
}

const content = await fs.promises.readFile(
internal.getDbPath(room, this.#config) + "-pos",
{ encoding: "utf-8" }
);
const [txidHex, _checksum] = content.split("/");

if (txidHex.length != 16) {
throw new Error("Unexpected txid length");
}

return { txid: BigInt("0x" + txidHex) };
return { txid: await util.getTxId(this.#config, room) };
}

async #changesReceived(
room: string,
msg: Changes,
msg: ForwardedChanges,
newLastSeen: readonly [bigint, number]
) {
const dbEntry = this.#getDB(room);
Expand All @@ -88,6 +94,20 @@ class EstablishedConnection {
await db.applyChangesetAndSetLastSeen(msg.changes, msg.sender, newLastSeen);
}

#pingReceived() {
const cb = (err?: Error) => {
if (err) {
console.error(err);
}
};
this.#conn.write(
encode({
_tag: tags.Pong,
}),
cb
);
}

#getDB(room: string) {
let entry = this.#dbs.get(room);
if (entry != null) {
Expand Down Expand Up @@ -130,7 +150,13 @@ class EstablishedConnection {
}
}
this.#conn.destroy();
// do not destroy the db cache. It is shared with the overall process which is still valid
// when we're downgraded from primary to follower.
};

__dbs_TESTS_ONLY() {
return this.#dbs;
}
}

/**
Expand Down
6 changes: 4 additions & 2 deletions packages/ws-litefs/src/internal/PrimaryConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ class PrimarySocket {
this.#socket = new net.Socket();

this.#socket.connect(port, this.#currentPrimaryHostname, this.#onConnected);
this.#socket.on("data", this.#onData);
this.#socket.on("data", this.#handleMessage);
this.#socket.on("error", this.#onError);
this.#socket.on("close", this.#onClose);
}

#onConnected = () => {};

#onData = () => {};
#handleMessage = (data: Buffer) => {
// ping pong processing to re-establish connection that was broken for unknown reasons
};
#onError = () => {};
#onClose = () => {};

Expand Down
15 changes: 15 additions & 0 deletions packages/ws-litefs/src/internal/util.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Config, internal } from "@vlcn.io/ws-server";
import fs from "fs";

export let primaryFilePath = "/var/lib/litefs/.primary";
Expand All @@ -13,4 +14,18 @@ export const util = {
return null;
});
},

async getTxId(config: Config, room: string) {
const content = await fs.promises.readFile(
internal.getDbPath(room, config) + "-pos",
{ encoding: "utf-8" }
);
const [txidHex, _checksum] = content.split("/");

if (txidHex.length != 16) {
throw new Error("Unexpected txid length");
}

return BigInt("0x" + txidHex);
},
};

0 comments on commit 5f4c92b

Please sign in to comment.