Skip to content

Commit

Permalink
✨ get some basic gateway stuff out of the way
Browse files Browse the repository at this point in the history
  • Loading branch information
melike2d committed Sep 27, 2021
1 parent 15f6290 commit 2aaf734
Show file tree
Hide file tree
Showing 23 changed files with 813 additions and 228 deletions.
8 changes: 4 additions & 4 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"extends": "@neocord",
"parserOptions": {
"project": "./tsconfig.eslint.json"
}
"extends": "@keiryo",
"parserOptions": {
"project": "./tsconfig.eslint.json"
}
}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

---

copyright 2020 - 2021 © neocord team & contributors
copyright 2020 - 2021 © keiryo team & contributors
10 changes: 5 additions & 5 deletions ava.config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export default {
typescript: {
rewritePaths: {
"src/": "dist/"
typescript: {
rewritePaths: {
"src/": "dist/"
}
}
}
}
}
77 changes: 43 additions & 34 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
{
"name": "@neocord/project",
"version": "1.0.0",
"description": "A project in the neocord family!",
"main": "dist/index.js",
"repository": "[email protected]:neo-cord/typescript-project.git",
"author": "neocord team",
"license": "Apache-2.0",
"private": false,
"publishConfig": {
"access": "public"
},
"scripts": {
"build": "rm -rf dist && tsc --removeComments",
"lint": "eslint src --ext .ts",
"lint:fix": "yarn lint --fix",
"test": "ava"
},
"files": [
"!*.test.*",
"!*.spec.*",
"dist/",
"LICENSE",
"README.md"
],
"devDependencies": {
"@ava/typescript": "1.1.1",
"@neocord/eslint-config": "neo-cord/eslint-config",
"@types/node": "14.14.41",
"@typescript-eslint/eslint-plugin": "4.22.0",
"@typescript-eslint/parser": "4.22.0",
"ava": "3.15.0",
"eslint": "7.25.0",
"typescript": "4.2.4"
}
"name": "@keiryo/project",
"version": "1.0.0",
"description": "A project in the keiryo family!",
"main": "dist/index.js",
"repository": "[email protected]:KeiryoJS/typescript-project.git",
"author": "keiryo team",
"license": "Apache-2.0",
"private": false,
"publishConfig": {
"access": "public"
},
"scripts": {
"build": "rm -rf dist && tsc --removeComments",
"lint": "eslint src --ext .ts",
"lint:fix": "yarn lint --fix",
"test": "ava"
},
"files": [
"!*.test.*",
"!*.spec.*",
"dist/",
"LICENSE",
"README.md"
],
"devDependencies": {
"@ava/typescript": "2.0.0",
"@keiryo/eslint-config": "^1.0.1",
"@types/node": "16.9.6",
"@types/ws": "^8.2.0",
"@typescript-eslint/eslint-plugin": "4.31.2",
"@typescript-eslint/parser": "4.31.2",
"ava": "3.15.0",
"eslint": "7.32.0",
"tiny-typed-emitter": "^2.1.0",
"typescript": "4.4.3"
},
"dependencies": {
"@keiryo/common": "^1.0.2",
"@keiryo/rest": "^1.0.1",
"discord-api-types": "^0.23.1",
"tslib": "^2.3.1",
"ws": "^8.2.2"
}
}
8 changes: 4 additions & 4 deletions renovate.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"extends": [
"config:base"
],
"commitMessage": ":arrow_up: {{{commitMessagePrefix}}} {{{commitMessageAction}}} {{{commitMessageTopic}}} {{{commitMessageExtra}}} {{{commitMessageSuffix}}}"
"extends": [
"config:base"
],
"commitMessage": ":arrow_up: {{{commitMessagePrefix}}} {{{commitMessageAction}}} {{{commitMessageTopic}}} {{{commitMessageExtra}}} {{{commitMessageSuffix}}}"
}
6 changes: 6 additions & 0 deletions src/cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import type { RestClient } from "@keiryo/rest";

export class Cluster {
constructor(readonly rest: RestClient) {
}
}
61 changes: 61 additions & 0 deletions src/gateway/default.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import type { GatewayEvents, IGateway, ISessionManager, MaxConcurrency, Session } from "./interface";
import { AsyncLimiter, Collection } from "@keiryo/common";
import type { RestClient } from "@keiryo/rest";
import type { RESTGetAPIGatewayBotResult } from "discord-api-types";
import { TypedEmitter } from "tiny-typed-emitter";

export class Gateway extends TypedEmitter<GatewayEvents> implements IGateway {
readonly sessions: ISessionManager;

private buckets: Collection<number, AsyncLimiter>;
private maxConcurrency?: MaxConcurrency;

constructor(readonly rest: RestClient) {
super();

this.sessions = new SessionManager();
this.buckets = new Collection();
}

async setup(): Promise<void> {
await this.get();
for (let bucket = 0; bucket < this.maxConcurrency!; bucket++) {
const limiter = new AsyncLimiter(1, 5000, true);
this.buckets.set(bucket, limiter);
}
}

async queueIdentify(shard: number, callback: () => Promise<void>): Promise<void> {
if (!this.maxConcurrency) {
throw new Error("This gateway instance has not been set up yet. Please call Gateway#setup before identifying.");
}

let bucket = this.buckets.get(shard % this.maxConcurrency);
if (!bucket) {
this.emit("warn", `Creating rate-limit bucket for unknown key: ${shard % this.maxConcurrency}.`);
bucket = new AsyncLimiter(1, 5000, true);
this.buckets.set(shard % this.maxConcurrency, bucket);
}

await bucket.consume(callback);
}

async get(): Promise<RESTGetAPIGatewayBotResult> {
const result = await this.rest.get<RESTGetAPIGatewayBotResult>("/gateway/bot");
this.maxConcurrency = result!.session_start_limit.max_concurrency as MaxConcurrency;
return result!;
}
}

