diff --git a/src/client/ShardClient.ts b/src/client/ShardClient.ts index 8a2079c..95f8f73 100644 --- a/src/client/ShardClient.ts +++ b/src/client/ShardClient.ts @@ -21,7 +21,7 @@ export class ShardClient { clientOptions.shardCount = EnvProcessData.shardCount; if (manager.handleConcurrency) { if (!clientOptions.ws) clientOptions.ws = {}; - clientOptions.ws.buildStrategy = ws => new IndomitableStrategy(ws, new BaseWorker(manager)); + if (!clientOptions.ws.buildStrategy) clientOptions.ws.buildStrategy = ws => new IndomitableStrategy(ws, new BaseWorker()); } this.client = new manager.client(clientOptions); // @ts-expect-error: Override shard client util with indomitable shard client util diff --git a/src/client/ShardClientUtil.ts b/src/client/ShardClientUtil.ts index 1b34891..5519258 100644 --- a/src/client/ShardClientUtil.ts +++ b/src/client/ShardClientUtil.ts @@ -125,9 +125,10 @@ export class ShardClientUtil extends EventEmitter { * @returns A promise that resolves to void or a repliable object */ public async send(transportable: Transportable): Promise { + const manager = this.ipc.manager as Indomitable; let abortableData: AbortableData | undefined; - if (!transportable.signal && (this.ipc.manager.ipcTimeout !== Infinity && transportable.repliable)) { - abortableData = MakeAbortableRequest(this.ipc.manager.ipcTimeout); + if (!transportable.signal && (manager.ipcTimeout !== Infinity && transportable.repliable)) { + abortableData = MakeAbortableRequest(manager.ipcTimeout); transportable.signal = abortableData.controller.signal; } try { diff --git a/src/ipc/BaseIpc.ts b/src/ipc/BaseIpc.ts index c18ad1d..24b0017 100644 --- a/src/ipc/BaseIpc.ts +++ b/src/ipc/BaseIpc.ts @@ -1,3 +1,4 @@ +import EventEmitter from 'node:events'; import { Serializable } from 'node:child_process'; import { randomUUID } from 'crypto'; import { Indomitable } from '../Indomitable.js'; @@ -17,9 +18,9 @@ import { * Base class where primary and worker ipc inherits */ export abstract class BaseIpc { - public readonly manager: Indomitable; + public readonly manager: Indomitable|EventEmitter; protected readonly promises: Map; - protected constructor(manager: Indomitable) { + protected constructor(manager: Indomitable|EventEmitter) { this.manager = manager; this.promises = new Map(); } diff --git a/src/ipc/BaseWorker.ts b/src/ipc/BaseWorker.ts index ff56fcd..1faaa9a 100644 --- a/src/ipc/BaseWorker.ts +++ b/src/ipc/BaseWorker.ts @@ -1,17 +1,17 @@ +import EventEmitter from 'node:events'; import { Serializable } from 'node:child_process'; -import { BaseIpc } from './BaseIpc.js'; import { Indomitable } from '../Indomitable'; +import { BaseIpc } from './BaseIpc.js'; import { Message, RawIpcMessage } from '../Util'; /** * Basic worker ipc class, basic child process ipc handler */ export class BaseWorker extends BaseIpc { - constructor(manager: Indomitable) { + constructor(manager: Indomitable | EventEmitter = new EventEmitter()) { super(manager); - process.on('message', - data => this.handleRawResponse(data as Serializable, () => null) - ); + process + .on('message', data => this.handleRawResponse(data as Serializable, () => null)); } protected available(): boolean { diff --git a/src/ipc/MainStrategyWorker.ts b/src/ipc/MainStrategyWorker.ts index 62d714a..4009fa2 100644 --- a/src/ipc/MainStrategyWorker.ts +++ b/src/ipc/MainStrategyWorker.ts @@ -9,7 +9,6 @@ export class MainStrategyWorker extends BaseIpc { public readonly thread: Worker; public readonly strategy: IndomitableStrategy; constructor(id: number, thread: Worker, strategy: IndomitableStrategy) { - // @ts-expect-error: Indomitable will not be used in this class super(new EventEmitter()); this.id = id; this.thread = thread; diff --git a/src/ipc/MainWorker.ts b/src/ipc/MainWorker.ts index 9e015cf..1d5ad91 100644 --- a/src/ipc/MainWorker.ts +++ b/src/ipc/MainWorker.ts @@ -1,5 +1,6 @@ import { BaseIpc } from './BaseIpc.js'; import { ClusterManager } from '../manager/ClusterManager.js'; +import { Indomitable } from '../Indomitable'; import { InternalOps, InternalOpsData, @@ -33,6 +34,7 @@ export class MainWorker extends BaseIpc { protected async handleMessage(message: Message): Promise { this.manager.emit(LibraryEvents.DEBUG, `Received internal message. op: ${message.content.op} | data: ${JSON.stringify(message.content.data || {})}`); + const manager = this.manager as Indomitable; if (internalOpsValues.includes(message.content.op)) { const content = message.content as InternalOpsData; switch(content.op) { @@ -43,7 +45,7 @@ export class MainWorker extends BaseIpc { } case InternalOps.EVAL: { // don't touch eval data, just forward it to clusters since this is already an instance of InternalEvent - const data = await this.manager.broadcast({ + const data = await manager.broadcast({ content, repliable: true }); @@ -51,23 +53,23 @@ export class MainWorker extends BaseIpc { break; } case InternalOps.SESSION_INFO: { - if (content.data.update || !this.manager.cachedSession) - this.manager.cachedSession = await this.manager.fetchSessions(); - message.reply(this.manager.cachedSession); + if (content.data.update || !manager.cachedSession) + manager.cachedSession = await manager.fetchSessions(); + message.reply(manager.cachedSession); break; } case InternalOps.REQUEST_IDENTIFY: - await this.manager.concurrencyManager!.waitForIdentify(content.data.shardId); + await manager.concurrencyManager!.waitForIdentify(content.data.shardId); message.reply(null); break; case InternalOps.CANCEL_IDENTIFY: - this.manager.concurrencyManager!.abortIdentify(content.data.shardId); + manager.concurrencyManager!.abortIdentify(content.data.shardId); break; case InternalOps.RESTART: - await this.manager.restart(content.data.clusterId); + await manager.restart(content.data.clusterId); break; case InternalOps.RESTART_ALL: - await this.manager.restartAll(); + await manager.restartAll(); break; } } else if (clientEventsValues.includes(message.content.op)) { diff --git a/src/ipc/ThreadStrategyWorker.ts b/src/ipc/ThreadStrategyWorker.ts index 11832ca..91cd669 100644 --- a/src/ipc/ThreadStrategyWorker.ts +++ b/src/ipc/ThreadStrategyWorker.ts @@ -12,7 +12,6 @@ import { export class ThreadStrategyWorker extends BaseIpc { private shard: WebSocketShard|undefined; constructor() { - // @ts-expect-error: Indomitable will not be used in the thread process super(new EventEmitter()); parentPort!.on('message', message => this.handleRawResponse(message, () => null)); }