diff --git a/source/droid/gateway/compression.d b/source/droid/gateway/compression.d new file mode 100644 index 0000000..e22407c --- /dev/null +++ b/source/droid/gateway/compression.d @@ -0,0 +1,48 @@ +module droid.gateway.compression; + +import droid.exception; +import std.zlib, + std.stdio, + std.conv : to; + +enum CompressionType : string { + NONE = "", + ZLIB = "zlib", + ZLIB_STREAM = "zlib-stream" +} +class Decompressor { + string read(ubyte[] data) { + throw new DroidException("Compression type not supported!"); + } +} + +class ZLibStream : Decompressor { + const ubyte[] ZLIB_SUFFIX = [0x0, 0x0, 0xFF, 0xFF]; + UnCompress decompressor; + + ubyte[] buffer; + + this() { + decompressor = new UnCompress(HeaderFormat.deflate); + } + + /* + * Reads a zlib stream from the websocket + * This will append the data to a buffer, + * returning nothing if the data is not a full zlib frame + * otherwise, returning the decompressed string. + */ + override string read(ubyte[] data) { + buffer ~= data; + + if (data[$-4..$] != ZLIB_SUFFIX) { + return ""; + } + + string decompressed = to!string(decompressor.uncompress(buffer)); + decompressor.flush(); + buffer = null; + + return decompressed; + } +} diff --git a/source/droid/gateway/gateway.d b/source/droid/gateway/gateway.d index 2c3c8f6..4d4f08c 100644 --- a/source/droid/gateway/gateway.d +++ b/source/droid/gateway/gateway.d @@ -6,7 +6,9 @@ import core.time, std.stdio, std.typecons, std.random, - std.experimental.logger; + std.experimental.logger, + std.array, + std.zlib; import vibe.core.core, vibe.http.common, @@ -19,7 +21,8 @@ import droid.exception, droid.api, droid.gateway.opcode, droid.gateway.packet, - droid.data.event_type; + droid.data.event_type, + droid.gateway.compression; final class Gateway { @@ -32,7 +35,9 @@ final class Gateway private immutable OpcodeHandlerMap OPCODE_MAPPING; - private immutable string gatewayUrl_; + private string gatewayUrl_; + private immutable CompressionType compressionType = CompressionType.ZLIB_STREAM; + private Decompressor decompressor = null; private API api_; private WebSocket ws_; @@ -61,6 +66,18 @@ final class Gateway void connect(in bool blocking = true, in bool reconnecting = false) { + if (compressionType != CompressionType.NONE) { + gatewayUrl_ = gatewayUrl_ ~ "&compress=" ~ compressionType; + switch (compressionType) { + case CompressionType.ZLIB_STREAM: + decompressor = new ZLibStream(); + break; + default: + throw new DroidException("Compression type not supported!"); + + } + } + if (!tryConnect(gatewayUrl_)) { logger_.tracef("Could not connect to given gateway url %s, using API", gatewayUrl_); tryConnect(api_.getGatewayUrl(), true); @@ -136,7 +153,17 @@ final class Gateway assert(ws_ && ws_.connected); while (ws_.waitForData()) { - const packet = parseMessage(ws_.receiveText()); + auto data = ""; + + if (decompressor) + data = decompressor.read(ws_.receiveBinary()); + else + data = ws_.receiveText(); + + // The data isn't complete (not a full zlib message, or something borked) + if (data == "") return; + + const packet = parseMessage(data); auto opcodeHandler = packet.opcode in OPCODE_MAPPING; if (opcodeHandler) { diff --git a/source/droid/gateway/package.d b/source/droid/gateway/package.d index 42b2360..9769d38 100644 --- a/source/droid/gateway/package.d +++ b/source/droid/gateway/package.d @@ -4,4 +4,5 @@ public { import droid.gateway.opcode; import droid.gateway.packet; import droid.gateway.gateway; + import droid.gateway.compression; }