diff --git a/src/concurrency/AsyncQueue.ts b/src/concurrency/AsyncQueue.ts new file mode 100644 index 0000000..df9fc7f --- /dev/null +++ b/src/concurrency/AsyncQueue.ts @@ -0,0 +1,44 @@ +import { EventEmitter, once } from 'events'; + +export declare interface AsyncQueueWaitOptions { + signal?: AbortSignal | undefined; +} + +export declare interface AsyncQueueEmitter extends EventEmitter { + on(event: 'resolve', listener: (message: string) => void): this; + once(event: 'resolve', listener: (message: string) => void): this; + off(event: 'resolve', listener: (event: unknown) => void): this; +} + +export class AsyncQueue { + private queue: AsyncQueueEmitter[]; + constructor() { + this.queue = []; + } + + public get remaining(): number { + return this.queue.length; + } + + public wait({ signal }: AsyncQueueWaitOptions): Promise { + const next = this.remaining ? once(this.queue[this.remaining - 1], 'resolve', { signal }) : Promise.resolve([]); + + const emitter: AsyncQueueEmitter = new EventEmitter(); + this.queue.push(emitter); + + if (signal) { + const listener = () => { + const index = this.queue.indexOf(emitter); + if (index !== 1) this.queue.splice(index, 1); + } + signal.addEventListener('abort', listener); + } + + return next + } + + public shift(): void { + const emitter = this.queue.shift(); + if (typeof emitter !== 'undefined') emitter.emit('resolve'); + } +} diff --git a/src/concurrency/ConcurrencyManager.ts b/src/concurrency/ConcurrencyManager.ts index 8200e02..9f12f53 100644 --- a/src/concurrency/ConcurrencyManager.ts +++ b/src/concurrency/ConcurrencyManager.ts @@ -1,14 +1,18 @@ -import { SimpleIdentifyThrottler } from '@discordjs/ws'; +import { ExtendedMap } from './ExtendedMap'; +import { AsyncQueue } from './AsyncQueue'; +import { Delay } from '../Util'; /** - * A wrapper for @discordjs/ws to work exclusively with Indomitable's dynamic concurrency with support for abort controller + * Based on Discord.JS Simple Identify Throttler, for use of Indomitable */ export class ConcurrencyManager { - private readonly throttler: SimpleIdentifyThrottler; + private readonly queues: ExtendedMap; private readonly signals: Map; + private readonly concurrency: number; constructor(concurrency: number) { - this.throttler = new SimpleIdentifyThrottler(concurrency); + this.queues = new ExtendedMap(); this.signals = new Map(); + this.concurrency = concurrency; } /** @@ -16,12 +20,32 @@ export class ConcurrencyManager { */ public async waitForIdentify(shardId: number): Promise { try { - let abort = this.signals.get(shardId); - if (!abort) { - abort = new AbortController(); - this.signals.set(shardId, abort); + const abort = this.signals.get(shardId) || new AbortController(); + + if (!this.signals.has(shardId)) this.signals.set(shardId, abort); + + const key = shardId % this.concurrency; + const state = this.queues.ensure(key, () => { + return { + queue: new AsyncQueue(), + resets: Number.POSITIVE_INFINITY + }; + }); + + try { + await state.queue.wait({ signal: abort.signal }); + + const difference = state.resets - Date.now(); + + if (difference <= 5000) { + const time = difference + Math.random() * 1500; + await Delay(time); + } + + state.resets = Date.now() + 5_000; + } finally { + state.queue.shift(); } - await this.throttler.waitForIdentify(shardId, abort.signal); } finally { this.signals.delete(shardId); } diff --git a/src/concurrency/ExtendedMap.ts b/src/concurrency/ExtendedMap.ts new file mode 100644 index 0000000..4b58458 --- /dev/null +++ b/src/concurrency/ExtendedMap.ts @@ -0,0 +1,11 @@ +export class ExtendedMap extends Map { + public ensure(key: K, generator: (key: K, collection: ExtendedMap) => V): V { + let value = this.get(key) + if (value) return value; + + value = generator(key, this); + this.set(key, value) + + return value; + } +} \ No newline at end of file