Skip to content

Commit

Permalink
feat: concurrency manager refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Deivu committed Apr 17, 2024
1 parent d6a303b commit a229a61
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 9 deletions.
44 changes: 44 additions & 0 deletions src/concurrency/AsyncQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { EventEmitter, once } from 'events';

export declare interface AsyncQueueWaitOptions {
signal?: AbortSignal | undefined;
}

export declare interface AsyncQueueEmitter extends EventEmitter {
on(event: 'resolve', listener: (message: string) => void): this;
once(event: 'resolve', listener: (message: string) => void): this;
off(event: 'resolve', listener: (event: unknown) => void): this;
}

export class AsyncQueue {
private queue: AsyncQueueEmitter[];
constructor() {
this.queue = [];
}

public get remaining(): number {
return this.queue.length;
}

public wait({ signal }: AsyncQueueWaitOptions): Promise<void[]> {
const next = this.remaining ? once(this.queue[this.remaining - 1], 'resolve', { signal }) : Promise.resolve([]);

const emitter: AsyncQueueEmitter = new EventEmitter();
this.queue.push(emitter);

if (signal) {
const listener = () => {
const index = this.queue.indexOf(emitter);
if (index !== 1) this.queue.splice(index, 1);
}
signal.addEventListener('abort', listener);
}

return next
}

public shift(): void {
const emitter = this.queue.shift();
if (typeof emitter !== 'undefined') emitter.emit('resolve');
}
}
42 changes: 33 additions & 9 deletions src/concurrency/ConcurrencyManager.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
import { SimpleIdentifyThrottler } from '@discordjs/ws';
import { ExtendedMap } from './ExtendedMap';
import { AsyncQueue } from './AsyncQueue';
import { Delay } from '../Util';

/**
* A wrapper for @discordjs/ws to work exclusively with Indomitable's dynamic concurrency with support for abort controller
* Based on Discord.JS Simple Identify Throttler, for use of Indomitable
*/
export class ConcurrencyManager {
private readonly throttler: SimpleIdentifyThrottler;
private readonly queues: ExtendedMap;
private readonly signals: Map<number, AbortController>;
private readonly concurrency: number;
constructor(concurrency: number) {
this.throttler = new SimpleIdentifyThrottler(concurrency);
this.queues = new ExtendedMap();
this.signals = new Map();
this.concurrency = concurrency;
}

/**
* Method to try and acquire a lock for identify
*/
public async waitForIdentify(shardId: number): Promise<void> {
try {
let abort = this.signals.get(shardId);
if (!abort) {
abort = new AbortController();
this.signals.set(shardId, abort);
const abort = this.signals.get(shardId) || new AbortController();

if (!this.signals.has(shardId)) this.signals.set(shardId, abort);

const key = shardId % this.concurrency;
const state = this.queues.ensure(key, () => {
return {
queue: new AsyncQueue(),
resets: Number.POSITIVE_INFINITY
};
});

try {
await state.queue.wait({ signal: abort.signal });

const difference = state.resets - Date.now();

if (difference <= 5000) {
const time = difference + Math.random() * 1500;
await Delay(time);
}

state.resets = Date.now() + 5_000;
} finally {
state.queue.shift();
}
await this.throttler.waitForIdentify(shardId, abort.signal);
} finally {
this.signals.delete(shardId);
}
Expand Down
11 changes: 11 additions & 0 deletions src/concurrency/ExtendedMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export class ExtendedMap extends Map {
public ensure<K, V>(key: K, generator: (key: K, collection: ExtendedMap) => V): V {
let value = this.get(key)
if (value) return value;

value = generator(key, this);
this.set(key, value)

return value;
}
}

0 comments on commit a229a61

Please sign in to comment.