From da4b20ae1a38bc0020219864ded9c1fa691bba64 Mon Sep 17 00:00:00 2001 From: Matt <1009003+tantaman@users.noreply.github.com> Date: Thu, 24 Aug 2023 11:05:55 -0400 Subject: [PATCH] start switching back to a DBFactory design We have more information at this level and cleanup is easier to manage as the connection over which to forward writes is per-db. --- packages/ws-litefs/package.json | 3 ++- packages/ws-litefs/src/LiteFSDBFactory.ts | 0 packages/ws-litefs/src/index.ts | 2 +- .../{ => internal}/LiteFSWriteForwarder.ts | 0 .../src/internal/PrimaryStatusObserver.ts | 9 +++++++ packages/ws-server/src/ConnectionBroker.ts | 18 ++------------ packages/ws-server/src/DB.ts | 4 +++- packages/ws-server/src/DBFactory.ts | 11 +++++++++ packages/ws-server/src/SyncConnection.ts | 12 ++-------- packages/ws-server/src/__tests__/DB.test.ts | 2 ++ packages/ws-server/src/config.ts | 6 ++--- packages/ws-server/src/index.ts | 1 - .../ws-server/src/streams/InboundStream.ts | 24 +++---------------- pnpm-lock.yaml | 3 +++ 14 files changed, 41 insertions(+), 54 deletions(-) create mode 100644 packages/ws-litefs/src/LiteFSDBFactory.ts rename packages/ws-litefs/src/{ => internal}/LiteFSWriteForwarder.ts (100%) create mode 100644 packages/ws-server/src/DBFactory.ts diff --git a/packages/ws-litefs/package.json b/packages/ws-litefs/package.json index 9f0fad1..435bd20 100644 --- a/packages/ws-litefs/package.json +++ b/packages/ws-litefs/package.json @@ -11,7 +11,7 @@ "scripts": { "build": "tsc --build", "watch": "tsc --build --watch", - "test": "vitest run", + "test": "echo no tests here yet", "deep-clean": "rm -rf ./dist || true && rm tsconfig.tsbuildinfo || true" }, "repository": { @@ -24,6 +24,7 @@ "@vlcn.io/ws-common": "workspace:*", "@vlcn.io/ws-server": "workspace:*", "better-sqlite3": "^8.3.0", + "chokidar": "^3.5.3", "throttle-debounce": "^5.0.0", "winston": "^3.10.0" }, diff --git a/packages/ws-litefs/src/LiteFSDBFactory.ts b/packages/ws-litefs/src/LiteFSDBFactory.ts new file mode 100644 index 0000000..e69de29 diff --git a/packages/ws-litefs/src/index.ts b/packages/ws-litefs/src/index.ts index 0c9153d..e921e91 100644 --- a/packages/ws-litefs/src/index.ts +++ b/packages/ws-litefs/src/index.ts @@ -1,2 +1,2 @@ -export { LiteFSWriteForwarder } from "./LiteFSWriteForwarder.js"; +export { LiteFSWriteForwarder } from "./internal/LiteFSWriteForwarder.js"; export { createLiteFSWriteService } from "./LiteFSWriteService.js"; diff --git a/packages/ws-litefs/src/LiteFSWriteForwarder.ts b/packages/ws-litefs/src/internal/LiteFSWriteForwarder.ts similarity index 100% rename from packages/ws-litefs/src/LiteFSWriteForwarder.ts rename to packages/ws-litefs/src/internal/LiteFSWriteForwarder.ts diff --git a/packages/ws-litefs/src/internal/PrimaryStatusObserver.ts b/packages/ws-litefs/src/internal/PrimaryStatusObserver.ts index 86fbc21..488c418 100644 --- a/packages/ws-litefs/src/internal/PrimaryStatusObserver.ts +++ b/packages/ws-litefs/src/internal/PrimaryStatusObserver.ts @@ -1,4 +1,6 @@ import { Config } from "@vlcn.io/ws-server"; +import chokidar from "chokidar"; +import path from "path"; /** * This will observe the db directory for `-pos` files. @@ -15,6 +17,13 @@ export default class PrimaryStatusObserver { constructor(config: Config) { this.#observers = new Map void>>(); + chokidar.watch(config.dbFolder + path.sep + "*-pos", { + followSymlinks: false, + usePolling: false, + interval: 100, + binaryInterval: 300, + ignoreInitial: false, + }); } observe(room: string, cb: () => void) {} diff --git a/packages/ws-server/src/ConnectionBroker.ts b/packages/ws-server/src/ConnectionBroker.ts index cb69051..05396d4 100644 --- a/packages/ws-server/src/ConnectionBroker.ts +++ b/packages/ws-server/src/ConnectionBroker.ts @@ -22,13 +22,11 @@ export default class ConnectionBroker { readonly #dbCache; readonly #ws; readonly #room; - readonly #writeForwarder; - constructor({ ws, dbCache, room, writeForwarder }: Options) { + constructor({ ws, dbCache, room }: Options) { this.#dbCache = dbCache; this.#ws = ws; this.#room = room; - this.#writeForwarder = writeForwarder || null; this.#ws.on("message", (data) => { // TODO: for litefs support we should just read the tag out @@ -66,23 +64,11 @@ export default class ConnectionBroker { ); } - if ( - this.#writeForwarder && - this.#writeForwarder.shouldForwardWrites() - ) { - // forward and await the write - // then go. - // How do we know when the underlying db has receive the changes - // on this replica tho? Need to check replication status... - // The write forwarder should block us and handle that detail. - } - const syncConnection = new SyncConnection( this.#dbCache, new Transport(this.#ws), this.#room, - msg, - this.#writeForwarder + msg ); this.#syncConnection = syncConnection; syncConnection.start(); diff --git a/packages/ws-server/src/DB.ts b/packages/ws-server/src/DB.ts index cf6bca7..05c1c19 100644 --- a/packages/ws-server/src/DB.ts +++ b/packages/ws-server/src/DB.ts @@ -3,11 +3,13 @@ import { Config } from "./config.js"; import path from "node:path"; import fs from "node:fs"; import { extensionPath } from "@vlcn.io/crsqlite"; -import { Change, bytesToHex, cryb64 } from "@vlcn.io/ws-common"; +import { Change, cryb64 } from "@vlcn.io/ws-common"; import { throttle } from "throttle-debounce"; import FSNotify from "./fs/FSNotify.js"; import touchHack from "./fs/touchHack.js"; +export interface IDB {} + /** * Abstracts over a DB and provides just the operations requred by the sync server. * diff --git a/packages/ws-server/src/DBFactory.ts b/packages/ws-server/src/DBFactory.ts new file mode 100644 index 0000000..5d37187 --- /dev/null +++ b/packages/ws-server/src/DBFactory.ts @@ -0,0 +1,11 @@ +import { IDB } from "./DB.js"; + +export interface IDBFactory { + createDB(): Promise; +} + +export default class DBFactory implements IDBFactory { + async createDB(): Promise { + throw new Error("unimplemented"); + } +} diff --git a/packages/ws-server/src/SyncConnection.ts b/packages/ws-server/src/SyncConnection.ts index 94da7a8..49d9ea3 100644 --- a/packages/ws-server/src/SyncConnection.ts +++ b/packages/ws-server/src/SyncConnection.ts @@ -20,16 +20,13 @@ export default class SyncConnection { readonly #room; readonly #outboundStream; readonly #inboundStream; - readonly #writeForwarder; constructor( dbCache: DBCache, transport: Transport, room: string, - msg: AnnouncePresence, - writeForwarder: IWriteForwarder | null + msg: AnnouncePresence ) { - this.#writeForwarder = writeForwarder; logger.info( `Spun up a sync connection on room ${room} to client ws and client dbid ${bytesToHex( msg.sender @@ -45,12 +42,7 @@ export default class SyncConnection { msg.lastSeens, msg.sender ); - this.#inboundStream = new InboundStream( - transport, - this.#db, - msg.sender, - this.#writeForwarder - ); + this.#inboundStream = new InboundStream(transport, this.#db, msg.sender); } start() { diff --git a/packages/ws-server/src/__tests__/DB.test.ts b/packages/ws-server/src/__tests__/DB.test.ts index 0be485e..c01fce9 100644 --- a/packages/ws-server/src/__tests__/DB.test.ts +++ b/packages/ws-server/src/__tests__/DB.test.ts @@ -3,12 +3,14 @@ import DB from "../DB.js"; import fs from "node:fs"; import { Config } from "../config.js"; import { cryb64 } from "@vlcn.io/ws-common"; +import DBFactory from "../DBFactory.js"; test("db instantiation", () => { const config: Config = { schemaFolder: "./testSchemas", dbFolder: null, pathPattern: /\/vlcn-ws/, + dbFactory: new DBFactory(), }; const schemaContent = fs.readFileSync("./testSchemas/test.sql", "utf-8"); diff --git a/packages/ws-server/src/config.ts b/packages/ws-server/src/config.ts index cbbe4c1..e46fb1b 100644 --- a/packages/ws-server/src/config.ts +++ b/packages/ws-server/src/config.ts @@ -1,10 +1,11 @@ -import { IWriteForwarder } from "./IWriteForwarder"; +import DBFactory, { IDBFactory } from "./DBFactory.js"; export const defaultConfig: Config = Object.freeze({ dbFolder: "./dbs", schemaFolder: "./schemas", pathPattern: /\/vlcn-ws/, notifyLatencyMs: 50, + dbFactory: new DBFactory(), }); export type Config = Readonly<{ @@ -12,6 +13,5 @@ export type Config = Readonly<{ schemaFolder: string; pathPattern: RegExp; notifyLatencyMs?: number; - writeForwarder?: IWriteForwarder; - writeForwarderPort?: number; + dbFactory: IDBFactory; }>; diff --git a/packages/ws-server/src/index.ts b/packages/ws-server/src/index.ts index 98fe2e8..dbd3e84 100644 --- a/packages/ws-server/src/index.ts +++ b/packages/ws-server/src/index.ts @@ -80,7 +80,6 @@ export function attachWebsocketServer( ws, dbCache, room: options.room, - writeForwarder: config.writeForwarder, }); }); } diff --git a/packages/ws-server/src/streams/InboundStream.ts b/packages/ws-server/src/streams/InboundStream.ts index 4bc18fb..0375dc0 100644 --- a/packages/ws-server/src/streams/InboundStream.ts +++ b/packages/ws-server/src/streams/InboundStream.ts @@ -1,7 +1,6 @@ import { Changes, greaterThanOrEqual, tags } from "@vlcn.io/ws-common"; import DB from "../DB.js"; import Transport from "../Trasnport.js"; -import { IWriteForwarder } from "../IWriteForwarder.js"; /** * Processes a stream of changes from the given sender. @@ -15,18 +14,11 @@ export default class InboundStream { readonly #db; readonly #from; #lastSeen: readonly [bigint, number] | null = null; - readonly #writeForwarder: IWriteForwarder | null; - constructor( - transport: Transport, - db: DB, - from: Uint8Array, - writeForwarder: IWriteForwarder | null - ) { + constructor(transport: Transport, db: DB, from: Uint8Array) { this.#transport = transport; this.#db = db; this.#from = from; - this.#writeForwarder = writeForwarder; } start() { @@ -65,18 +57,8 @@ export default class InboundStream { } const lastChange = msg.changes[msg.changes.length - 1]; const newLastSeen = [lastChange[5], 0] as const; - if ( - this.#writeForwarder != null && - this.#writeForwarder.shouldForwardWrites() - ) { - // forward the write - } else { - this.#db.applyChangesetAndSetLastSeen( - msg.changes, - msg.sender, - newLastSeen - ); - } + this.#db.applyChangesetAndSetLastSeen(msg.changes, msg.sender, newLastSeen); + this.#lastSeen = newLastSeen; } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 638a57c..3215310 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -553,6 +553,9 @@ importers: better-sqlite3: specifier: ^8.3.0 version: 8.3.0 + chokidar: + specifier: ^3.5.3 + version: 3.5.3 throttle-debounce: specifier: ^5.0.0 version: 5.0.0