Skip to content

Commit

Permalink
fix: make ipc class detachable if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
Deivu committed Jul 31, 2023
1 parent bd80dde commit 9bd4d36
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/client/ShardClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class ShardClient {
clientOptions.shardCount = EnvProcessData.shardCount;
if (manager.handleConcurrency) {
if (!clientOptions.ws) clientOptions.ws = {};
clientOptions.ws.buildStrategy = ws => new IndomitableStrategy(ws, new BaseWorker(manager));
if (!clientOptions.ws.buildStrategy) clientOptions.ws.buildStrategy = ws => new IndomitableStrategy(ws, new BaseWorker());
}
this.client = new manager.client(clientOptions);
// @ts-expect-error: Override shard client util with indomitable shard client util
Expand Down
5 changes: 3 additions & 2 deletions src/client/ShardClientUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ export class ShardClientUtil extends EventEmitter {
* @returns A promise that resolves to void or a repliable object
*/
public async send(transportable: Transportable): Promise<unknown|undefined> {
const manager = this.ipc.manager as Indomitable;
let abortableData: AbortableData | undefined;
if (!transportable.signal && (this.ipc.manager.ipcTimeout !== Infinity && transportable.repliable)) {
abortableData = MakeAbortableRequest(this.ipc.manager.ipcTimeout);
if (!transportable.signal && (manager.ipcTimeout !== Infinity && transportable.repliable)) {
abortableData = MakeAbortableRequest(manager.ipcTimeout);
transportable.signal = abortableData.controller.signal;
}
try {
Expand Down
5 changes: 3 additions & 2 deletions src/ipc/BaseIpc.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventEmitter from 'node:events';
import { Serializable } from 'node:child_process';
import { randomUUID } from 'crypto';
import { Indomitable } from '../Indomitable.js';
Expand All @@ -17,9 +18,9 @@ import {
* Base class where primary and worker ipc inherits
*/
export abstract class BaseIpc {
public readonly manager: Indomitable;
public readonly manager: Indomitable|EventEmitter;
protected readonly promises: Map<string, InternalPromise>;
protected constructor(manager: Indomitable) {
protected constructor(manager: Indomitable|EventEmitter) {
this.manager = manager;
this.promises = new Map();
}
Expand Down
10 changes: 5 additions & 5 deletions src/ipc/BaseWorker.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import EventEmitter from 'node:events';
import { Serializable } from 'node:child_process';
import { BaseIpc } from './BaseIpc.js';
import { Indomitable } from '../Indomitable';
import { BaseIpc } from './BaseIpc.js';
import { Message, RawIpcMessage } from '../Util';

/**
* Basic worker ipc class, basic child process ipc handler
*/
export class BaseWorker extends BaseIpc {
constructor(manager: Indomitable) {
constructor(manager: Indomitable | EventEmitter = new EventEmitter()) {
super(manager);
process.on('message',
data => this.handleRawResponse(data as Serializable, () => null)
);
process
.on('message', data => this.handleRawResponse(data as Serializable, () => null));
}

protected available(): boolean {
Expand Down
1 change: 0 additions & 1 deletion src/ipc/MainStrategyWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ export class MainStrategyWorker extends BaseIpc {
public readonly thread: Worker;
public readonly strategy: IndomitableStrategy;
constructor(id: number, thread: Worker, strategy: IndomitableStrategy) {
// @ts-expect-error: Indomitable will not be used in this class
super(new EventEmitter());
this.id = id;
this.thread = thread;
Expand Down
18 changes: 10 additions & 8 deletions src/ipc/MainWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BaseIpc } from './BaseIpc.js';
import { ClusterManager } from '../manager/ClusterManager.js';
import { Indomitable } from '../Indomitable';
import {
InternalOps,
InternalOpsData,
Expand Down Expand Up @@ -33,6 +34,7 @@ export class MainWorker extends BaseIpc {

protected async handleMessage(message: Message): Promise<void> {
this.manager.emit(LibraryEvents.DEBUG, `Received internal message. op: ${message.content.op} | data: ${JSON.stringify(message.content.data || {})}`);
const manager = this.manager as Indomitable;
if (internalOpsValues.includes(message.content.op)) {
const content = message.content as InternalOpsData;
switch(content.op) {
Expand All @@ -43,31 +45,31 @@ export class MainWorker extends BaseIpc {
}
case InternalOps.EVAL: {
// don't touch eval data, just forward it to clusters since this is already an instance of InternalEvent
const data = await this.manager.broadcast({
const data = await manager.broadcast({
content,
repliable: true
});
message.reply(data);
break;
}
case InternalOps.SESSION_INFO: {
if (content.data.update || !this.manager.cachedSession)
this.manager.cachedSession = await this.manager.fetchSessions();
message.reply(this.manager.cachedSession);
if (content.data.update || !manager.cachedSession)
manager.cachedSession = await manager.fetchSessions();
message.reply(manager.cachedSession);
break;
}
case InternalOps.REQUEST_IDENTIFY:
await this.manager.concurrencyManager!.waitForIdentify(content.data.shardId);
await manager.concurrencyManager!.waitForIdentify(content.data.shardId);
message.reply(null);
break;
case InternalOps.CANCEL_IDENTIFY:
this.manager.concurrencyManager!.abortIdentify(content.data.shardId);
manager.concurrencyManager!.abortIdentify(content.data.shardId);
break;
case InternalOps.RESTART:
await this.manager.restart(content.data.clusterId);
await manager.restart(content.data.clusterId);
break;
case InternalOps.RESTART_ALL:
await this.manager.restartAll();
await manager.restartAll();
break;
}
} else if (clientEventsValues.includes(message.content.op)) {
Expand Down
1 change: 0 additions & 1 deletion src/ipc/ThreadStrategyWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
export class ThreadStrategyWorker extends BaseIpc {
private shard: WebSocketShard|undefined;
constructor() {
// @ts-expect-error: Indomitable will not be used in the thread process
super(new EventEmitter());
parentPort!.on('message', message => this.handleRawResponse(message, () => null));
}
Expand Down

0 comments on commit 9bd4d36

Please sign in to comment.