Skip to content

Commit

Permalink
feat: clustering
Browse files Browse the repository at this point in the history
  • Loading branch information
Dwynr committed Sep 9, 2024
1 parent febdb54 commit 19da6e4
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 22 deletions.
78 changes: 57 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,11 @@ const server = new WebDAVServer({
authMode: "basic" | "digest"
})

server
.start()
.then(() =>
console.log(
`WebDAV server started on ${https ? "https" : "http"}://${
hostname === "127.0.0.1" ? "local.webdav.filen.io" : hostname
}:${port}`
)
)
.catch(console.error)
await server.start()

console.log(
`WebDAV server started on ${https ? "https" : "http"}://${hostname === "127.0.0.1" ? "local.webdav.filen.io" : hostname}:${port}`
)
```

3. Initialize the server (proxy mode)
Expand Down Expand Up @@ -98,19 +93,60 @@ const server = new WebDAVServer({
authMode: "basic" // Only basic auth is supported in proxy mode
})

server
.start()
.then(() =>
console.log(
`WebDAV server started on ${https ? "https" : "http"}://${
hostname === "127.0.0.1" ? "local.webdav.filen.io" : hostname
}:${port}`
)
)
.catch(console.error)
await server.start()

console.log(
`WebDAV server started on ${https ? "https" : "http"}://${hostname === "127.0.0.1" ? "local.webdav.filen.io" : hostname}:${port}`
)
```

4. Initialize the server (cluster mode)

```typescript
import FilenSDK from "@filen/sdk"
import path from "path"
import os from "os"
import { WebDAVServerCluster } from "@filen/webdav"

// Initialize a SDK instance (optional)
const filen = new FilenSDK({
metadataCache: true,
connectToSocket: true,
tmpPath: path.join(os.tmpdir(), "filen-sdk")
})

await filen.login({
email: "[email protected]",
password: "supersecret123",
twoFactorCode: "123456"
})

const hostname = "127.0.0.1"
const port = 1900
const https = false
const server = new WebDAVServerCluster({
hostname,
port,
https,
user: {
username: "admin",
password: "admin",
sdk: filen
},
authMode: "basic" | "digest",
threads: 16 // Number of threads to spawn. Defaults to CPU core count if omitted.
})

await server.start()

console.log(
`WebDAV server cluster started on ${https ? "https" : "http"}://${
hostname === "127.0.0.1" ? "local.webdav.filen.io" : hostname
}:${port}`
)
```

4. Access the server
5. Access the server

```sh
// MacOS
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@filen/webdav",
"version": "0.2.44",
"version": "0.2.45",
"description": "Filen WebDAV",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
251 changes: 251 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import { v4 as uuidv4 } from "uuid"
import { type Duplex } from "stream"
import { rateLimit } from "express-rate-limit"
import Logger from "./logger"
import cluster from "cluster"
import os from "os"

export type ServerConfig = {
hostname: string
Expand Down Expand Up @@ -528,4 +530,253 @@ export class WebDAVServer {
}
}

