Skip to content

Commit

Permalink
start switching back to a DBFactory design
Browse files Browse the repository at this point in the history
We have more information at this level and cleanup is easier to manage as the connection over which to forward writes is per-db.
  • Loading branch information
tantaman committed Aug 24, 2023
1 parent a766da6 commit da4b20a
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 54 deletions.
3 changes: 2 additions & 1 deletion packages/ws-litefs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"scripts": {
"build": "tsc --build",
"watch": "tsc --build --watch",
"test": "vitest run",
"test": "echo no tests here yet",
"deep-clean": "rm -rf ./dist || true && rm tsconfig.tsbuildinfo || true"
},
"repository": {
Expand All @@ -24,6 +24,7 @@
"@vlcn.io/ws-common": "workspace:*",
"@vlcn.io/ws-server": "workspace:*",
"better-sqlite3": "^8.3.0",
"chokidar": "^3.5.3",
"throttle-debounce": "^5.0.0",
"winston": "^3.10.0"
},
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion packages/ws-litefs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { LiteFSWriteForwarder } from "./LiteFSWriteForwarder.js";
export { LiteFSWriteForwarder } from "./internal/LiteFSWriteForwarder.js";
export { createLiteFSWriteService } from "./LiteFSWriteService.js";
9 changes: 9 additions & 0 deletions packages/ws-litefs/src/internal/PrimaryStatusObserver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Config } from "@vlcn.io/ws-server";
import chokidar from "chokidar";
import path from "path";

