Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zlib-stream #9

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions source/droid/gateway/compression.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
module droid.gateway.compression;

import droid.exception;
import std.zlib,
std.stdio,
std.conv : to;

enum CompressionType : string {
NONE = "",
ZLIB = "zlib",
ZLIB_STREAM = "zlib-stream"
}
class Decompressor {
string read(ubyte[] data) {
throw new DroidException("Compression type not supported!");
}
}

class ZLibStream : Decompressor {
const ubyte[] ZLIB_SUFFIX = [0x0, 0x0, 0xFF, 0xFF];
UnCompress decompressor;

ubyte[] buffer;

this() {
decompressor = new UnCompress(HeaderFormat.deflate);
}

/*
* Reads a zlib stream from the websocket
* This will append the data to a buffer,
* returning nothing if the data is not a full zlib frame
* otherwise, returning the decompressed string.
*/
override string read(ubyte[] data) {
buffer ~= data;

if (data[$-4..$] != ZLIB_SUFFIX) {
return "";
}

string decompressed = to!string(decompressor.uncompress(buffer));
decompressor.flush();
buffer = null;

return decompressed;
}
}
35 changes: 31 additions & 4 deletions source/droid/gateway/gateway.d
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import core.time,
std.stdio,
std.typecons,
std.random,
std.experimental.logger;
std.experimental.logger,
std.array,
std.zlib;

import vibe.core.core,
vibe.http.common,
Expand All @@ -19,7 +21,8 @@ import droid.exception,
droid.api,
droid.gateway.opcode,
droid.gateway.packet,
droid.data.event_type;
droid.data.event_type,
droid.gateway.compression;

final class Gateway
{
Expand All @@ -32,7 +35,9 @@ final class Gateway

private immutable OpcodeHandlerMap OPCODE_MAPPING;

private immutable string gatewayUrl_;
private string gatewayUrl_;
private immutable CompressionType compressionType = CompressionType.ZLIB_STREAM;
private Decompressor decompressor = null;

private API api_;
private WebSocket ws_;
Expand Down Expand Up @@ -61,6 +66,18 @@ final class Gateway

void connect(in bool blocking = true, in bool reconnecting = false)
{
if (compressionType != CompressionType.NONE) {
gatewayUrl_ = gatewayUrl_ ~ "&compress=" ~ compressionType;
switch (compressionType) {
case CompressionType.ZLIB_STREAM:
decompressor = new ZLibStream();
break;
default:
throw new DroidException("Compression type not supported!");

}
}

if (!tryConnect(gatewayUrl_)) {
logger_.tracef("Could not connect to given gateway url %s, using API", gatewayUrl_);
tryConnect(api_.getGatewayUrl(), true);
Expand Down Expand Up @@ -136,7 +153,17 @@ final class Gateway
assert(ws_ && ws_.connected);

while (ws_.waitForData()) {
const packet = parseMessage(ws_.receiveText());
auto data = "";

if (decompressor)
data = decompressor.read(ws_.receiveBinary());
else
data = ws_.receiveText();

// The data isn't complete (not a full zlib message, or something borked)
if (data == "") return;

const packet = parseMessage(data);

auto opcodeHandler = packet.opcode in OPCODE_MAPPING;
if (opcodeHandler) {
Expand Down
1 change: 1 addition & 0 deletions source/droid/gateway/package.d
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ public {
import droid.gateway.opcode;
import droid.gateway.packet;
import droid.gateway.gateway;
import droid.gateway.compression;
}