/**
* WebDAVServerCluster
*
* @export
* @class WebDAVServerCluster
* @typedef {WebDAVServerCluster}
*/
export class WebDAVServerCluster {
private enableHTTPS: boolean
private authMode: AuthMode
private rateLimit: RateLimit
private serverConfig: ServerConfig
private proxyMode: boolean
private user:
| {
sdkConfig?: FilenSDKConfig
sdk?: FilenSDK
username: string
password: string
}
| undefined
private threads: number
private workers: Record<
number,
{
worker: ReturnType<typeof cluster.fork>
ready: boolean
}
> = {}
private stopSpawning: boolean = false

/**
* Creates an instance of WebDAVServerCluster.
*
* @constructor
* @public
* @param {{
* hostname?: string
* port?: number
* authMode?: "basic" | "digest"
* https?: boolean
* user?: {
* sdkConfig?: FilenSDKConfig
* sdk?: FilenSDK
* username: string
* password: string
* }
* rateLimit?: RateLimit
* disableLogging?: boolean
* threads?: number
* }} param0
* @param {string} [param0.hostname="127.0.0.1"]
* @param {number} [param0.port=1900]
* @param {{ sdkConfig?: FilenSDKConfig; sdk?: FilenSDK; username: string; password: string; }} param0.user
* @param {("basic" | "digest")} [param0.authMode="basic"]
* @param {boolean} [param0.https=false]
* @param {RateLimit} [param0.rateLimit={
* windowMs: 1000,
* limit: 1000,
* key: "username"
* }]
* @param {number} param0.threads
*/
public constructor({
hostname = "127.0.0.1",
port = 1900,
user,
authMode = "basic",
https = false,
rateLimit = {
windowMs: 1000,
limit: 1000,
key: "username"
},
threads
}: {
hostname?: string
port?: number
authMode?: "basic" | "digest"
https?: boolean
user?: {
sdkConfig?: FilenSDKConfig
sdk?: FilenSDK
username: string
password: string
}
rateLimit?: RateLimit
disableLogging?: boolean
threads?: number
}) {
this.enableHTTPS = https
this.authMode = authMode
this.rateLimit = rateLimit
this.serverConfig = {
hostname,
port
}
this.proxyMode = typeof user === "undefined"
this.threads = typeof threads === "number" ? threads : os.cpus().length
this.user = user

if (this.proxyMode && this.authMode === "digest") {
throw new Error("Digest authentication is not supported in proxy mode.")
}

if (this.user) {
if (!this.user.sdk && !this.user.sdkConfig) {
throw new Error("Either pass a configured SDK instance OR a SDKConfig object to the user object.")
}

if (this.user.username.length === 0 || this.user.password.length === 0) {
throw new Error("Username or password empty.")
}
}
}

/**
* Spawn a worker.
*
* @private
*/
private spawnWorker(): void {
if (this.stopSpawning) {
return
}

const worker = cluster.fork()

this.workers[worker.id] = {
worker,
ready: false
}
}

/**
* Fork all needed threads.
*
* @private
* @async
* @returns {Promise<"master" | "worker">}
*/
private async startCluster(): Promise<"master" | "worker"> {
if (cluster.isPrimary) {
return await new Promise<"master" | "worker">((resolve, reject) => {
try {
let workersReady = 0

for (let i = 0; i < this.threads; i++) {
this.spawnWorker()
}

cluster.on("exit", async worker => {
workersReady--

delete this.workers[worker.id]

await new Promise<void>(resolve => setTimeout(resolve, 1000))

try {
this.spawnWorker()
} catch {
// Noop
}
})

const errorTimeout = setTimeout(() => {
reject(new Error("Could not spawn all workers."))
}, 15000)

cluster.on("message", (worker, message) => {
if (message === "ready" && this.workers[worker.id]) {
workersReady++

this.workers[worker.id]!.ready = true

if (workersReady >= this.threads) {
clearTimeout(errorTimeout)

resolve("master")
}
}
})
} catch (e) {
reject(e)
}
})
}

const server = new WebDAVServer({
hostname: this.serverConfig.hostname,
port: this.serverConfig.port,
authMode: this.authMode,
disableLogging: true,
user: this.user,
rateLimit: this.rateLimit,
https: this.enableHTTPS
})

await server.start()

if (process.send) {
process.send("ready")
}

return "worker"
}

/**
* Start the WebDAV cluster.
*
* @public
* @async
* @returns {Promise<void>}
*/
public async start(): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.startCluster()
.then(type => {
if (type === "master") {
resolve()
}
})
.catch(reject)
})
}

/**
* Stop the WebDAV cluster.
*
* @public
* @async
* @returns {Promise<void>}
*/
public async stop(): Promise<void> {
cluster.removeAllListeners()

this.stopSpawning = true

for (const id in this.workers) {
this.workers[id]!.worker.destroy()
}

await new Promise<void>(resolve => setTimeout(resolve, 1000))

this.workers = {}
this.stopSpawning = false
}
}

export default WebDAVServer

0 comments on commit 19da6e4

Please sign in to comment.