From af32b47769cf9a51ceb583235c58d5f97f49a5c7 Mon Sep 17 00:00:00 2001 From: Saya Date: Sat, 4 Jan 2025 11:41:03 +0800 Subject: [PATCH] feat: new ipc system --- src/Indomitable.ts | 24 +++-- src/Util.ts | 56 +++++------ src/client/ShardClient.ts | 2 +- src/client/ShardClientUtil.ts | 26 ++--- src/ipc/BaseIpc.ts | 170 -------------------------------- src/ipc/BaseSocket.ts | 176 ++++++++++++++++++++++++++++++++++ src/ipc/BaseWorker.ts | 28 ------ src/ipc/ClientSocket.ts | 65 +++++++++++++ src/ipc/ClientWorker.ts | 42 -------- src/ipc/IpcServer.ts | 50 ++++++++++ src/ipc/MainWorker.ts | 88 ----------------- src/ipc/ServerSocket.ts | 99 +++++++++++++++++++ src/manager/ClusterManager.ts | 13 +-- 13 files changed, 459 insertions(+), 380 deletions(-) delete mode 100644 src/ipc/BaseIpc.ts create mode 100644 src/ipc/BaseSocket.ts delete mode 100644 src/ipc/BaseWorker.ts create mode 100644 src/ipc/ClientSocket.ts delete mode 100644 src/ipc/ClientWorker.ts create mode 100644 src/ipc/IpcServer.ts delete mode 100644 src/ipc/MainWorker.ts create mode 100644 src/ipc/ServerSocket.ts diff --git a/src/Indomitable.ts b/src/Indomitable.ts index 9888e74..5829a94 100644 --- a/src/Indomitable.ts +++ b/src/Indomitable.ts @@ -1,4 +1,5 @@ import type { Client, ClientOptions as DiscordJsClientOptions } from 'discord.js'; +import type { Message } from './ipc/BaseSocket'; import Cluster, { ClusterSettings } from 'node:cluster'; import EventEmitter from 'node:events'; import Os from 'node:os'; @@ -6,6 +7,7 @@ import { clearTimeout } from 'node:timers'; import { ConcurrencyServer } from './concurrency/ConcurrencyServer'; import { ShardClient } from './client/ShardClient'; import { ClusterManager } from './manager/ClusterManager'; +import { IpcServer } from './ipc/IpcServer'; import { Chunk, FetchSessions, @@ -14,13 +16,11 @@ import { InternalOps, InternalOpsData, LibraryEvents, - Message, SessionObject, Transportable, Sendable } from './Util'; - /** * Options to control Indomitable behavior */ @@ -145,6 +145,7 @@ export class Indomitable extends EventEmitter { public shardCount: number|'auto'; public cachedSession?: SessionObject; public concurrencyServer?: ConcurrencyServer; + public ipcServer?: IpcServer; public readonly clientOptions: DiscordJsClientOptions; public readonly clusterSettings: ClusterSettings; public readonly ipcTimeout: number; @@ -188,6 +189,7 @@ export class Indomitable extends EventEmitter { this.token = options.token; this.clusters = new Map(); this.spawnQueue = []; + this.ipcServer = undefined; this.concurrencyServer = undefined; this.cachedSession = undefined; this.busy = false; @@ -230,29 +232,39 @@ export class Indomitable extends EventEmitter { await shardClient.start(this.token); return; } + + this.ipcServer = new IpcServer(this); + await this.ipcServer.listen(); + if (this.handleConcurrency) { const sessions = await this.fetchSessions(); this.concurrencyServer = new ConcurrencyServer(this, sessions.session_start_limit.max_concurrency); const info = await this.concurrencyServer.start(); this.emit(LibraryEvents.DEBUG, `Handle concurrency is currently enabled! =>\n Server is currently bound to:\n Address: ${info.address}:${info.port}\n Concurrency: ${sessions.session_start_limit.max_concurrency}`); } + if (typeof this.clusterCount !== 'number') this.clusterCount = Os.cpus().length; if (typeof this.shardCount !== 'number') { const sessions = await this.fetchSessions(); this.shardCount = sessions.shards; } + if (this.shardCount < this.clusterCount) this.clusterCount = this.shardCount; + this.emit(LibraryEvents.DEBUG, `Starting ${this.shardCount} websocket shards across ${this.clusterCount} clusters`); const shards = [ ...Array(this.shardCount).keys() ]; const chunks = Chunk(shards, Math.round(this.shardCount / this.clusterCount)); + Cluster.setupPrimary({ ...{ serialization: 'json' }, ...this.clusterSettings }); + for (let id = 0; id < this.clusterCount; id++) { const chunk = chunks.shift()!; const cluster = new ClusterManager({ id, shards: chunk, manager: this }); this.clusters.set(id, cluster); } + await this.addToSpawnQueue(...this.clusters.values()); } @@ -286,13 +298,13 @@ export class Indomitable extends EventEmitter { if (!cluster) throw new Error('Invalid cluster id provided'); let abortableData: AbortableData|undefined; - if (this.ipcTimeout !== Infinity && sendable.repliable) { + if (this.ipcTimeout !== Infinity && sendable.reply) { abortableData = MakeAbortableRequest(this.ipcTimeout); } let transportable: Transportable = { content: sendable.content, - repliable: sendable.repliable + reply: sendable.reply }; if (abortableData) { @@ -300,7 +312,7 @@ export class Indomitable extends EventEmitter { } try { - return await cluster.ipc.send(transportable); + return await cluster.ipc?.send(transportable); } finally { if (abortableData) { clearTimeout(abortableData.timeout); @@ -380,7 +392,7 @@ export class Indomitable extends EventEmitter { data: {}, internal: true }; - await this.send(id, { content, repliable: true }); + await this.send(id, { content, reply: true }); } /** diff --git a/src/Util.ts b/src/Util.ts index 4b3a985..72c985c 100644 --- a/src/Util.ts +++ b/src/Util.ts @@ -9,11 +9,12 @@ export const EnvProcessData = { clusterId: Number(process.env.INDOMITABLE_CLUSTER || 0), clusterCount: Number(process.env.INDOMITABLE_CLUSTER_TOTAL || 0), shardIds: (process.env.INDOMITABLE_SHARDS || '').split(' ').map(Number), - shardCount: Number(process.env.INDOMITABLE_SHARDS_TOTAL || 0) + shardCount: Number(process.env.INDOMITABLE_SHARDS_TOTAL || 0), + serverIpcId: process.env.INDOMITABLE_SERVER_IPC_ID || '' }; /** - * Internal operation codes for the cluster -> thread + * Internal operation codes for the main process -> cluster process */ export enum MainStrategyOps { CONNECT = 'connect', @@ -24,7 +25,7 @@ export enum MainStrategyOps { } /** - * Internal operation codes for the thread <- cluster + * Internal operation codes for the cluster process -> main process */ export enum ThreadStrategyOps { REQUEST_IDENTIFY = 'requestIdentify', @@ -43,7 +44,8 @@ export enum InternalOps { RESTART_ALL = 'restartAll', DESTROY_CLIENT = 'destroyClient', SESSION_INFO = 'sessionInfo', - PING = 'ping' + PING = 'ping', + IDENTIFY = 'identify' } /** @@ -105,22 +107,29 @@ export interface ThreadStrategyData { internal: true } +export interface InternalData { + internal: true +} + /** * Data structure representing an internal event */ -export interface InternalOpsData { - op: InternalOps, - data: any, - internal: true +export interface InternalOpsData extends InternalData { + op: InternalOps; + data: any; } /** * Data structure representing an internal discord.js event */ -export interface ClientEventData { - op: ClientEvents, - data: any, - internal: true, +export interface ClientEventData extends InternalData { + op: ClientEvents; + data: any; +} + +export interface IpcIdentify { + clusterId: number; + serverId: string; } /** @@ -137,8 +146,8 @@ export interface IpcErrorData { */ export interface Transportable { content: any; - repliable?: boolean; - signal?: AbortSignal + reply?: boolean; + signal?: AbortSignal; } /** @@ -150,12 +159,12 @@ export type Sendable = Omit; * Data structure representing an internal abort data */ export interface InternalAbortSignal { - listener: () => void, + listener: () => void; signal: AbortSignal } export interface SavePromiseOptions { - id: string; + nonce: string; resolve: (data: unknown) => void; reject: (reason: unknown) => void; signal?: AbortSignal | undefined; @@ -181,22 +190,13 @@ export interface InternalPromise { /** * Data structure representing internal IPC data */ -export interface RawIpcMessage { - id: string|null; +export interface RawIpcMessage extends InternalData { + nonce: string; content: any; - internal: true; + reply: boolean; type: RawIpcMessageType } -/** - * Data structure representing an IPC message - */ -export interface Message { - reply: (data: any) => void; - content: any; - repliable: boolean; -} - /** * Data structure representing a Discord session */ diff --git a/src/client/ShardClient.ts b/src/client/ShardClient.ts index e3a3f3a..a8054bf 100644 --- a/src/client/ShardClient.ts +++ b/src/client/ShardClient.ts @@ -81,7 +81,7 @@ export class ShardClient { const shardClientUtil = this.client.shard as ShardClientUtil; const content: ClientEventData = { ...partial, internal: true }; shardClientUtil - .send({ content, repliable: false }) + .send({ content, reply: false }) .catch((error: unknown) => this.client.emit(ClientEvents.ERROR, error as Error)); } } diff --git a/src/client/ShardClientUtil.ts b/src/client/ShardClientUtil.ts index 5519258..6f5ac89 100644 --- a/src/client/ShardClientUtil.ts +++ b/src/client/ShardClientUtil.ts @@ -1,15 +1,15 @@ import type { Client } from 'discord.js'; +import type { Message } from '../ipc/BaseSocket'; +import type { Indomitable } from '../Indomitable'; import EventEmitter from 'node:events'; import { clearTimeout } from 'timers'; -import { Indomitable } from '../Indomitable'; -import { ClientWorker } from '../ipc/ClientWorker'; +import { ClientSocket } from '../ipc/ClientSocket'; import { EnvProcessData, MakeAbortableRequest, AbortableData, InternalOps, InternalOpsData, - Message, SessionObject, Transportable } from '../Util'; @@ -29,19 +29,24 @@ export declare interface ShardClientUtil { */ export class ShardClientUtil extends EventEmitter { public client: Client; - public readonly ipc: ClientWorker; + private readonly manager: Indomitable; public readonly clusterId: number; public readonly clusterCount: number; public readonly shardIds: number[]; public readonly shardCount: number; + public readonly ipc: ClientSocket; + constructor(client: Client, manager: Indomitable) { super(); this.client = client; - this.ipc = new ClientWorker(this, manager); + this.manager = manager; this.clusterId = EnvProcessData.clusterId; this.clusterCount = EnvProcessData.clusterCount; this.shardIds = EnvProcessData.shardIds; this.shardCount = EnvProcessData.shardCount; + this.ipc = new ClientSocket(this, EnvProcessData.serverIpcId); + + this.ipc.connect(); } /** @@ -55,7 +60,7 @@ export class ShardClientUtil extends EventEmitter { internal: true }; const start = process.hrtime.bigint(); - const end = await this.send({ content, repliable: true }) as number; + const end = await this.send({ content, reply: true }) as number; return Number(BigInt(end) - start); } @@ -69,7 +74,7 @@ export class ShardClientUtil extends EventEmitter { data: `(${script.toString()})(this, ${JSON.stringify(context)})`, internal: true }; - return this.send({ content, repliable: true }) as Promise; + return this.send({ content, reply: true }) as Promise; } /** @@ -91,7 +96,7 @@ export class ShardClientUtil extends EventEmitter { data: { update }, internal: true }; - return this.send({ content, repliable: true }) as Promise; + return this.send({ content, reply: true }) as Promise; } /** @@ -125,10 +130,9 @@ export class ShardClientUtil extends EventEmitter { * @returns A promise that resolves to void or a repliable object */ public async send(transportable: Transportable): Promise { - const manager = this.ipc.manager as Indomitable; let abortableData: AbortableData | undefined; - if (!transportable.signal && (manager.ipcTimeout !== Infinity && transportable.repliable)) { - abortableData = MakeAbortableRequest(manager.ipcTimeout); + if (!transportable.signal && (this.manager.ipcTimeout !== Infinity && transportable.reply)) { + abortableData = MakeAbortableRequest(this.manager.ipcTimeout); transportable.signal = abortableData.controller.signal; } try { diff --git a/src/ipc/BaseIpc.ts b/src/ipc/BaseIpc.ts deleted file mode 100644 index 206d864..0000000 --- a/src/ipc/BaseIpc.ts +++ /dev/null @@ -1,170 +0,0 @@ -import EventEmitter from 'node:events'; -import { Serializable } from 'node:child_process'; -import { randomUUID } from 'node:crypto'; -import { Indomitable } from '../Indomitable'; -import { - InternalAbortSignal, - InternalPromise, - IpcErrorData, - LibraryEvents, - Message, - RawIpcMessage, - RawIpcMessageType, - SavePromiseOptions, - Transportable -} from '../Util.js'; - -/** - * Base class where primary and worker ipc inherits - */ -export abstract class BaseIpc { - public readonly manager: Indomitable|EventEmitter; - protected readonly promises: Map; - protected constructor(manager: Indomitable|EventEmitter) { - this.manager = manager; - this.promises = new Map(); - } - - /** - * Number of promises pending to be resolved - */ - public get pendingPromises(): number { - return this.promises.size; - } - - /** - * Rejects all the pending promises - */ - public flushPromises(reason: string): void { - const error = new Error(reason); - for (const promise of this.promises.values()) { - if (promise.controller) { - promise.controller.signal.removeEventListener('abort', promise.controller.listener); - } - promise.reject(error); - } - this.promises.clear(); - } - - /** - * Raw send method without abort controller handling - * @param transportable Data to send - */ - public send(transportable: Transportable): Promise { - return new Promise((resolve, reject) => { - if (!this.available()) { - return resolve(undefined); - } - const repliable = transportable.repliable || false; - const id = repliable ? randomUUID() : null; - const data: RawIpcMessage = { - id, - content: transportable.content, - internal: true, - type: RawIpcMessageType.MESSAGE - }; - this.sendData(data); - if (!id) return resolve(undefined); - this.waitForPromise({ id, resolve, reject, signal: transportable.signal }); - }); - } - - /** - * Taps into message event of worker or primary process to handle ipc communication - * @internal - */ - public async handleRawResponse(data: Serializable, errorCallback: (error: unknown) => any): Promise { - try { - this.manager.emit(LibraryEvents.RAW, data); - if (!(data as any).internal) return; - switch((data as RawIpcMessage).type) { - case RawIpcMessageType.MESSAGE: - return await this.handleUnparsedMessage(data as RawIpcMessage); - case RawIpcMessageType.RESPONSE: - case RawIpcMessageType.ERROR: - return this.handlePromise(data as RawIpcMessage); - } - } catch (error: unknown) { - errorCallback(error); - } - } - - protected waitForPromise(options: SavePromiseOptions): void { - let controller: InternalAbortSignal|undefined; - if (options.signal) { - const listener = () => { - this.promises.delete(options.id); - options.reject(new Error('This operation is aborted')); - }; - controller = { - listener, - signal: options.signal - }; - controller.signal.addEventListener('abort', listener); - } - this.promises.set(options.id, { resolve: options.resolve, reject: options.reject, controller } as InternalPromise); - } - - private handlePromise(data: RawIpcMessage): void { - const id = data.id as string; - const promise = this.promises.get(id); - if (!promise) return; - this.promises.delete(id); - if (promise.controller) { - promise.controller.signal.removeEventListener('abort', promise.controller.listener); - } - if (data.type === RawIpcMessageType.ERROR) { - const content = data.content as IpcErrorData; - const error = new Error(content.reason); - error.stack = content.stack; - error.name = content.name; - promise.reject(error); - return; - } - promise.resolve(data.content); - } - - private async handleUnparsedMessage(data: RawIpcMessage): Promise { - const reply = (content: any) => { - if (!data.id) return; - const response: RawIpcMessage = { - id: data.id, - content, - internal: true, - type: RawIpcMessageType.RESPONSE - }; - this.sendData(response); - }; - const message: Message = { - repliable: !!data.id, - content: data.content, - reply - }; - if (!data.content.internal) - return this.emitMessage(message); - try { - await this.handleMessage(message); - } catch (error: any) { - if (!message.repliable) return; - const response: RawIpcMessage = { - id: data.id, - content: { - name: error.name, - reason: error.reason, - stack: error.stack - }, - internal: true, - type: RawIpcMessageType.ERROR - }; - this.sendData(response); - } - } - - protected emitMessage(message: Message): void { - this.manager.emit(LibraryEvents.MESSAGE, message); - } - - protected abstract available(): boolean; - protected abstract sendData(data: RawIpcMessage): void; - protected abstract handleMessage(message: Message): Promise; -} diff --git a/src/ipc/BaseSocket.ts b/src/ipc/BaseSocket.ts new file mode 100644 index 0000000..84a1613 --- /dev/null +++ b/src/ipc/BaseSocket.ts @@ -0,0 +1,176 @@ +import type {Socket} from 'node:net'; +import { + InternalAbortSignal, + InternalPromise, + IpcErrorData, IpcIdentify, + RawIpcMessage, + RawIpcMessageType, + SavePromiseOptions, + Transportable +} from '../Util'; +import { randomUUID } from 'node:crypto'; + +export class Message { + private readonly socket: Socket; + private readonly nonce: string; + private readonly reply: boolean; + public readonly content: any; + + constructor(socket: Socket, data: RawIpcMessage) { + this.socket = socket; + this.nonce = data.nonce; + this.reply = data.reply; + this.content = data.content; + } + + get shouldReply(): boolean { + return !this.socket.destroyed && this.reply; + } + + public send(content: any): void { + if (!this.shouldReply) return; + const response: RawIpcMessage = { + nonce: this.nonce, + reply: false, + type: RawIpcMessageType.RESPONSE, + internal: true, + content + }; + this.socket.write(JSON.stringify(response)); + } + + public error(error: Error): void { + if (!this.shouldReply) return; + const response: RawIpcMessage = { + nonce: this.nonce, + content: error, + reply: false, + type: RawIpcMessageType.ERROR, + internal: true + }; + this.socket.write(JSON.stringify(response)); + } +} + +export abstract class BaseSocket { + protected readonly socket: Socket; + protected readonly promises: Map; + + protected constructor(socket: Socket) { + this.socket = socket; + this.promises = new Map(); + + this.socket + .on('data', buffer => this.onData(buffer)) + .on('error', error => this.handleError(error)) + .on('close', () => this.onClose()); + } + + /** + * Raw send method without abort controller handling + * @param transportable Data to send + */ + public send(transportable: Transportable): Promise { + return new Promise((resolve, reject) => { + if (this.socket.destroyed) { + return resolve(undefined); + } + const nonce = randomUUID(); + const data: RawIpcMessage = { + nonce, + content: transportable.content, + reply: transportable.reply || false, + type: RawIpcMessageType.MESSAGE, + internal: true + }; + this.socket.write(JSON.stringify(data)); + if (!data.reply) return resolve(undefined); + this.waitForPromise({ nonce, resolve, reject, signal: transportable.signal }); + }); + } + + /** + * Rejects all the pending promises + */ + private flushPending(reason: string): void { + const error = new Error(reason); + for (const promise of this.promises.values()) { + if (promise.controller) { + promise.controller.signal.removeEventListener('abort', promise.controller.listener); + } + promise.reject(error); + } + this.promises.clear(); + } + + /** + * Taps into message event of worker or primary process to handle ipc communication + * @internal + */ + private onData(buffer: Buffer): void { + const data = JSON.parse(buffer.toString()); + + if (typeof data.internal !== 'boolean' && !data.internal) return; + + switch(data.type) { + case RawIpcMessageType.MESSAGE: + return void this + .handleMessage(new Message(this.socket, data)) + .catch(() => null); + case RawIpcMessageType.RESPONSE: + case RawIpcMessageType.ERROR: + return this.handlePromise(data); + } + } + + private onClose(): void { + this.flushPending('Connection closed'); + this.handleClose(); + } + + private waitForPromise(options: SavePromiseOptions): void { + let controller: InternalAbortSignal|undefined; + + if (options.signal) { + + const listener = () => { + this.promises.delete(options.nonce); + options.reject(new Error('This operation is aborted')); + }; + + controller = { + listener, + signal: options.signal + }; + controller.signal.addEventListener('abort', listener); + } + + this.promises.set(options.nonce, { resolve: options.resolve, reject: options.reject, controller } as InternalPromise); + } + + private handlePromise(data: RawIpcMessage): void { + const promise = this.promises.get(data.nonce); + if (!promise) return; + + this.promises.delete(data.nonce); + + if (promise.controller) { + promise.controller.signal.removeEventListener('abort', promise.controller.listener); + } + + if (data.type === RawIpcMessageType.ERROR) { + const content = data.content as IpcErrorData; + const error = new Error(content.reason); + error.stack = content.stack; + error.name = content.name; + promise.reject(error); + return; + } + + promise.resolve(data.content); + } + + protected abstract handleMessage(message: Message): Promise; + protected abstract handleError(error: Error): void; + protected abstract handleClose(): void; +} \ No newline at end of file diff --git a/src/ipc/BaseWorker.ts b/src/ipc/BaseWorker.ts deleted file mode 100644 index 8fb1485..0000000 --- a/src/ipc/BaseWorker.ts +++ /dev/null @@ -1,28 +0,0 @@ -import EventEmitter from 'node:events'; -import { Serializable } from 'node:child_process'; -import { Indomitable } from '../Indomitable'; -import { BaseIpc } from './BaseIpc'; -import { Message, RawIpcMessage } from '../Util'; - -/** - * Basic worker ipc class, basic child process ipc handler - */ -export class BaseWorker extends BaseIpc { - constructor(manager: Indomitable | EventEmitter = new EventEmitter()) { - super(manager); - process - .on('message', data => this.handleRawResponse(data as Serializable, () => null)); - } - - protected available(): boolean { - return !!process.send; - } - - protected sendData(data: RawIpcMessage): void { - process.send!(data); - } - - protected handleMessage(message: Message): Promise { - return Promise.resolve(); - } -} diff --git a/src/ipc/ClientSocket.ts b/src/ipc/ClientSocket.ts new file mode 100644 index 0000000..d184e2e --- /dev/null +++ b/src/ipc/ClientSocket.ts @@ -0,0 +1,65 @@ +import type { ShardClientUtil } from '../client/ShardClientUtil'; +import { Socket } from 'node:net'; +import { BaseSocket, Message } from './BaseSocket'; +import { ClientEventData, InternalOps, InternalOpsData, IpcIdentify, LibraryEvents } from '../Util'; + +const internalOpsValues = Object.values(InternalOps); + +export class ClientSocket extends BaseSocket { + public readonly shard: ShardClientUtil; + private readonly serverId: string; + constructor(shard: ShardClientUtil, serverId: string) { + super(new Socket()); + this.shard = shard; + this.serverId = serverId; + } + + public connect(): void { + this.socket.connect({ path: `./indomitable-${this.serverId}` }, () => { + this.identify({ clusterId: this.shard.clusterId, serverId: this.serverId }) + .catch(() => null); + }); + + } + + public identify(data: IpcIdentify): Promise { + const content: InternalOpsData = { + op: InternalOps.IDENTIFY, + internal: true, + data + }; + return this.send({ content, reply: true }) as Promise; + } + + protected handleClose(): void { + // tba + } + + protected handleError(error: Error): void { + this.shard.client.emit('error', error); + } + + protected handleMessage(message: Message): Promise { + if (message.content.internal !== 'boolean' && !message.content.internal) { + this.shard.emit(LibraryEvents.MESSAGE, message); + return Promise.resolve(); + } + + const content = message.content as InternalOpsData | ClientEventData; + + if (!internalOpsValues.includes(message.content.op)) + return Promise.resolve(); + + switch (content.op) { + case InternalOps.EVAL: + // @ts-expect-error + message.reply(this.shard.client._eval(content.data)); + break; + case InternalOps.DESTROY_CLIENT: + this.shard.client!.destroy(); + message.send(null); + } + + return Promise.resolve(); + } +} \ No newline at end of file diff --git a/src/ipc/ClientWorker.ts b/src/ipc/ClientWorker.ts deleted file mode 100644 index c2b6bc5..0000000 --- a/src/ipc/ClientWorker.ts +++ /dev/null @@ -1,42 +0,0 @@ -import { Indomitable } from '../Indomitable'; -import { ShardClientUtil } from '../client/ShardClientUtil'; -import { BaseWorker } from './BaseWorker'; -import { - InternalOps, - InternalOpsData, - LibraryEvents, - Message -} from '../Util'; - -const internalOpsValues = Object.values(InternalOps); - -/** - * Extended worker ipc class, shard client util ipc class - */ -export class ClientWorker extends BaseWorker { - public readonly shard: ShardClientUtil; - constructor(shard: ShardClientUtil, manager: Indomitable) { - super(manager); - this.shard = shard; - } - - protected emitMessage(message: Message): void { - this.shard.emit(LibraryEvents.MESSAGE, message); - } - - protected handleMessage(message: Message): Promise { - if (!internalOpsValues.includes(message.content.op)) - return Promise.resolve(); - const content = message.content as InternalOpsData; - switch (content.op) { - case InternalOps.EVAL: - // @ts-expect-error - message.reply(this.shard.client._eval(content.data)); - break; - case InternalOps.DESTROY_CLIENT: - this.shard.client!.destroy(); - message.reply(null); - } - return Promise.resolve(); - } -} diff --git a/src/ipc/IpcServer.ts b/src/ipc/IpcServer.ts new file mode 100644 index 0000000..62f94bd --- /dev/null +++ b/src/ipc/IpcServer.ts @@ -0,0 +1,50 @@ +import type { Indomitable } from '../Indomitable'; +import type { ServerSocket } from './ServerSocket'; + +import { randomUUID } from 'node:crypto'; +import { createServer, Server, Socket } from 'node:net'; +import { LibraryEvents } from '../Util'; + + +export class IpcServer { + private readonly manager: Indomitable; + private readonly server: Server; + private readonly sockets: Map; + public readonly serverId: string; + + constructor(manager: Indomitable) { + this.manager = manager; + this.server = createServer(); + this.sockets = new Map(); + this.serverId = randomUUID(); + } + + public getServer(id: string): ServerSocket | undefined { + return this.sockets.get(id); + } + + public listen(): Promise { + return new Promise((resolve, reject) => { + const listener = (error: Error) => reject(error); + + this.server.once('error', listener); + + this.server.listen(`./indomitable-${this.serverId}`, () => { + // @ts-expect-error: why this errors? + this.server.removeListener('error', listener); + + this.server + .on('connection', (socket) => { + + }) + .on('error', (code) => this.manager.emit(LibraryEvents.ERROR, new Error(`IPC server errored with code: ${code}`))); + + resolve(); + }); + }); + } + + private createSocket(): void { + + } +} \ No newline at end of file diff --git a/src/ipc/MainWorker.ts b/src/ipc/MainWorker.ts deleted file mode 100644 index 58dca5c..0000000 --- a/src/ipc/MainWorker.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { BaseIpc } from './BaseIpc'; -import { ClusterManager } from '../manager/ClusterManager'; -import { Indomitable } from '../Indomitable'; -import { - InternalOps, - InternalOpsData, - ClientEvents, - ClientEventData, - LibraryEvents, - Message, - RawIpcMessage -} from '../Util'; - -const internalOpsValues = Object.values(InternalOps); -const clientEventsValues = Object.values(ClientEvents); - -/** - * Primary ipc class. Only initialized at main process - */ -export class MainWorker extends BaseIpc { - public readonly cluster: ClusterManager; - constructor(cluster: ClusterManager) { - super(cluster.manager); - this.cluster = cluster; - } - - protected available(): boolean { - return !!this.cluster.worker; - } - - protected sendData(data: RawIpcMessage): void { - this.cluster.worker?.send(data); - } - - protected async handleMessage(message: Message): Promise { - this.manager.emit(LibraryEvents.DEBUG, `Received internal message. op: ${message.content.op} | data: ${JSON.stringify(message.content.data || {})}`); - const manager = this.manager as Indomitable; - if (internalOpsValues.includes(message.content.op)) { - const content = message.content as InternalOpsData; - switch(content.op) { - case InternalOps.PING: { - const end = process.hrtime.bigint().toString(); - message.reply(end); - break; - } - case InternalOps.EVAL: { - // don't touch eval data, just forward it to clusters since this is already an instance of InternalEvent - const data = await manager.broadcast({ - content, - repliable: true - }); - message.reply(data); - break; - } - case InternalOps.SESSION_INFO: { - if (content.data.update || !manager.cachedSession) - manager.cachedSession = await manager.fetchSessions(); - message.reply(manager.cachedSession); - break; - } - case InternalOps.RESTART: - await manager.restart(content.data.clusterId); - break; - case InternalOps.RESTART_ALL: - await manager.restartAll(); - break; - } - } else if (clientEventsValues.includes(message.content.op)) { - const content = message.content as ClientEventData; - switch(content.op) { - case ClientEvents.READY: - this.manager.emit(LibraryEvents.CLIENT_READY, content.data); - break; - case ClientEvents.SHARD_READY: - this.manager.emit(LibraryEvents.SHARD_READY, content.data); - break; - case ClientEvents.SHARD_RECONNECT: - this.manager.emit(LibraryEvents.SHARD_RECONNECT, content.data); - break; - case ClientEvents.SHARD_RESUME: - this.manager.emit(LibraryEvents.SHARD_RESUME, content.data); - break; - case ClientEvents.SHARD_DISCONNECT: - this.manager.emit(LibraryEvents.SHARD_DISCONNECT, content.data); - } - } - } -} diff --git a/src/ipc/ServerSocket.ts b/src/ipc/ServerSocket.ts new file mode 100644 index 0000000..30866cd --- /dev/null +++ b/src/ipc/ServerSocket.ts @@ -0,0 +1,99 @@ +import type {Socket} from 'node:net'; +import type {Indomitable} from '../Indomitable'; +import {IpcServer} from './IpcServer'; +import {BaseSocket, Message} from './BaseSocket'; +import {ClientEventData, ClientEvents, InternalOps, InternalOpsData, IpcIdentify, LibraryEvents,} from '../Util'; + +const internalOpsValues = Object.values(InternalOps); +const clientEventsValues = Object.values(ClientEvents); + +export class ServerSocket extends BaseSocket { + private readonly manager: Indomitable; + private readonly server: IpcServer; + + constructor(manager: Indomitable, server: IpcServer, socket: Socket) { + super(socket); + this.manager = manager; + this.server = server; + } + + protected handleClose(): void { + this.manager.emit(LibraryEvents.DEBUG, `A socket closed with ${this.socket.bytesRead} byte(s) data written`); + } + + protected handleError(error: Error): void { + this.manager.emit(LibraryEvents.ERROR, error); + } + + protected async handleMessage(message: Message): Promise { + try { + if (typeof message.content.internal !== 'boolean' && !message.content.internal) + return void this.manager.emit(LibraryEvents.MESSAGE, message); + + const content = message.content as InternalOpsData | ClientEventData; + + if (internalOpsValues.includes(message.content.op)) { + + switch(content.op) { + case InternalOps.IDENTIFY: { + if (content.data.serverId !== this.server.serverId) { + this.socket.destroy(); + message.error(new Error('Incorrect server id passed')); + break; + } + message.send(null); + break; + } + case InternalOps.PING: { + const end = process.hrtime.bigint().toString(); + message.send(end); + break; + } + case InternalOps.EVAL: { + // don't touch eval data, just forward it to clusters since this is already an instance of InternalEvent + const data = await this.manager.broadcast({ + content, + reply: true + }); + message.send(data); + break; + } + case InternalOps.SESSION_INFO: { + if (content.data.update || !this.manager.cachedSession) + this.manager.cachedSession = await this.manager.fetchSessions(); + message.send(this.manager.cachedSession); + break; + } + case InternalOps.RESTART: + await this.manager.restart(content.data.clusterId); + break; + case InternalOps.RESTART_ALL: + await this.manager.restartAll(); + break; + } + + } else if (clientEventsValues.includes(message.content.op)) { + + switch(content.op) { + case ClientEvents.READY: + this.manager.emit(LibraryEvents.CLIENT_READY, content.data); + break; + case ClientEvents.SHARD_READY: + this.manager.emit(LibraryEvents.SHARD_READY, content.data); + break; + case ClientEvents.SHARD_RECONNECT: + this.manager.emit(LibraryEvents.SHARD_RECONNECT, content.data); + break; + case ClientEvents.SHARD_RESUME: + this.manager.emit(LibraryEvents.SHARD_RESUME, content.data); + break; + case ClientEvents.SHARD_DISCONNECT: + this.manager.emit(LibraryEvents.SHARD_DISCONNECT, content.data); + } + + } + } catch (error: any) { + message.error(error); + } + } +} \ No newline at end of file diff --git a/src/manager/ClusterManager.ts b/src/manager/ClusterManager.ts index f422766..37dcce4 100644 --- a/src/manager/ClusterManager.ts +++ b/src/manager/ClusterManager.ts @@ -1,7 +1,7 @@ +import type { Indomitable, ShardEventData } from '../Indomitable'; +import type { ServerSocket } from '../ipc/ServerSocket'; import Cluster, { Worker } from 'node:cluster'; import { clearTimeout } from 'node:timers'; -import { Indomitable, ShardEventData } from '../Indomitable'; -import { MainWorker } from '../ipc/MainWorker'; import { Delay, LibraryEvents } from '../Util'; /** @@ -19,7 +19,6 @@ export interface ClusterManagerOptions { export class ClusterManager { public readonly manager: Indomitable; public readonly id: number; - public readonly ipc: MainWorker; public shards: number[]; public started: boolean; public ready: boolean; @@ -35,7 +34,6 @@ export class ClusterManager { this.manager = options.manager; this.id = options.id; this.shards = options.shards; - this.ipc = new MainWorker(this); this.started = false; this.started = false; this.ready = false; @@ -43,6 +41,10 @@ export class ClusterManager { this.worker = undefined; } + get ipc(): ServerSocket | undefined { + return this.manager.ipcServer!.getServer(this.id.toString()); + } + /** * Destroy associated worker process * @param signal Process exit signal @@ -76,10 +78,10 @@ export class ClusterManager { INDOMITABLE_CONCURRENCY_SERVER_ADDRESS: this.manager.concurrencyServer?.info.address, INDOMITABLE_CONCURRENCY_SERVER_PORT: this.manager.concurrencyServer?.info.port, INDOMITABLE_CONCURRENCY_SERVER_PASSWORD: this.manager.concurrencyServer?.key, + INDOMITABLE_SERVER_IPC_ID: this.manager.ipcServer!.serverId, ...process.env }); this.worker - .on('message', message => this.ipc.handleRawResponse(message, error => this.manager.emit(LibraryEvents.ERROR, error as Error))) .on('error', error => this.manager.emit(LibraryEvents.ERROR, error as Error)) .once('exit', (code, signal) => { this.cleanup(code, signal); @@ -98,7 +100,6 @@ export class ClusterManager { * Remove all listeners on attached worker process and free from memory */ private cleanup(code: number|null, signal: string|null) { - this.ipc.flushPromises(`Cluster exited with close code ${code || 'unknown'} signal ${signal || 'unknown'}`); this.worker?.removeAllListeners(); this.worker = undefined; this.ready = false;