Skip to content

Commit

Permalink
feat: new ipc system
Browse files Browse the repository at this point in the history
  • Loading branch information
Deivu committed Jan 4, 2025
1 parent 4d7a66c commit af32b47
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 380 deletions.
24 changes: 18 additions & 6 deletions src/Indomitable.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import type { Client, ClientOptions as DiscordJsClientOptions } from 'discord.js';
import type { Message } from './ipc/BaseSocket';
import Cluster, { ClusterSettings } from 'node:cluster';
import EventEmitter from 'node:events';
import Os from 'node:os';
import { clearTimeout } from 'node:timers';
import { ConcurrencyServer } from './concurrency/ConcurrencyServer';
import { ShardClient } from './client/ShardClient';
import { ClusterManager } from './manager/ClusterManager';
import { IpcServer } from './ipc/IpcServer';
import {
Chunk,
FetchSessions,
Expand All @@ -14,13 +16,11 @@ import {
InternalOps,
InternalOpsData,
LibraryEvents,
Message,
SessionObject,
Transportable,
Sendable
} from './Util';


/**
* Options to control Indomitable behavior
*/
Expand Down Expand Up @@ -145,6 +145,7 @@ export class Indomitable extends EventEmitter {
public shardCount: number|'auto';
public cachedSession?: SessionObject;
public concurrencyServer?: ConcurrencyServer;
public ipcServer?: IpcServer;
public readonly clientOptions: DiscordJsClientOptions;
public readonly clusterSettings: ClusterSettings;
public readonly ipcTimeout: number;
Expand Down Expand Up @@ -188,6 +189,7 @@ export class Indomitable extends EventEmitter {
this.token = options.token;
this.clusters = new Map();
this.spawnQueue = [];
this.ipcServer = undefined;
this.concurrencyServer = undefined;
this.cachedSession = undefined;
this.busy = false;
Expand Down Expand Up @@ -230,29 +232,39 @@ export class Indomitable extends EventEmitter {
await shardClient.start(this.token);
return;
}

this.ipcServer = new IpcServer(this);
await this.ipcServer.listen();

if (this.handleConcurrency) {
const sessions = await this.fetchSessions();
this.concurrencyServer = new ConcurrencyServer(this, sessions.session_start_limit.max_concurrency);
const info = await this.concurrencyServer.start();
this.emit(LibraryEvents.DEBUG, `Handle concurrency is currently enabled! =>\n Server is currently bound to:\n Address: ${info.address}:${info.port}\n Concurrency: ${sessions.session_start_limit.max_concurrency}`);
}

if (typeof this.clusterCount !== 'number')
this.clusterCount = Os.cpus().length;
if (typeof this.shardCount !== 'number') {
const sessions = await this.fetchSessions();
this.shardCount = sessions.shards;
}

if (this.shardCount < this.clusterCount)
this.clusterCount = this.shardCount;

this.emit(LibraryEvents.DEBUG, `Starting ${this.shardCount} websocket shards across ${this.clusterCount} clusters`);
const shards = [ ...Array(this.shardCount).keys() ];
const chunks = Chunk(shards, Math.round(this.shardCount / this.clusterCount));

Cluster.setupPrimary({ ...{ serialization: 'json' }, ...this.clusterSettings });

for (let id = 0; id < this.clusterCount; id++) {
const chunk = chunks.shift()!;
const cluster = new ClusterManager({ id, shards: chunk, manager: this });
this.clusters.set(id, cluster);
}

await this.addToSpawnQueue(...this.clusters.values());
}

Expand Down Expand Up @@ -286,21 +298,21 @@ export class Indomitable extends EventEmitter {
if (!cluster) throw new Error('Invalid cluster id provided');

let abortableData: AbortableData|undefined;
if (this.ipcTimeout !== Infinity && sendable.repliable) {
if (this.ipcTimeout !== Infinity && sendable.reply) {
abortableData = MakeAbortableRequest(this.ipcTimeout);
}

let transportable: Transportable = {
content: sendable.content,
repliable: sendable.repliable
reply: sendable.reply
};

if (abortableData) {
transportable.signal = abortableData.controller.signal;
}

try {
return await cluster.ipc.send(transportable);
return await cluster.ipc?.send(transportable);
} finally {
if (abortableData) {
clearTimeout(abortableData.timeout);
Expand Down Expand Up @@ -380,7 +392,7 @@ export class Indomitable extends EventEmitter {
data: {},
internal: true
};
await this.send(id, { content, repliable: true });
await this.send(id, { content, reply: true });
}

/**
Expand Down
56 changes: 28 additions & 28 deletions src/Util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ export const EnvProcessData = {
clusterId: Number(process.env.INDOMITABLE_CLUSTER || 0),
clusterCount: Number(process.env.INDOMITABLE_CLUSTER_TOTAL || 0),
shardIds: (process.env.INDOMITABLE_SHARDS || '').split(' ').map(Number),
shardCount: Number(process.env.INDOMITABLE_SHARDS_TOTAL || 0)
shardCount: Number(process.env.INDOMITABLE_SHARDS_TOTAL || 0),
serverIpcId: process.env.INDOMITABLE_SERVER_IPC_ID || ''
};

/**
* Internal operation codes for the cluster -> thread
* Internal operation codes for the main process -> cluster process
*/
export enum MainStrategyOps {
CONNECT = 'connect',
Expand All @@ -24,7 +25,7 @@ export enum MainStrategyOps {
}

/**
* Internal operation codes for the thread <- cluster
* Internal operation codes for the cluster process -> main process
*/
export enum ThreadStrategyOps {
REQUEST_IDENTIFY = 'requestIdentify',
Expand All @@ -43,7 +44,8 @@ export enum InternalOps {
RESTART_ALL = 'restartAll',
DESTROY_CLIENT = 'destroyClient',
SESSION_INFO = 'sessionInfo',
PING = 'ping'
PING = 'ping',
IDENTIFY = 'identify'
}

/**
Expand Down Expand Up @@ -105,22 +107,29 @@ export interface ThreadStrategyData {
internal: true
}

export interface InternalData {
internal: true
}

/**
* Data structure representing an internal event
*/
export interface InternalOpsData {
op: InternalOps,
data: any,
internal: true
export interface InternalOpsData extends InternalData {
op: InternalOps;
data: any;
}

/**
* Data structure representing an internal discord.js event
*/
export interface ClientEventData {
op: ClientEvents,
data: any,
internal: true,
export interface ClientEventData extends InternalData {
op: ClientEvents;
data: any;
}

export interface IpcIdentify {
clusterId: number;
serverId: string;
}

/**
Expand All @@ -137,8 +146,8 @@ export interface IpcErrorData {
*/
export interface Transportable {
content: any;
repliable?: boolean;
signal?: AbortSignal
reply?: boolean;
signal?: AbortSignal;
}

/**
Expand All @@ -150,12 +159,12 @@ export type Sendable = Omit<Transportable, 'signal'>;
* Data structure representing an internal abort data
*/
export interface InternalAbortSignal {
listener: () => void,
listener: () => void;
signal: AbortSignal
}

export interface SavePromiseOptions {
id: string;
nonce: string;
resolve: (data: unknown) => void;
reject: (reason: unknown) => void;
signal?: AbortSignal | undefined;
Expand All @@ -181,22 +190,13 @@ export interface InternalPromise {
/**
* Data structure representing internal IPC data
*/
export interface RawIpcMessage {
id: string|null;
export interface RawIpcMessage extends InternalData {
nonce: string;
content: any;
internal: true;
reply: boolean;
type: RawIpcMessageType
}

/**
* Data structure representing an IPC message
*/
export interface Message {
reply: (data: any) => void;
content: any;
repliable: boolean;
}

/**
* Data structure representing a Discord session
*/
Expand Down
2 changes: 1 addition & 1 deletion src/client/ShardClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class ShardClient {
const shardClientUtil = this.client.shard as ShardClientUtil;
const content: ClientEventData = { ...partial, internal: true };
shardClientUtil
.send({ content, repliable: false })
.send({ content, reply: false })
.catch((error: unknown) => this.client.emit(ClientEvents.ERROR, error as Error));
}
}
26 changes: 15 additions & 11 deletions src/client/ShardClientUtil.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import type { Client } from 'discord.js';
import type { Message } from '../ipc/BaseSocket';
import type { Indomitable } from '../Indomitable';
import EventEmitter from 'node:events';
import { clearTimeout } from 'timers';
import { Indomitable } from '../Indomitable';
import { ClientWorker } from '../ipc/ClientWorker';
import { ClientSocket } from '../ipc/ClientSocket';
import {
EnvProcessData,
MakeAbortableRequest,
AbortableData,
InternalOps,
InternalOpsData,
Message,
SessionObject,
Transportable
} from '../Util';
Expand All @@ -29,19 +29,24 @@ export declare interface ShardClientUtil {
*/
export class ShardClientUtil extends EventEmitter {
public client: Client;
public readonly ipc: ClientWorker;
private readonly manager: Indomitable;
public readonly clusterId: number;
public readonly clusterCount: number;
public readonly shardIds: number[];
public readonly shardCount: number;
public readonly ipc: ClientSocket;

constructor(client: Client, manager: Indomitable) {
super();
this.client = client;
this.ipc = new ClientWorker(this, manager);
this.manager = manager;
this.clusterId = EnvProcessData.clusterId;
this.clusterCount = EnvProcessData.clusterCount;
this.shardIds = EnvProcessData.shardIds;
this.shardCount = EnvProcessData.shardCount;
this.ipc = new ClientSocket(this, EnvProcessData.serverIpcId);

this.ipc.connect();
}

/**
Expand All @@ -55,7 +60,7 @@ export class ShardClientUtil extends EventEmitter {
internal: true
};
const start = process.hrtime.bigint();
const end = await this.send({ content, repliable: true }) as number;
const end = await this.send({ content, reply: true }) as number;
return Number(BigInt(end) - start);
}

Expand All @@ -69,7 +74,7 @@ export class ShardClientUtil extends EventEmitter {
data: `(${script.toString()})(this, ${JSON.stringify(context)})`,
internal: true
};
return this.send({ content, repliable: true }) as Promise<unknown[]>;
return this.send({ content, reply: true }) as Promise<unknown[]>;
}

/**
Expand All @@ -91,7 +96,7 @@ export class ShardClientUtil extends EventEmitter {
data: { update },
internal: true
};
return this.send({ content, repliable: true }) as Promise<SessionObject>;
return this.send({ content, reply: true }) as Promise<SessionObject>;
}

/**
Expand Down Expand Up @@ -125,10 +130,9 @@ export class ShardClientUtil extends EventEmitter {
* @returns A promise that resolves to void or a repliable object
*/
public async send(transportable: Transportable): Promise<unknown|undefined> {
const manager = this.ipc.manager as Indomitable;
let abortableData: AbortableData | undefined;
if (!transportable.signal && (manager.ipcTimeout !== Infinity && transportable.repliable)) {
abortableData = MakeAbortableRequest(manager.ipcTimeout);
if (!transportable.signal && (this.manager.ipcTimeout !== Infinity && transportable.reply)) {
abortableData = MakeAbortableRequest(this.manager.ipcTimeout);
transportable.signal = abortableData.controller.signal;
}
try {
Expand Down
Loading

0 comments on commit af32b47

Please sign in to comment.