export class SessionManager implements ISessionManager {
private _sessions = new Collection<number, Session>();

async get(shard: number): Promise<Session> {
return this._sessions.get(shard) ?? { sequence: -1 };
}

async update(shard: number, session: Session): Promise<void> {
const current = await this.get(shard);
this._sessions.set(shard, { ...current, ...session });
}
}
2 changes: 2 additions & 0 deletions src/gateway/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./interface";
export * from "./default";
63 changes: 63 additions & 0 deletions src/gateway/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import type { TypedEmitter } from "tiny-typed-emitter";
import type { RESTGetAPIGatewayBotResult } from "discord-api-types";

export interface IGateway extends TypedEmitter<GatewayEvents> {
/**
* The session manager.
*/
readonly sessions: ISessionManager;

/**
* Setup this gateway, usually called right after instantiation.
* @returns {Promise<void>}
*/
setup(): Promise<void>;

/**
* Queues an OP 2 Identify payload.
*
* @param shard The shard id which is identifying.
* @param callback The callback
*/
queueIdentify(shard: number, callback: () => Promise<void>): Promise<void>;

/**
* Gets the /gateway/bot information.
*
* @returns {Promise<RESTGetAPIGatewayBotResult>}
*/
get(): Promise<RESTGetAPIGatewayBotResult>;
}

export interface ISessionManager {
/**
* Get a shard's session.
* @param shard The shard id.
*/
get(shard: number): Promise<Session>;

/**
* Update a shard's session.
* @param shard The shard to update.
* @param session The session data.
*/
update(shard: number, session: Session): Promise<void>;
}

export type MaxConcurrency = 1 | 16 | 32 | 64;

export type GatewayEvents = {
debug: (message: string) => void;
warn: (message: string) => void;
}

export interface Session {
/**
* The ID for this session.
*/
id?: string;
/**
* The last received sequence, used for resuming.
*/
sequence: number;
}
6 changes: 0 additions & 6 deletions src/index.test.ts

This file was deleted.

11 changes: 10 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
export const hello = "world";
export * from "./gateway";

export * from "./shard/compression";
export * from "./shard/encoding";
export * from "./shard/interface";
export * from "./shard/state";

export * from "./util/functions";

export * from "./cluster";
2 changes: 2 additions & 0 deletions src/shard/compression/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./interface";
export * from "./zlib";
31 changes: 31 additions & 0 deletions src/shard/compression/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import type { TypedEmitter } from "tiny-typed-emitter";

export interface IDecompressor extends TypedEmitter<DecompressorEvents> {
readonly type: "zlib-stream";

/**
* Initializes this decompressor.
*/
init(): void;

/**
* Closes this decompressor.
*/
close(): void;

/**
* Adds data to decompress.
* @param data The data to decompress.
*/
decompress(data: CompressedData): void;
}

export type DecompressorFactory = () => IDecompressor;

export type CompressedData = string | Buffer | Buffer[] | ArrayBuffer;

export type DecompressorEvents = {
data: (decompressed: Buffer) => void;
debug: (message: string) => void;
error: (error: Error) => void;
}
83 changes: 83 additions & 0 deletions src/shard/compression/zlib.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { CompressedData, DecompressorEvents, DecompressorFactory, IDecompressor } from "./interface";
import { TypedEmitter } from "tiny-typed-emitter";
import * as zlib from "zlib";

export class ZlibDecompressor extends TypedEmitter<DecompressorEvents> implements IDecompressor {
static readonly FACTORY: DecompressorFactory = () => new ZlibDecompressor();

readonly type = "zlib-stream";

private chunks: Buffer[] = [];
private incomingChunks: Buffer[] = [];
private flushing = false;
private unzip!: zlib.Unzip;

init(): void {
this.unzip = zlib.createUnzip({
flush: zlib.constants.Z_SYNC_FLUSH,
chunkSize: 128 * 1024
});

this.unzip.on("data", c => this.chunks.push(c));
this.unzip.on("error", e => this.emit("error", e));
}

close(): void {
this.unzip.close();
}

decompress(data: CompressedData): void {
if (data instanceof Buffer) {
this._decompress(data);
return;
} else if (Array.isArray(data)) {
this.emit("debug", "received fragmented buffer message.");
for (const buf of data) this._decompress(buf);
return;
} else if (data instanceof ArrayBuffer) {
this.emit("debug", "received array buffer message.");
this._decompress(Buffer.from(data));
return;
}

throw new TypeError("Received invalid data");
}

private _decompress(buf: Buffer) {
this.flushing
? this.incomingChunks.push(buf)
: this._write(buf);
}

private _flush() {
this.flushing = false;
if (!this.chunks.length) {
return;
}

let buf = this.chunks[0];
if (this.chunks.length > 1) {
buf = Buffer.concat(this.chunks);
}

this.chunks = [];
while (this.incomingChunks.length > 0) {
const incoming = this.incomingChunks.shift();
if (incoming && this._write(incoming)) break;
}

this.emit("data", buf);
}

private _write(buf: Buffer) {
this.unzip.write(buf);

const len = buf.length;
if (len >= 4 && buf.readUInt32BE(len - 4) === 0xFFFF) {
this.flushing = true;
this.unzip.flush(zlib.constants.Z_SYNC_FLUSH, this._flush.bind(this));
}

return this.flushing;
}
}
2 changes: 2 additions & 0 deletions src/shard/encoding/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./interface";
export * from "./json";
Loading

0 comments on commit 2aaf734

Please sign in to comment.