/**
* This will observe the db directory for `-pos` files.
Expand All @@ -15,6 +17,13 @@ export default class PrimaryStatusObserver {

constructor(config: Config) {
this.#observers = new Map<string, Set<() => void>>();
chokidar.watch(config.dbFolder + path.sep + "*-pos", {
followSymlinks: false,
usePolling: false,
interval: 100,
binaryInterval: 300,
ignoreInitial: false,
});
}

observe(room: string, cb: () => void) {}
Expand Down
18 changes: 2 additions & 16 deletions packages/ws-server/src/ConnectionBroker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ export default class ConnectionBroker {
readonly #dbCache;
readonly #ws;
readonly #room;
readonly #writeForwarder;

constructor({ ws, dbCache, room, writeForwarder }: Options) {
constructor({ ws, dbCache, room }: Options) {
this.#dbCache = dbCache;
this.#ws = ws;
this.#room = room;
this.#writeForwarder = writeForwarder || null;

this.#ws.on("message", (data) => {
// TODO: for litefs support we should just read the tag out
Expand Down Expand Up @@ -66,23 +64,11 @@ export default class ConnectionBroker {
);
}

if (
this.#writeForwarder &&
this.#writeForwarder.shouldForwardWrites()
) {
// forward and await the write
// then go.
// How do we know when the underlying db has receive the changes
// on this replica tho? Need to check replication status...
// The write forwarder should block us and handle that detail.
}

const syncConnection = new SyncConnection(
this.#dbCache,
new Transport(this.#ws),
this.#room,
msg,
this.#writeForwarder
msg
);
this.#syncConnection = syncConnection;
syncConnection.start();
Expand Down
4 changes: 3 additions & 1 deletion packages/ws-server/src/DB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import { Config } from "./config.js";
import path from "node:path";
import fs from "node:fs";
import { extensionPath } from "@vlcn.io/crsqlite";
import { Change, bytesToHex, cryb64 } from "@vlcn.io/ws-common";
import { Change, cryb64 } from "@vlcn.io/ws-common";
import { throttle } from "throttle-debounce";
import FSNotify from "./fs/FSNotify.js";
import touchHack from "./fs/touchHack.js";

export interface IDB {}

/**
* Abstracts over a DB and provides just the operations requred by the sync server.
*
Expand Down
11 changes: 11 additions & 0 deletions packages/ws-server/src/DBFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { IDB } from "./DB.js";

export interface IDBFactory {
createDB(): Promise<IDB>;
}

export default class DBFactory implements IDBFactory {
async createDB(): Promise<IDB> {
throw new Error("unimplemented");
}
}
12 changes: 2 additions & 10 deletions packages/ws-server/src/SyncConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ export default class SyncConnection {
readonly #room;
readonly #outboundStream;
readonly #inboundStream;
readonly #writeForwarder;

constructor(
dbCache: DBCache,
transport: Transport,
room: string,
msg: AnnouncePresence,
writeForwarder: IWriteForwarder | null
msg: AnnouncePresence
) {
this.#writeForwarder = writeForwarder;
logger.info(
`Spun up a sync connection on room ${room} to client ws and client dbid ${bytesToHex(
msg.sender
Expand All @@ -45,12 +42,7 @@ export default class SyncConnection {
msg.lastSeens,
msg.sender
);
this.#inboundStream = new InboundStream(
transport,
this.#db,
msg.sender,
this.#writeForwarder
);
this.#inboundStream = new InboundStream(transport, this.#db, msg.sender);
}

start() {
Expand Down
2 changes: 2 additions & 0 deletions packages/ws-server/src/__tests__/DB.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import DB from "../DB.js";
import fs from "node:fs";
import { Config } from "../config.js";
import { cryb64 } from "@vlcn.io/ws-common";
import DBFactory from "../DBFactory.js";

test("db instantiation", () => {
const config: Config = {
schemaFolder: "./testSchemas",
dbFolder: null,
pathPattern: /\/vlcn-ws/,
dbFactory: new DBFactory(),
};

const schemaContent = fs.readFileSync("./testSchemas/test.sql", "utf-8");
Expand Down
6 changes: 3 additions & 3 deletions packages/ws-server/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { IWriteForwarder } from "./IWriteForwarder";
import DBFactory, { IDBFactory } from "./DBFactory.js";

export const defaultConfig: Config = Object.freeze({
dbFolder: "./dbs",
schemaFolder: "./schemas",
pathPattern: /\/vlcn-ws/,
notifyLatencyMs: 50,
dbFactory: new DBFactory(),
});

export type Config = Readonly<{
dbFolder: string | null;
schemaFolder: string;
pathPattern: RegExp;
notifyLatencyMs?: number;
writeForwarder?: IWriteForwarder;
writeForwarderPort?: number;
dbFactory: IDBFactory;
}>;
1 change: 0 additions & 1 deletion packages/ws-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ export function attachWebsocketServer(
ws,
dbCache,
room: options.room,
writeForwarder: config.writeForwarder,
});
});
}
Expand Down
24 changes: 3 additions & 21 deletions packages/ws-server/src/streams/InboundStream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Changes, greaterThanOrEqual, tags } from "@vlcn.io/ws-common";
import DB from "../DB.js";
import Transport from "../Trasnport.js";
import { IWriteForwarder } from "../IWriteForwarder.js";

/**
* Processes a stream of changes from the given sender.
Expand All @@ -15,18 +14,11 @@ export default class InboundStream {
readonly #db;
readonly #from;
#lastSeen: readonly [bigint, number] | null = null;
readonly #writeForwarder: IWriteForwarder | null;

constructor(
transport: Transport,
db: DB,
from: Uint8Array,
writeForwarder: IWriteForwarder | null
) {
constructor(transport: Transport, db: DB, from: Uint8Array) {
this.#transport = transport;
this.#db = db;
this.#from = from;
this.#writeForwarder = writeForwarder;
}

start() {
Expand Down Expand Up @@ -65,18 +57,8 @@ export default class InboundStream {
}
const lastChange = msg.changes[msg.changes.length - 1];
const newLastSeen = [lastChange[5], 0] as const;
if (
this.#writeForwarder != null &&
this.#writeForwarder.shouldForwardWrites()
) {
// forward the write
} else {
this.#db.applyChangesetAndSetLastSeen(
msg.changes,
msg.sender,
newLastSeen
);
}
this.#db.applyChangesetAndSetLastSeen(msg.changes, msg.sender, newLastSeen);

this.#lastSeen = newLastSeen;
}
}
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit da4b20a

Please sign in to comment.